This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 28d35c8  [SPARK-27162][SQL] Add new method asCaseSensitiveMap in 
CaseInsensitiveStringMap
28d35c8 is described below

commit 28d35c85789edb060dda47a984b5b3f0a27d8bf0
Author: Gengliang Wang <[email protected]>
AuthorDate: Tue Mar 19 13:35:47 2019 +0800

    [SPARK-27162][SQL] Add new method asCaseSensitiveMap in 
CaseInsensitiveStringMap
    
    ## What changes were proposed in this pull request?
    
    Currently, DataFrameReader/DataFrameReader supports setting Hadoop 
configurations via method `.option()`.
    E.g, the following test case should be passed in both ORC V1 and V2
    ```
      class TestFileFilter extends PathFilter {
        override def accept(path: Path): Boolean = path.getParent.getName != 
"p=2"
      }
    
      withTempPath { dir =>
          val path = dir.getCanonicalPath
    
          val df = spark.range(2)
          df.write.orc(path + "/p=1")
          df.write.orc(path + "/p=2")
          val extraOptions = Map(
            "mapred.input.pathFilter.class" -> classOf[TestFileFilter].getName,
            "mapreduce.input.pathFilter.class" -> 
classOf[TestFileFilter].getName
          )
          assert(spark.read.options(extraOptions).orc(path).count() === 2)
        }
      }
    ```
    While Hadoop Configurations are case sensitive, the current data source V2 
APIs are using `CaseInsensitiveStringMap` in the top level entry 
`TableProvider`.
    To create Hadoop configurations correctly, I suggest
    1. adding a new method `asCaseSensitiveMap` in `CaseInsensitiveStringMap`.
    2. Make `CaseInsensitiveStringMap` read-only to ambiguous conversion in 
`asCaseSensitiveMap`
    
    ## How was this patch tested?
    
    Unit test
    
    Closes #24094 from gengliangwang/originalMap.
    
    Authored-by: Gengliang Wang <[email protected]>
    Signed-off-by: Wenchen Fan <[email protected]>
---
 .../org/apache/spark/sql/catalog/v2/Catalogs.java  |  5 +--
 .../spark/sql/util/CaseInsensitiveStringMap.java   | 37 +++++++++++++++++-----
 .../sql/util/CaseInsensitiveStringMapSuite.scala   | 31 ++++++++++++++++--
 .../sql/execution/datasources/v2/FileTable.scala   |  7 ++--
 .../datasources/v2/FileWriteBuilder.scala          |  8 ++---
 .../datasources/v2/orc/OrcScanBuilder.scala        |  6 +++-
 .../orc/OrcPartitionDiscoverySuite.scala           | 23 ++++++++++++++
 7 files changed, 96 insertions(+), 21 deletions(-)

diff --git 
a/sql/catalyst/src/main/java/org/apache/spark/sql/catalog/v2/Catalogs.java 
b/sql/catalyst/src/main/java/org/apache/spark/sql/catalog/v2/Catalogs.java
index efae266..aa4cbfc 100644
--- a/sql/catalyst/src/main/java/org/apache/spark/sql/catalog/v2/Catalogs.java
+++ b/sql/catalyst/src/main/java/org/apache/spark/sql/catalog/v2/Catalogs.java
@@ -23,6 +23,7 @@ import org.apache.spark.sql.internal.SQLConf;
 import org.apache.spark.sql.util.CaseInsensitiveStringMap;
 import org.apache.spark.util.Utils;
 
+import java.util.HashMap;
 import java.util.Map;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
