Repository: spark
Updated Branches:
  refs/heads/master c1f344f1a -> 140570252


[SPARK-18044][STREAMING] FileStreamSource should not infer partitions in every 
batch

## What changes were proposed in this pull request?

In `FileStreamSource.getBatch`, we will create a `DataSource` with specified 
schema, to avoid inferring the schema again and again. However, we don't pass 
the partition columns, and will infer the partition again and again.

This PR fixes it by keeping the partition columns in `FileStreamSource`, like 
schema.

## How was this patch tested?

N/A

Author: Wenchen Fan <[email protected]>

Closes #15581 from cloud-fan/stream.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/14057025
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/14057025
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/14057025

Branch: refs/heads/master
Commit: 140570252fd3739d6bdcadd6d4d5a180e480d3e0
Parents: c1f344f
Author: Wenchen Fan <[email protected]>
Authored: Fri Oct 21 15:28:16 2016 -0700
Committer: Shixiong Zhu <[email protected]>
Committed: Fri Oct 21 15:28:16 2016 -0700

----------------------------------------------------------------------
 .../sql/execution/datasources/DataSource.scala  | 26 ++++++++++++++------
 .../execution/streaming/FileStreamSource.scala  |  2 ++
 .../streaming/FileStreamSourceSuite.scala       |  2 +-
 3 files changed, 21 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/14057025/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
index 92b1fff..17da606 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
@@ -75,7 +75,7 @@ case class DataSource(
     bucketSpec: Option[BucketSpec] = None,
     options: Map[String, String] = Map.empty) extends Logging {
 
-  case class SourceInfo(name: String, schema: StructType)
+  case class SourceInfo(name: String, schema: StructType, partitionColumns: 
Seq[String])
 
   lazy val providingClass: Class[_] = lookupDataSource(className)
   lazy val sourceInfo = sourceSchema()
@@ -186,8 +186,11 @@ case class DataSource(
     }
   }
 
-  private def inferFileFormatSchema(format: FileFormat): StructType = {
-    userSpecifiedSchema.orElse {
+  /**
+   * Infer the schema of the given FileFormat, returns a pair of schema and 
partition column names.
+   */
+  private def inferFileFormatSchema(format: FileFormat): (StructType, 
Seq[String]) = {
+    userSpecifiedSchema.map(_ -> partitionColumns).orElse {
       val caseInsensitiveOptions = new CaseInsensitiveMap(options)
       val allPaths = caseInsensitiveOptions.get("path")
       val globbedPaths = allPaths.toSeq.flatMap { path =>
@@ -197,14 +200,14 @@ case class DataSource(
         SparkHadoopUtil.get.globPathIfNecessary(qualified)
       }.toArray
       val fileCatalog = new ListingFileCatalog(sparkSession, globbedPaths, 
options, None)
-      val partitionCols = fileCatalog.partitionSpec().partitionColumns.fields
+      val partitionSchema = fileCatalog.partitionSpec().partitionColumns
       val inferred = format.inferSchema(
         sparkSession,
         caseInsensitiveOptions,
         fileCatalog.allFiles())
 
       inferred.map { inferredSchema =>
-        StructType(inferredSchema ++ partitionCols)
+        StructType(inferredSchema ++ partitionSchema) -> 
partitionSchema.map(_.name)
       }
     }.getOrElse {
       throw new AnalysisException("Unable to infer schema. It must be 
specified manually.")
@@ -217,7 +220,7 @@ case class DataSource(
       case s: StreamSourceProvider =>
         val (name, schema) = s.sourceSchema(
           sparkSession.sqlContext, userSpecifiedSchema, className, options)
-        SourceInfo(name, schema)
+        SourceInfo(name, schema, Nil)
 
       case format: FileFormat =>
         val caseInsensitiveOptions = new CaseInsensitiveMap(options)
@@ -246,7 +249,8 @@ case class DataSource(
               "you may be able to create a static DataFrame on that directory 
with " +
               "'spark.read.load(directory)' and infer schema from it.")
         }
-        SourceInfo(s"FileSource[$path]", inferFileFormatSchema(format))
+        val (schema, partCols) = inferFileFormatSchema(format)
+        SourceInfo(s"FileSource[$path]", schema, partCols)
 
       case _ =>
         throw new UnsupportedOperationException(
@@ -266,7 +270,13 @@ case class DataSource(
           throw new IllegalArgumentException("'path' is not specified")
         })
         new FileStreamSource(
-          sparkSession, path, className, sourceInfo.schema, metadataPath, 
options)
+          sparkSession = sparkSession,
+          path = path,
+          fileFormatClassName = className,
+          schema = sourceInfo.schema,
+          partitionColumns = sourceInfo.partitionColumns,
+          metadataPath = metadataPath,
+          options = options)
       case _ =>
         throw new UnsupportedOperationException(
           s"Data source $className does not support streamed reading")

http://git-wip-us.apache.org/repos/asf/spark/blob/14057025/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala
index 614a626..115edf7 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala
@@ -35,6 +35,7 @@ class FileStreamSource(
     path: String,
     fileFormatClassName: String,
     override val schema: StructType,
+    partitionColumns: Seq[String],
     metadataPath: String,
     options: Map[String, String]) extends Source with Logging {
 
@@ -142,6 +143,7 @@ class FileStreamSource(
         sparkSession,
         paths = files.map(_.path),
         userSpecifiedSchema = Some(schema),
+        partitionColumns = partitionColumns,
         className = fileFormatClassName,
         options = optionsWithPartitionBasePath)
     Dataset.ofRows(sparkSession, LogicalRelation(newDataSource.resolveRelation(

http://git-wip-us.apache.org/repos/asf/spark/blob/14057025/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/FileStreamSourceSuite.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/FileStreamSourceSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/FileStreamSourceSuite.scala
index 3e1e112..4a47c04 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/FileStreamSourceSuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/FileStreamSourceSuite.scala
@@ -94,7 +94,7 @@ class FileStreamSourceSuite extends SparkFunSuite with 
SharedSQLContext {
         new FileStreamSourceLog(FileStreamSourceLog.VERSION, spark, 
dir.getAbsolutePath)
       assert(metadataLog.add(0, Array(FileEntry(s"$scheme:///file1", 100L, 
0))))
 
-      val newSource = new FileStreamSource(spark, s"$scheme:///", "parquet", 
StructType(Nil),
+      val newSource = new FileStreamSource(spark, s"$scheme:///", "parquet", 
StructType(Nil), Nil,
         dir.getAbsolutePath, Map.empty)
       // this method should throw an exception if `fs.exists` is called during 
resolveRelation
       newSource.getBatch(None, LongOffset(1))


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to