This is an automated email from the ASF dual-hosted git repository.
wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new 28d35c8 [SPARK-27162][SQL] Add new method asCaseSensitiveMap in CaseInsensitiveStringMap
28d35c8 is described below
commit 28d35c85789edb060dda47a984b5b3f0a27d8bf0
Author: Gengliang Wang <[email protected]>
AuthorDate: Tue Mar 19 13:35:47 2019 +0800
[SPARK-27162][SQL] Add new method asCaseSensitiveMap in CaseInsensitiveStringMap
## What changes were proposed in this pull request?
Currently, `DataFrameReader`/`DataFrameWriter` support setting Hadoop
configurations via the method `.option()`.
E.g., the following test case should pass in both ORC V1 and V2:
```
class TestFileFilter extends PathFilter {
  override def accept(path: Path): Boolean = path.getParent.getName != "p=2"
}

withTempPath { dir =>
  val path = dir.getCanonicalPath

  val df = spark.range(2)
  df.write.orc(path + "/p=1")
  df.write.orc(path + "/p=2")

  val extraOptions = Map(
    "mapred.input.pathFilter.class" -> classOf[TestFileFilter].getName,
    "mapreduce.input.pathFilter.class" -> classOf[TestFileFilter].getName
  )
  assert(spark.read.options(extraOptions).orc(path).count() === 2)
}
```
While Hadoop Configurations are case sensitive, the current data source V2
APIs use `CaseInsensitiveStringMap` in the top-level entry `TableProvider`.
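As an illustration only (not part of this patch), the following sketch shows how the case-insensitive map drops the original casing that a Hadoop key such as `mapreduce.input.pathFilter.class` relies on:
```
import scala.collection.JavaConverters._
import org.apache.spark.sql.util.CaseInsensitiveStringMap

// Keys are stored lower-cased inside CaseInsensitiveStringMap, so iterating
// over the map no longer yields the caller's original Hadoop key.
val options = new CaseInsensitiveStringMap(
  Map("mapreduce.input.pathFilter.class" -> "TestFileFilter").asJava)
options.asScala.keys.foreach(println)
// prints "mapreduce.input.pathfilter.class" -- note the lower-cased "pathfilter"
```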
To create Hadoop configurations correctly, I suggest:
1. adding a new method `asCaseSensitiveMap` in `CaseInsensitiveStringMap` (see the sketch after this list);
2. making `CaseInsensitiveStringMap` read-only to avoid ambiguous conversions in
`asCaseSensitiveMap`.
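A minimal sketch of the intended usage (again not part of the patch; it builds a plain Hadoop `Configuration` here, whereas the file sources changed below use the session-scoped Hadoop configuration):
```
import scala.collection.JavaConverters._
import org.apache.hadoop.conf.Configuration
import org.apache.spark.sql.util.CaseInsensitiveStringMap

// Build a Hadoop Configuration from the original, case-sensitive key/value pairs.
def toHadoopConf(options: CaseInsensitiveStringMap): Configuration = {
  val conf = new Configuration()
  // asCaseSensitiveMap returns an unmodifiable view of the original map, so
  // "mapreduce.input.pathFilter.class" keeps its original casing here.
  options.asCaseSensitiveMap.asScala.foreach { case (k, v) => conf.set(k, v) }
  conf
}
```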
## How was this patch tested?
Unit test
Closes #24094 from gengliangwang/originalMap.
Authored-by: Gengliang Wang <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>
---
.../org/apache/spark/sql/catalog/v2/Catalogs.java | 5 +--
.../spark/sql/util/CaseInsensitiveStringMap.java | 37 +++++++++++++++++-----
.../sql/util/CaseInsensitiveStringMapSuite.scala | 31 ++++++++++++++++--
.../sql/execution/datasources/v2/FileTable.scala | 7 ++--
.../datasources/v2/FileWriteBuilder.scala | 8 ++---
.../datasources/v2/orc/OrcScanBuilder.scala | 6 +++-
.../orc/OrcPartitionDiscoverySuite.scala | 23 ++++++++++++++
7 files changed, 96 insertions(+), 21 deletions(-)
diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/catalog/v2/Catalogs.java b/sql/catalyst/src/main/java/org/apache/spark/sql/catalog/v2/Catalogs.java
index efae266..aa4cbfc 100644
--- a/sql/catalyst/src/main/java/org/apache/spark/sql/catalog/v2/Catalogs.java
+++ b/sql/catalyst/src/main/java/org/apache/spark/sql/catalog/v2/Catalogs.java
@@ -23,6 +23,7 @@ import org.apache.spark.sql.internal.SQLConf;
import org.apache.spark.sql.util.CaseInsensitiveStringMap;
import org.apache.spark.util.Utils;
+import java.util.HashMap;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
@@ -96,7 +97,7 @@ public class Catalogs {
Map<String, String> allConfs = mapAsJavaMapConverter(conf.getAllConfs()).asJava();
Pattern prefix = Pattern.compile("^spark\\.sql\\.catalog\\." + name + "\\.(.+)");
- CaseInsensitiveStringMap options = CaseInsensitiveStringMap.empty();
+ HashMap<String, String> options = new HashMap<>();
for (Map.Entry<String, String> entry : allConfs.entrySet()) {
Matcher matcher = prefix.matcher(entry.getKey());
if (matcher.matches() && matcher.groupCount() > 0) {
@@ -104,6 +105,6 @@ public class Catalogs {
}
}
- return options;
+ return new CaseInsensitiveStringMap(options);
}
}
diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/util/CaseInsensitiveStringMap.java b/sql/catalyst/src/main/java/org/apache/spark/sql/util/CaseInsensitiveStringMap.java
index 704d90e..da41346 100644
--- a/sql/catalyst/src/main/java/org/apache/spark/sql/util/CaseInsensitiveStringMap.java
+++ b/sql/catalyst/src/main/java/org/apache/spark/sql/util/CaseInsensitiveStringMap.java
@@ -18,8 +18,11 @@
package org.apache.spark.sql.util;
import org.apache.spark.annotation.Experimental;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import java.util.Collection;
+import java.util.Collections;
import java.util.HashMap;
import java.util.Locale;
import java.util.Map;
@@ -35,16 +38,29 @@ import java.util.Set;
*/
@Experimental
public class CaseInsensitiveStringMap implements Map<String, String> {
+ private final Logger logger = LoggerFactory.getLogger(CaseInsensitiveStringMap.class);
+
+ private String unsupportedOperationMsg = "CaseInsensitiveStringMap is read-only.";
public static CaseInsensitiveStringMap empty() {
return new CaseInsensitiveStringMap(new HashMap<>(0));
}
+ private final Map<String, String> original;
+
private final Map<String, String> delegate;
public CaseInsensitiveStringMap(Map<String, String> originalMap) {
- this.delegate = new HashMap<>(originalMap.size());
- putAll(originalMap);
+ original = new HashMap<>(originalMap);
+ delegate = new HashMap<>(originalMap.size());
+ for (Map.Entry<String, String> entry : originalMap.entrySet()) {
+ String key = toLowerCase(entry.getKey());
+ if (delegate.containsKey(key)) {
+ logger.warn("Converting duplicated key " + entry.getKey() +
+ " into CaseInsensitiveStringMap.");
+ }
+ delegate.put(key, entry.getValue());
+ }
}
@Override
@@ -78,24 +94,22 @@ public class CaseInsensitiveStringMap implements Map<String, String> {
@Override
public String put(String key, String value) {
- return delegate.put(toLowerCase(key), value);
+ throw new UnsupportedOperationException(unsupportedOperationMsg);
}
@Override
public String remove(Object key) {
- return delegate.remove(toLowerCase(key));
+ throw new UnsupportedOperationException(unsupportedOperationMsg);
}
@Override
public void putAll(Map<? extends String, ? extends String> m) {
- for (Map.Entry<? extends String, ? extends String> entry : m.entrySet()) {
- put(entry.getKey(), entry.getValue());
- }
+ throw new UnsupportedOperationException(unsupportedOperationMsg);
}
@Override
public void clear() {
- delegate.clear();
+ throw new UnsupportedOperationException(unsupportedOperationMsg);
}
@Override
@@ -157,4 +171,11 @@ public class CaseInsensitiveStringMap implements Map<String, String> {
String value = get(key);
return value == null ? defaultValue : Double.parseDouble(value);
}
+
+ /**
+ * Returns the original case-sensitive map.
+ */
+ public Map<String, String> asCaseSensitiveMap() {
+ return Collections.unmodifiableMap(original);
+ }
}
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/util/CaseInsensitiveStringMapSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/util/CaseInsensitiveStringMapSuite.scala
index 623ddeb..0accb47 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/util/CaseInsensitiveStringMapSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/util/CaseInsensitiveStringMapSuite.scala
@@ -17,6 +17,8 @@
package org.apache.spark.sql.util
+import java.util
+
import scala.collection.JavaConverters._
import org.apache.spark.SparkFunSuite
@@ -25,9 +27,16 @@ class CaseInsensitiveStringMapSuite extends SparkFunSuite {
test("put and get") {
val options = CaseInsensitiveStringMap.empty()
- options.put("kEy", "valUE")
- assert(options.get("key") == "valUE")
- assert(options.get("KEY") == "valUE")
+ intercept[UnsupportedOperationException] {
+ options.put("kEy", "valUE")
+ }
+ }
+
+ test("clear") {
+ val options = new CaseInsensitiveStringMap(Map("kEy" -> "valUE").asJava)
+ intercept[UnsupportedOperationException] {
+ options.clear()
+ }
}
test("key and value set") {
@@ -80,4 +89,20 @@ class CaseInsensitiveStringMapSuite extends SparkFunSuite {
options.getDouble("foo", 0.1d)
}
}
+
+ test("asCaseSensitiveMap") {
+ val originalMap = new util.HashMap[String, String] {
+ put("Foo", "Bar")
+ put("OFO", "ABR")
+ put("OoF", "bar")
+ }
+
+ val options = new CaseInsensitiveStringMap(originalMap)
+ val caseSensitiveMap = options.asCaseSensitiveMap
+ assert(caseSensitiveMap.equals(originalMap))
+ // The result of `asCaseSensitiveMap` is read-only.
+ intercept[UnsupportedOperationException] {
+ caseSensitiveMap.put("kEy", "valUE")
+ }
+ }
}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileTable.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileTable.scala
index 5944a20..4b35df3 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileTable.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileTable.scala
@@ -36,13 +36,14 @@ abstract class FileTable(
extends Table with SupportsRead with SupportsWrite {
lazy val fileIndex: PartitioningAwareFileIndex = {
- val scalaMap = options.asScala.toMap
- val hadoopConf = sparkSession.sessionState.newHadoopConfWithOptions(scalaMap)
+ val caseSensitiveMap = options.asCaseSensitiveMap.asScala.toMap
+ // Hadoop Configurations are case sensitive.
+ val hadoopConf = sparkSession.sessionState.newHadoopConfWithOptions(caseSensitiveMap)
val rootPathsSpecified = DataSource.checkAndGlobPathIfNecessary(paths,
hadoopConf,
checkEmptyGlobPath = true, checkFilesExist = true)
val fileStatusCache = FileStatusCache.getOrCreate(sparkSession)
new InMemoryFileIndex(
- sparkSession, rootPathsSpecified, scalaMap, userSpecifiedSchema, fileStatusCache)
+ sparkSession, rootPathsSpecified, caseSensitiveMap, userSpecifiedSchema, fileStatusCache)
}
lazy val dataSchema: StructType = userSpecifiedSchema.orElse {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileWriteBuilder.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileWriteBuilder.scala
index 0d07f5a..bb4a428 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileWriteBuilder.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileWriteBuilder.scala
@@ -64,16 +64,16 @@ abstract class FileWriteBuilder(options: CaseInsensitiveStringMap, paths: Seq[St
val sparkSession = SparkSession.active
validateInputs(sparkSession.sessionState.conf.caseSensitiveAnalysis)
val path = new Path(paths.head)
- val optionsAsScala = options.asScala.toMap
-
- val hadoopConf = sparkSession.sessionState.newHadoopConfWithOptions(optionsAsScala)
+ val caseSensitiveMap = options.asCaseSensitiveMap.asScala.toMap
+ // Hadoop Configurations are case sensitive.
+ val hadoopConf = sparkSession.sessionState.newHadoopConfWithOptions(caseSensitiveMap)
val job = getJobInstance(hadoopConf, path)
val committer = FileCommitProtocol.instantiate(
sparkSession.sessionState.conf.fileCommitProtocolClass,
jobId = java.util.UUID.randomUUID().toString,
outputPath = paths.head)
lazy val description =
- createWriteJobDescription(sparkSession, hadoopConf, job, paths.head, optionsAsScala)
+ createWriteJobDescription(sparkSession, hadoopConf, job, paths.head, options.asScala.toMap)
val fs = path.getFileSystem(hadoopConf)
mode match {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/orc/OrcScanBuilder.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/orc/OrcScanBuilder.scala
index 4767f21..8ac56aa 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/orc/OrcScanBuilder.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/orc/OrcScanBuilder.scala
@@ -37,7 +37,11 @@ case class OrcScanBuilder(
dataSchema: StructType,
options: CaseInsensitiveStringMap)
extends FileScanBuilder(schema) with SupportsPushDownFilters {
- lazy val hadoopConf = sparkSession.sessionState.newHadoopConfWithOptions(options.asScala.toMap)
+ lazy val hadoopConf = {
+ val caseSensitiveMap = options.asCaseSensitiveMap.asScala.toMap
+ // Hadoop Configurations are case sensitive.
+ sparkSession.sessionState.newHadoopConfWithOptions(caseSensitiveMap)
+ }
override def build(): Scan = {
OrcScan(sparkSession, hadoopConf, fileIndex, dataSchema, readSchema, options)
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcPartitionDiscoverySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcPartitionDiscoverySuite.scala
index bc5a30e..e1d0254 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcPartitionDiscoverySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcPartitionDiscoverySuite.scala
@@ -19,6 +19,8 @@ package org.apache.spark.sql.execution.datasources.orc
import java.io.File
+import org.apache.hadoop.fs.{Path, PathFilter}
+
import org.apache.spark.SparkConf
import org.apache.spark.sql._
import org.apache.spark.sql.internal.SQLConf
@@ -30,6 +32,10 @@ case class OrcParData(intField: Int, stringField: String)
// The data that also includes the partitioning key
case class OrcParDataWithKey(intField: Int, pi: Int, stringField: String, ps: String)
+class TestFileFilter extends PathFilter {
+ override def accept(path: Path): Boolean = path.getParent.getName != "p=2"
+}
+
abstract class OrcPartitionDiscoveryTest extends OrcTest {
val defaultPartitionName = "__HIVE_DEFAULT_PARTITION__"
@@ -226,6 +232,23 @@ abstract class OrcPartitionDiscoveryTest extends OrcTest {
}
}
}
+
+ test("SPARK-27162: handle pathfilter configuration correctly") {
+ withTempPath { dir =>
+ val path = dir.getCanonicalPath
+
+ val df = spark.range(2)
+ df.write.orc(path + "/p=1")
+ df.write.orc(path + "/p=2")
+ assert(spark.read.orc(path).count() === 4)
+
+ val extraOptions = Map(
+ "mapred.input.pathFilter.class" -> classOf[TestFileFilter].getName,
+ "mapreduce.input.pathFilter.class" -> classOf[TestFileFilter].getName
+ )
+ assert(spark.read.options(extraOptions).orc(path).count() === 2)
+ }
+ }
}
class OrcPartitionDiscoverySuite extends OrcPartitionDiscoveryTest with SharedSQLContext
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]