@@ -96,7 +97,7 @@ public class Catalogs {
     Map<String, String> allConfs = 
mapAsJavaMapConverter(conf.getAllConfs()).asJava();
     Pattern prefix = Pattern.compile("^spark\\.sql\\.catalog\\." + name + 
"\\.(.+)");
 
-    CaseInsensitiveStringMap options = CaseInsensitiveStringMap.empty();
+    HashMap<String, String> options = new HashMap<>();
     for (Map.Entry<String, String> entry : allConfs.entrySet()) {
       Matcher matcher = prefix.matcher(entry.getKey());
       if (matcher.matches() && matcher.groupCount() > 0) {
@@ -104,6 +105,6 @@ public class Catalogs {
       }
     }
 
-    return options;
+    return new CaseInsensitiveStringMap(options);
   }
 }
diff --git 
a/sql/catalyst/src/main/java/org/apache/spark/sql/util/CaseInsensitiveStringMap.java
 
b/sql/catalyst/src/main/java/org/apache/spark/sql/util/CaseInsensitiveStringMap.java
index 704d90e..da41346 100644
--- 
a/sql/catalyst/src/main/java/org/apache/spark/sql/util/CaseInsensitiveStringMap.java
+++ 
b/sql/catalyst/src/main/java/org/apache/spark/sql/util/CaseInsensitiveStringMap.java
@@ -18,8 +18,11 @@
 package org.apache.spark.sql.util;
 
 import org.apache.spark.annotation.Experimental;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.util.Collection;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.Locale;
 import java.util.Map;
@@ -35,16 +38,29 @@ import java.util.Set;
  */
 @Experimental
 public class CaseInsensitiveStringMap implements Map<String, String> {
+  private final Logger logger = 
LoggerFactory.getLogger(CaseInsensitiveStringMap.class);
+
+  private String unsupportedOperationMsg = "CaseInsensitiveStringMap is 
read-only.";
 
   public static CaseInsensitiveStringMap empty() {
     return new CaseInsensitiveStringMap(new HashMap<>(0));
   }
 
+  private final Map<String, String> original;
+
   private final Map<String, String> delegate;
 
   public CaseInsensitiveStringMap(Map<String, String> originalMap) {
-    this.delegate = new HashMap<>(originalMap.size());
-    putAll(originalMap);
+    original = new HashMap<>(originalMap);
+    delegate = new HashMap<>(originalMap.size());
+    for (Map.Entry<String, String> entry : originalMap.entrySet()) {
+      String key = toLowerCase(entry.getKey());
+      if (delegate.containsKey(key)) {
+        logger.warn("Converting duplicated key " + entry.getKey() +
+                " into CaseInsensitiveStringMap.");
+      }
+      delegate.put(key, entry.getValue());
+    }
   }
 
   @Override
@@ -78,24 +94,22 @@ public class CaseInsensitiveStringMap implements 
Map<String, String> {
 
   @Override
   public String put(String key, String value) {
-    return delegate.put(toLowerCase(key), value);
+    throw new UnsupportedOperationException(unsupportedOperationMsg);
   }
 
   @Override
   public String remove(Object key) {
-    return delegate.remove(toLowerCase(key));
+    throw new UnsupportedOperationException(unsupportedOperationMsg);
   }
 
   @Override
   public void putAll(Map<? extends String, ? extends String> m) {
-    for (Map.Entry<? extends String, ? extends String> entry : m.entrySet()) {
-      put(entry.getKey(), entry.getValue());
-    }
+    throw new UnsupportedOperationException(unsupportedOperationMsg);
   }
 
   @Override
   public void clear() {
-    delegate.clear();
+    throw new UnsupportedOperationException(unsupportedOperationMsg);
   }
 
   @Override
@@ -157,4 +171,11 @@ public class CaseInsensitiveStringMap implements 
Map<String, String> {
     String value = get(key);
     return value == null ? defaultValue : Double.parseDouble(value);
   }
+
+  /**
+   * Returns the original case-sensitive map.
+   */
+  public Map<String, String> asCaseSensitiveMap() {
+    return Collections.unmodifiableMap(original);
+  }
 }
diff --git 
a/sql/catalyst/src/test/scala/org/apache/spark/sql/util/CaseInsensitiveStringMapSuite.scala
 
b/sql/catalyst/src/test/scala/org/apache/spark/sql/util/CaseInsensitiveStringMapSuite.scala
index 623ddeb..0accb47 100644
--- 
a/sql/catalyst/src/test/scala/org/apache/spark/sql/util/CaseInsensitiveStringMapSuite.scala
+++ 
b/sql/catalyst/src/test/scala/org/apache/spark/sql/util/CaseInsensitiveStringMapSuite.scala
@@ -17,6 +17,8 @@
 
 package org.apache.spark.sql.util
 
+import java.util
+
 import scala.collection.JavaConverters._
 
 import org.apache.spark.SparkFunSuite
@@ -25,9 +27,16 @@ class CaseInsensitiveStringMapSuite extends SparkFunSuite {
 
   test("put and get") {
     val options = CaseInsensitiveStringMap.empty()
-    options.put("kEy", "valUE")
-    assert(options.get("key") == "valUE")
-    assert(options.get("KEY") == "valUE")
+    intercept[UnsupportedOperationException] {
+      options.put("kEy", "valUE")
+    }
+  }
+
+  test("clear") {
+    val options = new CaseInsensitiveStringMap(Map("kEy" -> "valUE").asJava)
+    intercept[UnsupportedOperationException] {
+      options.clear()
+    }
   }
 
   test("key and value set") {
@@ -80,4 +89,20 @@ class CaseInsensitiveStringMapSuite extends SparkFunSuite {
       options.getDouble("foo", 0.1d)
     }
   }
+
+  test("asCaseSensitiveMap") {
+    val originalMap = new util.HashMap[String, String] {
+      put("Foo", "Bar")
+      put("OFO", "ABR")
+      put("OoF", "bar")
+    }
+
+    val options = new CaseInsensitiveStringMap(originalMap)
+    val caseSensitiveMap = options.asCaseSensitiveMap
+    assert(caseSensitiveMap.equals(originalMap))
+    // The result of `asCaseSensitiveMap` is read-only.
+    intercept[UnsupportedOperationException] {
+      caseSensitiveMap.put("kEy", "valUE")
+    }
+  }
 }
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileTable.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileTable.scala
index 5944a20..4b35df3 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileTable.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileTable.scala
@@ -36,13 +36,14 @@ abstract class FileTable(
   extends Table with SupportsRead with SupportsWrite {
 
   lazy val fileIndex: PartitioningAwareFileIndex = {
-    val scalaMap = options.asScala.toMap
-    val hadoopConf = 
sparkSession.sessionState.newHadoopConfWithOptions(scalaMap)
+    val caseSensitiveMap = options.asCaseSensitiveMap.asScala.toMap
+    // Hadoop Configurations are case sensitive.
+    val hadoopConf = 
sparkSession.sessionState.newHadoopConfWithOptions(caseSensitiveMap)
     val rootPathsSpecified = DataSource.checkAndGlobPathIfNecessary(paths, 
hadoopConf,
       checkEmptyGlobPath = true, checkFilesExist = true)
     val fileStatusCache = FileStatusCache.getOrCreate(sparkSession)
     new InMemoryFileIndex(
-      sparkSession, rootPathsSpecified, scalaMap, userSpecifiedSchema, 
fileStatusCache)
+      sparkSession, rootPathsSpecified, caseSensitiveMap, userSpecifiedSchema, 
fileStatusCache)
   }
 
   lazy val dataSchema: StructType = userSpecifiedSchema.orElse {
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileWriteBuilder.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileWriteBuilder.scala
index 0d07f5a..bb4a428 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileWriteBuilder.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileWriteBuilder.scala
@@ -64,16 +64,16 @@ abstract class FileWriteBuilder(options: 
CaseInsensitiveStringMap, paths: Seq[St
     val sparkSession = SparkSession.active
     validateInputs(sparkSession.sessionState.conf.caseSensitiveAnalysis)
     val path = new Path(paths.head)
-    val optionsAsScala = options.asScala.toMap
-
-    val hadoopConf = 
sparkSession.sessionState.newHadoopConfWithOptions(optionsAsScala)
+    val caseSensitiveMap = options.asCaseSensitiveMap.asScala.toMap
+    // Hadoop Configurations are case sensitive.
+    val hadoopConf = 
sparkSession.sessionState.newHadoopConfWithOptions(caseSensitiveMap)
     val job = getJobInstance(hadoopConf, path)
     val committer = FileCommitProtocol.instantiate(
       sparkSession.sessionState.conf.fileCommitProtocolClass,
       jobId = java.util.UUID.randomUUID().toString,
       outputPath = paths.head)
     lazy val description =
-      createWriteJobDescription(sparkSession, hadoopConf, job, paths.head, 
optionsAsScala)
+      createWriteJobDescription(sparkSession, hadoopConf, job, paths.head, 
options.asScala.toMap)
 
     val fs = path.getFileSystem(hadoopConf)
     mode match {
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/orc/OrcScanBuilder.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/orc/OrcScanBuilder.scala
index 4767f21..8ac56aa 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/orc/OrcScanBuilder.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/orc/OrcScanBuilder.scala
@@ -37,7 +37,11 @@ case class OrcScanBuilder(
     dataSchema: StructType,
     options: CaseInsensitiveStringMap)
   extends FileScanBuilder(schema) with SupportsPushDownFilters {
-  lazy val hadoopConf = 
sparkSession.sessionState.newHadoopConfWithOptions(options.asScala.toMap)
+  lazy val hadoopConf = {
+    val caseSensitiveMap = options.asCaseSensitiveMap.asScala.toMap
+    // Hadoop Configurations are case sensitive.
+    sparkSession.sessionState.newHadoopConfWithOptions(caseSensitiveMap)
+  }
 
   override def build(): Scan = {
     OrcScan(sparkSession, hadoopConf, fileIndex, dataSchema, readSchema, 
options)
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcPartitionDiscoverySuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcPartitionDiscoverySuite.scala
index bc5a30e..e1d0254 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcPartitionDiscoverySuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcPartitionDiscoverySuite.scala
@@ -19,6 +19,8 @@ package org.apache.spark.sql.execution.datasources.orc
 
 import java.io.File
 
+import org.apache.hadoop.fs.{Path, PathFilter}
+
 import org.apache.spark.SparkConf
 import org.apache.spark.sql._
 import org.apache.spark.sql.internal.SQLConf
@@ -30,6 +32,10 @@ case class OrcParData(intField: Int, stringField: String)
 // The data that also includes the partitioning key
 case class OrcParDataWithKey(intField: Int, pi: Int, stringField: String, ps: 
String)
 
+class TestFileFilter extends PathFilter {
+  override def accept(path: Path): Boolean = path.getParent.getName != "p=2"
+}
+
 abstract class OrcPartitionDiscoveryTest extends OrcTest {
   val defaultPartitionName = "__HIVE_DEFAULT_PARTITION__"
 
@@ -226,6 +232,23 @@ abstract class OrcPartitionDiscoveryTest extends OrcTest {
       }
     }
   }
+
+  test("SPARK-27162: handle pathfilter configuration correctly") {
+    withTempPath { dir =>
+      val path = dir.getCanonicalPath
+
+      val df = spark.range(2)
+      df.write.orc(path + "/p=1")
+      df.write.orc(path + "/p=2")
+      assert(spark.read.orc(path).count() === 4)
+
+      val extraOptions = Map(
+        "mapred.input.pathFilter.class" -> classOf[TestFileFilter].getName,
+        "mapreduce.input.pathFilter.class" -> classOf[TestFileFilter].getName
+      )
+      assert(spark.read.options(extraOptions).orc(path).count() === 2)
+    }
+  }
 }
 
 class OrcPartitionDiscoverySuite extends OrcPartitionDiscoveryTest with 
SharedSQLContext


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to