Repository: spark
Updated Branches:
  refs/heads/branch-2.0 45bc65519 -> c5b7e1f70


[SPARK-14716][SQL] Added support for partitioning in FileStreamSink

# What changes were proposed in this pull request?

Support partitioning in the file stream sink. This is implemented using a new, 
but simpler code path for writing parquet files - both unpartitioned and 
partitioned. This new code path does not use Output Committers, as we will 
eventually write the file names to the metadata log for "committing" them.

This patch duplicates < 100 LOC from the WriterContainer. But its far simpler 
that WriterContainer as it does not involve output committing. In addition, it 
introduces the new APIs in FileFormat and OutputWriterFactory in an attempt to 
simplify the APIs (not have Job in the `FileFormat` API, not have bucket and 
other stuff in the `OutputWriterFactory.newInstance()` ).

# Tests
- New unit tests to test the FileStreamSinkWriter for partitioned and 
unpartitioned files
- New unit test to partially test the FileStreamSink for partitioned files 
(does not test recovery of partition column data, as that requires change in 
the StreamFileCatalog, future PR).
- Updated FileStressSuite to test number of records read from partitioned 
output files.

Author: Tathagata Das <tathagata.das1...@gmail.com>

Closes #12409 from tdas/streaming-partitioned-parquet.

(cherry picked from commit 4ad492c40358d0104db508db98ce0971114b6817)
Signed-off-by: Tathagata Das <tathagata.das1...@gmail.com>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/c5b7e1f7
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/c5b7e1f7
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/c5b7e1f7

Branch: refs/heads/branch-2.0
Commit: c5b7e1f70424b176b655936dbcace93e4d4a7210
Parents: 45bc655
Author: Tathagata Das <tathagata.das1...@gmail.com>
Authored: Tue May 3 10:58:26 2016 -0700
Committer: Tathagata Das <tathagata.das1...@gmail.com>
Committed: Tue May 3 10:58:39 2016 -0700

----------------------------------------------------------------------
 python/pyspark/sql/tests.py                     |   4 +-
 .../sql/execution/datasources/DataSource.scala  |   5 +-
 .../datasources/fileSourceInterfaces.scala      |  28 +++
 .../datasources/parquet/ParquetRelation.scala   | 116 +++++++++-
 .../execution/streaming/FileStreamSink.scala    | 230 +++++++++++++++++--
 .../execution/streaming/FileStreamSinkLog.scala |  13 ++
 .../execution/streaming/HDFSMetadataLog.scala   |   5 +-
 .../sql/streaming/FileStreamSinkSuite.scala     | 218 ++++++++++++++++--
 .../spark/sql/streaming/FileStressSuite.scala   |  40 +++-
 9 files changed, 605 insertions(+), 54 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/c5b7e1f7/python/pyspark/sql/tests.py
----------------------------------------------------------------------
diff --git a/python/pyspark/sql/tests.py b/python/pyspark/sql/tests.py
index 4995b26..cd5c4a7 100644
--- a/python/pyspark/sql/tests.py
+++ b/python/pyspark/sql/tests.py
@@ -940,7 +940,7 @@ class SQLTests(ReusedPySparkTestCase):
             cq.processAllAvailable()
             output_files = []
             for _, _, files in os.walk(out):
-                output_files.extend([f for f in files if 'parquet' in f and 
not f.startswith('.')])
+                output_files.extend([f for f in files if not 
f.startswith('.')])
             self.assertTrue(len(output_files) > 0)
             self.assertTrue(len(os.listdir(chk)) > 0)
         finally:
@@ -967,7 +967,7 @@ class SQLTests(ReusedPySparkTestCase):
             cq.processAllAvailable()
             output_files = []
             for _, _, files in os.walk(out):
-                output_files.extend([f for f in files if 'parquet' in f and 
not f.startswith('.')])
+                output_files.extend([f for f in files if not 
f.startswith('.')])
             self.assertTrue(len(output_files) > 0)
             self.assertTrue(len(os.listdir(chk)) > 0)
             self.assertFalse(os.path.isdir(fake1))  # should not have been 
created

http://git-wip-us.apache.org/repos/asf/spark/blob/c5b7e1f7/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
index 63dc1fd..6114142 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
@@ -203,13 +203,14 @@ case class DataSource(
   def createSink(): Sink = {
     providingClass.newInstance() match {
       case s: StreamSinkProvider => s.createSink(sparkSession.wrapped, 
options, partitionColumns)
-      case format: FileFormat =>
+
+      case parquet: parquet.DefaultSource =>
         val caseInsensitiveOptions = new CaseInsensitiveMap(options)
         val path = caseInsensitiveOptions.getOrElse("path", {
           throw new IllegalArgumentException("'path' is not specified")
         })
+        new FileStreamSink(sparkSession, path, parquet, partitionColumns, 
options)
 
-        new FileStreamSink(sparkSession, path, format)
       case _ =>
         throw new UnsupportedOperationException(
           s"Data source $className does not support streamed writing")

http://git-wip-us.apache.org/repos/asf/spark/blob/c5b7e1f7/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/fileSourceInterfaces.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/fileSourceInterfaces.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/fileSourceInterfaces.scala
index 25f88d9..0a34611 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/fileSourceInterfaces.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/fileSourceInterfaces.scala
@@ -64,6 +64,20 @@ abstract class OutputWriterFactory extends Serializable {
       bucketId: Option[Int], // TODO: This doesn't belong here...
       dataSchema: StructType,
       context: TaskAttemptContext): OutputWriter
+
+  /**
+   * Returns a new instance of [[OutputWriter]] that will write data to the 
given path.
+   * This method gets called by each task on executor to write 
[[InternalRow]]s to
+   * format-specific files. Compared to the other `newInstance()`, this is a 
newer API that
+   * passes only the path that the writer must write to. The writer must write 
to the exact path
+   * and not modify it (do not add subdirectories, extensions, etc.). All other
+   * file-format-specific information needed to create the writer must be 
passed
+   * through the [[OutputWriterFactory]] implementation.
+   * @since 2.0.0
+   */
+  private[sql] def newWriter(path: String): OutputWriter = {
+    throw new UnsupportedOperationException("newInstance with just path not 
supported")
+  }
 }
 
 /**
@@ -223,6 +237,20 @@ trait FileFormat {
     // Until then we guard in [[FileSourceStrategy]] to only call this method 
on supported formats.
     throw new UnsupportedOperationException(s"buildReader is not supported for 
$this")
   }
+
+  /**
+   * Returns a [[OutputWriterFactory]] for generating output writers that can 
write data.
+   * This method is current used only by FileStreamSinkWriter to generate 
output writers that
+   * does not use output committers to write data. The OutputWriter generated 
by the returned
+   * [[OutputWriterFactory]] must implement the method `newWriter(path)`..
+   */
+  def buildWriter(
+      sqlContext: SQLContext,
+      dataSchema: StructType,
+      options: Map[String, String]): OutputWriterFactory = {
+    // TODO: Remove this default implementation when the other formats have 
been ported
+    throw new UnsupportedOperationException(s"buildWriter is not supported for 
$this")
+  }
 }
 
 /**

http://git-wip-us.apache.org/repos/asf/spark/blob/c5b7e1f7/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRelation.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRelation.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRelation.scala
index b1513bb..79185df 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRelation.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRelation.scala
@@ -41,13 +41,13 @@ import org.apache.spark.SparkException
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql._
 import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.catalyst.expressions.JoinedRow
+import org.apache.spark.sql.catalyst.expressions._
 import 
org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjection
 import org.apache.spark.sql.catalyst.parser.LegacyTypeStringParser
 import org.apache.spark.sql.execution.datasources._
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.sources._
-import org.apache.spark.sql.types.{AtomicType, DataType, StructType}
+import org.apache.spark.sql.types._
 import org.apache.spark.util.SerializableConfiguration
 
 private[sql] class DefaultSource
@@ -372,8 +372,120 @@ private[sql] class DefaultSource
       }
     }
   }
+
+  override def buildWriter(
+      sqlContext: SQLContext,
+      dataSchema: StructType,
+      options: Map[String, String]): OutputWriterFactory = {
+    new ParquetOutputWriterFactory(
+      sqlContext.conf,
+      dataSchema,
+      sqlContext.sparkContext.hadoopConfiguration,
+      options)
+  }
 }
 
+/**
+ * A factory for generating OutputWriters for writing parquet files. This 
implemented is different
+ * from the [[ParquetOutputWriter]] as this does not use any 
[[OutputCommitter]]. It simply
+ * writes the data to the path used to generate the output writer. Callers of 
this factory
+ * has to ensure which files are to be considered as committed.
+ */
+private[sql] class ParquetOutputWriterFactory(
+    sqlConf: SQLConf,
+    dataSchema: StructType,
+    hadoopConf: Configuration,
+    options: Map[String, String]) extends OutputWriterFactory {
+
+  private val serializableConf: SerializableConfiguration = {
+    val job = Job.getInstance(hadoopConf)
+    val conf = ContextUtil.getConfiguration(job)
+    val parquetOptions = new ParquetOptions(options, sqlConf)
+
+    // We're not really using `ParquetOutputFormat[Row]` for writing data 
here, because we override
+    // it in `ParquetOutputWriter` to support appending and dynamic 
partitioning.  The reason why
+    // we set it here is to setup the output committer class to 
`ParquetOutputCommitter`, which is
+    // bundled with `ParquetOutputFormat[Row]`.
+    job.setOutputFormatClass(classOf[ParquetOutputFormat[Row]])
+
+    ParquetOutputFormat.setWriteSupportClass(job, 
classOf[CatalystWriteSupport])
+
+    // We want to clear this temporary metadata from saving into Parquet file.
+    // This metadata is only useful for detecting optional columns when 
pushdowning filters.
+    val dataSchemaToWrite = StructType.removeMetadata(
+      StructType.metadataKeyForOptionalField,
+      dataSchema).asInstanceOf[StructType]
+    CatalystWriteSupport.setSchema(dataSchemaToWrite, conf)
+
+    // Sets flags for `CatalystSchemaConverter` (which converts Catalyst 
schema to Parquet schema)
+    // and `CatalystWriteSupport` (writing actual rows to Parquet files).
+    conf.set(
+      SQLConf.PARQUET_BINARY_AS_STRING.key,
+      sqlConf.isParquetBinaryAsString.toString)
+
+    conf.set(
+      SQLConf.PARQUET_INT96_AS_TIMESTAMP.key,
+      sqlConf.isParquetINT96AsTimestamp.toString)
+
+    conf.set(
+      SQLConf.PARQUET_WRITE_LEGACY_FORMAT.key,
+      sqlConf.writeLegacyParquetFormat.toString)
+
+    // Sets compression scheme
+    conf.set(ParquetOutputFormat.COMPRESSION, parquetOptions.compressionCodec)
+    new SerializableConfiguration(conf)
+  }
+
+  /**
+   * Returns a [[OutputWriter]] that writes data to the give path without using
+   * [[OutputCommitter]].
+   */
+  override private[sql] def newWriter(path: String): OutputWriter = new 
OutputWriter {
+
+    // Create TaskAttemptContext that is used to pass on Configuration to the 
ParquetRecordWriter
+    private val hadoopTaskAttempId = new TaskAttemptID(new TaskID(new JobID, 
TaskType.MAP, 0), 0)
+    private val hadoopAttemptContext = new TaskAttemptContextImpl(
+      serializableConf.value, hadoopTaskAttempId)
+
+    // Instance of ParquetRecordWriter that does not use OutputCommitter
+    private val recordWriter = createNoCommitterRecordWriter(path, 
hadoopAttemptContext)
+
+    override def write(row: Row): Unit = {
+      throw new UnsupportedOperationException("call writeInternal")
+    }
+
+    protected[sql] override def writeInternal(row: InternalRow): Unit = {
+      recordWriter.write(null, row)
+    }
+
+    override def close(): Unit = recordWriter.close(hadoopAttemptContext)
+  }
+
+  /** Create a [[ParquetRecordWriter]] that writes the given path without 
using OutputCommitter */
+  private def createNoCommitterRecordWriter(
+      path: String,
+      hadoopAttemptContext: TaskAttemptContext): RecordWriter[Void, 
InternalRow] = {
+    // Custom ParquetOutputFormat that disable use of committer and writes to 
the given path
+    val outputFormat = new ParquetOutputFormat[InternalRow]() {
+      override def getOutputCommitter(c: TaskAttemptContext): OutputCommitter 
= { null }
+      override def getDefaultWorkFile(c: TaskAttemptContext, ext: String): 
Path = { new Path(path) }
+    }
+    outputFormat.getRecordWriter(hadoopAttemptContext)
+  }
+
+  /** Disable the use of the older API. */
+  def newInstance(
+      path: String,
+      bucketId: Option[Int],
+      dataSchema: StructType,
+      context: TaskAttemptContext): OutputWriter = {
+    throw new UnsupportedOperationException(
+      "this verison of newInstance not supported for " +
+        "ParquetOutputWriterFactory")
+  }
+}
+
+
 // NOTE: This class is instantiated and used on executor side only, no need to 
be serializable.
 private[sql] class ParquetOutputWriter(
     path: String,

http://git-wip-us.apache.org/repos/asf/spark/blob/c5b7e1f7/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSink.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSink.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSink.scala
index 70aea7f..e191010 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSink.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSink.scala
@@ -19,11 +19,20 @@ package org.apache.spark.sql.execution.streaming
 
 import java.util.UUID
 
+import scala.collection.mutable.ArrayBuffer
+
+import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.Path
 
+import org.apache.spark.{SparkEnv, SparkException, TaskContext, 
TaskContextImpl}
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.{DataFrame, SparkSession}
-import org.apache.spark.sql.execution.datasources.FileFormat
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.execution.UnsafeKVExternalSorter
+import org.apache.spark.sql.execution.datasources.{FileFormat, OutputWriter, 
PartitioningUtils}
+import org.apache.spark.sql.types.{StringType, StructType}
+import org.apache.spark.util.SerializableConfiguration
 
 object FileStreamSink {
   // The name of the subdirectory that is used to store metadata about which 
files are valid.
@@ -40,28 +49,24 @@ object FileStreamSink {
 class FileStreamSink(
     sparkSession: SparkSession,
     path: String,
-    fileFormat: FileFormat) extends Sink with Logging {
+    fileFormat: FileFormat,
+    partitionColumnNames: Seq[String],
+    options: Map[String, String]) extends Sink with Logging {
 
   private val basePath = new Path(path)
   private val logPath = new Path(basePath, FileStreamSink.metadataDir)
   private val fileLog = new FileStreamSinkLog(sparkSession, 
logPath.toUri.toString)
-  private val fs = 
basePath.getFileSystem(sparkSession.sessionState.newHadoopConf())
+  private val hadoopConf = sparkSession.sessionState.newHadoopConf()
+  private val fs = basePath.getFileSystem(hadoopConf)
 
   override def addBatch(batchId: Long, data: DataFrame): Unit = {
     if (batchId <= fileLog.getLatest().map(_._1).getOrElse(-1L)) {
       logInfo(s"Skipping already committed batch $batchId")
     } else {
-      val files = fs.listStatus(writeFiles(data)).map { f =>
-        SinkFileStatus(
-          path = f.getPath.toUri.toString,
-          size = f.getLen,
-          isDir = f.isDirectory,
-          modificationTime = f.getModificationTime,
-          blockReplication = f.getReplication,
-          blockSize = f.getBlockSize,
-          action = FileStreamSinkLog.ADD_ACTION)
-      }
-      if (fileLog.add(batchId, files)) {
+      val writer = new FileStreamSinkWriter(
+        data, fileFormat, path, partitionColumnNames, hadoopConf, options)
+      val fileStatuses = writer.write()
+      if (fileLog.add(batchId, fileStatuses)) {
         logInfo(s"Committed batch $batchId")
       } else {
         throw new IllegalStateException(s"Race while writing batch $batchId")
@@ -69,17 +74,192 @@ class FileStreamSink(
     }
   }
 
-  /** Writes the [[DataFrame]] to a UUID-named dir, returning the list of 
files paths. */
-  private def writeFiles(data: DataFrame): Array[Path] = {
-    val file = new Path(basePath, UUID.randomUUID().toString).toUri.toString
-    data.write.parquet(file)
-    sparkSession.read
-        .schema(data.schema)
-        .parquet(file)
-        .inputFiles
-        .map(new Path(_))
-        .filterNot(_.getName.startsWith("_"))
+  override def toString: String = s"FileSink[$path]"
+}
+
+
+/**
+ * Writes data given to a [[FileStreamSink]] to the given `basePath` in the 
given `fileFormat`,
+ * partitioned by the given `partitionColumnNames`. This writer always appends 
data to the
+ * directory if it already has data.
+ */
+class FileStreamSinkWriter(
+    data: DataFrame,
+    fileFormat: FileFormat,
+    basePath: String,
+    partitionColumnNames: Seq[String],
+    hadoopConf: Configuration,
+    options: Map[String, String]) extends Serializable with Logging {
+
+  PartitioningUtils.validatePartitionColumnDataTypes(
+    data.schema, partitionColumnNames, 
data.sqlContext.conf.caseSensitiveAnalysis)
+
+  private val serializableConf = new SerializableConfiguration(hadoopConf)
+  private val dataSchema = data.schema
+  private val dataColumns = data.logicalPlan.output
+
+  // Get the actual partition columns as attributes after matching them by 
name with
+  // the given columns names.
+  private val partitionColumns = partitionColumnNames.map { col =>
+    val nameEquality = if 
(data.sparkSession.sessionState.conf.caseSensitiveAnalysis) {
+      org.apache.spark.sql.catalyst.analysis.caseSensitiveResolution
+    } else {
+      org.apache.spark.sql.catalyst.analysis.caseInsensitiveResolution
+    }
+    data.logicalPlan.output.find(f => nameEquality(f.name, col)).getOrElse {
+      throw new RuntimeException(s"Partition column $col not found in schema 
$dataSchema")
+    }
+  }
+
+  // Columns that are to be written to the files. If there are partitioning 
columns, then
+  // those will not be written to the files.
+  private val writeColumns = {
+    val partitionSet = AttributeSet(partitionColumns)
+    dataColumns.filterNot(partitionSet.contains)
   }
 
-  override def toString: String = s"FileSink[$path]"
+  // An OutputWriterFactory for generating writers in the executors for 
writing the files.
+  private val outputWriterFactory =
+    fileFormat.buildWriter(data.sqlContext, writeColumns.toStructType, options)
+
+  /** Expressions that given a partition key build a string like: 
col1=val/col2=val/... */
+  private def partitionStringExpression: Seq[Expression] = {
+    partitionColumns.zipWithIndex.flatMap { case (c, i) =>
+      val escaped =
+        ScalaUDF(
+          PartitioningUtils.escapePathName _,
+          StringType,
+          Seq(Cast(c, StringType)),
+          Seq(StringType))
+      val str = If(IsNull(c), 
Literal(PartitioningUtils.DEFAULT_PARTITION_NAME), escaped)
+      val partitionName = Literal(c.name + "=") :: str :: Nil
+      if (i == 0) partitionName else Literal(Path.SEPARATOR) :: partitionName
+    }
+  }
+
+  /** Generate a new output writer from the writer factory */
+  private def newOutputWriter(path: Path): OutputWriter = {
+    val newWriter = outputWriterFactory.newWriter(path.toString)
+    newWriter.initConverter(dataSchema)
+    newWriter
+  }
+
+  /** Write the dataframe to files. This gets called in the driver by the 
[[FileStreamSink]]. */
+  def write(): Array[SinkFileStatus] = {
+    data.sqlContext.sparkContext.runJob(
+      data.queryExecution.toRdd,
+      (taskContext: TaskContext, iterator: Iterator[InternalRow]) => {
+        if (partitionColumns.isEmpty) {
+          Seq(writePartitionToSingleFile(iterator))
+        } else {
+          writePartitionToPartitionedFiles(iterator)
+        }
+      }).flatten
+  }
+
+  /**
+   * Writes a RDD partition to a single file without dynamic partitioning.
+   * This gets called in the executor, and it uses a [[OutputWriter]] to write 
the data.
+   */
+  def writePartitionToSingleFile(iterator: Iterator[InternalRow]): 
SinkFileStatus = {
+    var writer: OutputWriter = null
+    try {
+      val path = new Path(basePath, UUID.randomUUID.toString)
+      val fs = path.getFileSystem(serializableConf.value)
+      writer = newOutputWriter(path)
+      while (iterator.hasNext) {
+        writer.writeInternal(iterator.next)
+      }
+      writer.close()
+      writer = null
+      SinkFileStatus(fs.getFileStatus(path))
+    } catch {
+      case cause: Throwable =>
+        logError("Aborting task.", cause)
+        // call failure callbacks first, so we could have a chance to cleanup 
the writer.
+        TaskContext.get().asInstanceOf[TaskContextImpl].markTaskFailed(cause)
+        throw new SparkException("Task failed while writing rows.", cause)
+    } finally {
+      if (writer != null) {
+        writer.close()
+      }
+    }
+  }
+
+  /**
+   * Writes a RDD partition to multiple dynamically partitioned files.
+   * This gets called in the executor. It first sorts the data based on the 
partitioning columns
+   * and then writes the data of each key to separate files using 
[[OutputWriter]]s.
+   */
+  def writePartitionToPartitionedFiles(iterator: Iterator[InternalRow]): 
Seq[SinkFileStatus] = {
+
+    // Returns the partitioning columns for sorting
+    val getSortingKey = UnsafeProjection.create(partitionColumns, dataColumns)
+
+    // Returns the data columns to be written given an input row
+    val getOutputRow = UnsafeProjection.create(writeColumns, dataColumns)
+
+    // Returns the partition path given a partition key
+    val getPartitionString =
+      UnsafeProjection.create(Concat(partitionStringExpression) :: Nil, 
partitionColumns)
+
+    // Sort the data before write, so that we only need one writer at the same 
time.
+    val sorter = new UnsafeKVExternalSorter(
+      partitionColumns.toStructType,
+      StructType.fromAttributes(writeColumns),
+      SparkEnv.get.blockManager,
+      SparkEnv.get.serializerManager,
+      TaskContext.get().taskMemoryManager().pageSizeBytes)
+
+    while (iterator.hasNext) {
+      val currentRow = iterator.next()
+      sorter.insertKV(getSortingKey(currentRow), getOutputRow(currentRow))
+    }
+    logDebug(s"Sorting complete. Writing out partition files one at a time.")
+
+    val sortedIterator = sorter.sortedIterator()
+    val paths = new ArrayBuffer[Path]
+
+    // Write the sorted data to partitioned files, one for each unique key
+    var currentWriter: OutputWriter = null
+    try {
+      var currentKey: UnsafeRow = null
+      while (sortedIterator.next()) {
+        val nextKey = sortedIterator.getKey
+
+        // If key changes, close current writer, and open a new writer to a 
new partitioned file
+        if (currentKey != nextKey) {
+          if (currentWriter != null) {
+            currentWriter.close()
+            currentWriter = null
+          }
+          currentKey = nextKey.copy()
+          val partitionPath = getPartitionString(currentKey).getString(0)
+          val path = new Path(new Path(basePath, partitionPath), 
UUID.randomUUID.toString)
+          paths += path
+          currentWriter = newOutputWriter(path)
+          logInfo(s"Writing partition $currentKey to $path")
+        }
+        currentWriter.writeInternal(sortedIterator.getValue)
+      }
+      if (currentWriter != null) {
+        currentWriter.close()
+        currentWriter = null
+      }
+      if (paths.nonEmpty) {
+        val fs = paths.head.getFileSystem(serializableConf.value)
+        paths.map(p => SinkFileStatus(fs.getFileStatus(p)))
+      } else Seq.empty
+    } catch {
+      case cause: Throwable =>
+        logError("Aborting task.", cause)
+        // call failure callbacks first, so we could have a chance to cleanup 
the writer.
+        TaskContext.get().asInstanceOf[TaskContextImpl].markTaskFailed(cause)
+        throw new SparkException("Task failed while writing rows.", cause)
+    } finally {
+      if (currentWriter != null) {
+        currentWriter.close()
+      }
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/c5b7e1f7/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSinkLog.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSinkLog.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSinkLog.scala
index b694b61..4254df4 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSinkLog.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSinkLog.scala
@@ -54,6 +54,19 @@ case class SinkFileStatus(
   }
 }
 
+object SinkFileStatus {
+  def apply(f: FileStatus): SinkFileStatus = {
+    SinkFileStatus(
+      path = f.getPath.toUri.toString,
+      size = f.getLen,
+      isDir = f.isDirectory,
+      modificationTime = f.getModificationTime,
+      blockReplication = f.getReplication,
+      blockSize = f.getBlockSize,
+      action = FileStreamSinkLog.ADD_ACTION)
+  }
+}
+
 /**
  * A special log for [[FileStreamSink]]. It will write one log file for each 
batch. The first line
  * of the log file is the version number, and there are multiple JSON lines 
following. Each JSON

http://git-wip-us.apache.org/repos/asf/spark/blob/c5b7e1f7/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLog.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLog.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLog.scala
index 9fe06a6..fca3d51 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLog.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLog.scala
@@ -216,8 +216,9 @@ class HDFSMetadataLog[T: ClassTag](sparkSession: 
SparkSession, path: String)
       new FileContextManager(metadataPath, hadoopConf)
     } catch {
       case e: UnsupportedFileSystemException =>
-        logWarning("Could not use FileContext API for managing metadata log 
file. The log may be" +
-          "inconsistent under failures.", e)
+        logWarning("Could not use FileContext API for managing metadata log 
files at path " +
+          s"$metadataPath. Using FileSystem API instead for managing log 
files. The log may be " +
+          s"inconsistent under failures.")
         new FileSystemManager(metadataPath, hadoopConf)
     }
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/c5b7e1f7/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSinkSuite.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSinkSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSinkSuite.scala
index 8cf5ded..609ca97 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSinkSuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSinkSuite.scala
@@ -17,33 +17,223 @@
 
 package org.apache.spark.sql.streaming
 
-import org.apache.spark.sql.StreamTest
-import org.apache.spark.sql.execution.streaming.MemoryStream
+import java.io.File
+
+import org.apache.commons.io.FileUtils
+import org.apache.commons.io.filefilter.{DirectoryFileFilter, RegexFileFilter}
+
+import org.apache.spark.sql.{ContinuousQuery, Row, StreamTest}
+import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
+import org.apache.spark.sql.execution.datasources.parquet
+import org.apache.spark.sql.execution.streaming.{FileStreamSinkWriter, 
MemoryStream}
+import org.apache.spark.sql.functions._
 import org.apache.spark.sql.test.SharedSQLContext
 import org.apache.spark.util.Utils
 
 class FileStreamSinkSuite extends StreamTest with SharedSQLContext {
   import testImplicits._
 
-  test("unpartitioned writing") {
+
+  test("FileStreamSinkWriter - unpartitioned data") {
+    val path = Utils.createTempDir()
+    path.delete()
+
+    val hadoopConf = sqlContext.sparkContext.hadoopConfiguration
+    val fileFormat = new parquet.DefaultSource()
+
+    def writeRange(start: Int, end: Int, numPartitions: Int): Seq[String] = {
+      val df = sqlContext
+        .range(start, end, 1, numPartitions)
+        .select($"id", lit(100).as("data"))
+      val writer = new FileStreamSinkWriter(
+        df, fileFormat, path.toString, partitionColumnNames = Nil, hadoopConf, 
Map.empty)
+      writer.write().map(_.path.stripPrefix("file://"))
+    }
+
+    // Write and check whether new files are written correctly
+    val files1 = writeRange(0, 10, 2)
+    assert(files1.size === 2, s"unexpected number of files: $files1")
+    checkFilesExist(path, files1, "file not written")
+    checkAnswer(sqlContext.read.load(path.getCanonicalPath), (0 until 
10).map(Row(_, 100)))
+
+    // Append and check whether new files are written correctly and old files 
still exist
+    val files2 = writeRange(10, 20, 3)
+    assert(files2.size === 3, s"unexpected number of files: $files2")
+    assert(files2.intersect(files1).isEmpty, "old files returned")
+    checkFilesExist(path, files2, s"New file not written")
+    checkFilesExist(path, files1, s"Old file not found")
+    checkAnswer(sqlContext.read.load(path.getCanonicalPath), (0 until 
20).map(Row(_, 100)))
+  }
+
+  test("FileStreamSinkWriter - partitioned data") {
+    implicit val e = ExpressionEncoder[java.lang.Long]
+    val path = Utils.createTempDir()
+    path.delete()
+
+    val hadoopConf = sqlContext.sparkContext.hadoopConfiguration
+    val fileFormat = new parquet.DefaultSource()
+
+    def writeRange(start: Int, end: Int, numPartitions: Int): Seq[String] = {
+      val df = sqlContext
+        .range(start, end, 1, numPartitions)
+        .flatMap(x => Iterator(x, x, x)).toDF("id")
+        .select($"id", lit(100).as("data1"), lit(1000).as("data2"))
+
+      require(df.rdd.partitions.size === numPartitions)
+      val writer = new FileStreamSinkWriter(
+        df, fileFormat, path.toString, partitionColumnNames = Seq("id"), 
hadoopConf, Map.empty)
+      writer.write().map(_.path.stripPrefix("file://"))
+    }
+
+    def checkOneFileWrittenPerKey(keys: Seq[Int], filesWritten: Seq[String]): 
Unit = {
+      keys.foreach { id =>
+        assert(
+          filesWritten.count(_.contains(s"/id=$id/")) == 1,
+          s"no file for id=$id. all files: 
\n\t${filesWritten.mkString("\n\t")}"
+        )
+      }
+    }
+
+    // Write and check whether new files are written correctly
+    val files1 = writeRange(0, 10, 2)
+    assert(files1.size === 10, s"unexpected number of 
files:\n${files1.mkString("\n")}")
+    checkFilesExist(path, files1, "file not written")
+    checkOneFileWrittenPerKey(0 until 10, files1)
+
+    val answer1 = (0 until 10).flatMap(x => Iterator(x, x, x)).map(Row(100, 
1000, _))
+    checkAnswer(sqlContext.read.load(path.getCanonicalPath), answer1)
+
+    // Append and check whether new files are written correctly and old files 
still exist
+    val files2 = writeRange(0, 20, 3)
+    assert(files2.size === 20, s"unexpected number of 
files:\n${files2.mkString("\n")}")
+    assert(files2.intersect(files1).isEmpty, "old files returned")
+    checkFilesExist(path, files2, s"New file not written")
+    checkFilesExist(path, files1, s"Old file not found")
+    checkOneFileWrittenPerKey(0 until 20, files2)
+
+    val answer2 = (0 until 20).flatMap(x => Iterator(x, x, x)).map(Row(100, 
1000, _))
+    checkAnswer(sqlContext.read.load(path.getCanonicalPath), answer1 ++ 
answer2)
+  }
+
+  test("FileStreamSink - unpartitioned writing and batch reading") {
     val inputData = MemoryStream[Int]
     val df = inputData.toDF()
 
     val outputDir = Utils.createTempDir(namePrefix = 
"stream.output").getCanonicalPath
     val checkpointDir = Utils.createTempDir(namePrefix = 
"stream.checkpoint").getCanonicalPath
 
-    val query =
-      df.write
-        .format("parquet")
-        .option("checkpointLocation", checkpointDir)
-        .startStream(outputDir)
+    var query: ContinuousQuery = null
+
+    try {
+      query =
+        df.write
+          .format("parquet")
+          .option("checkpointLocation", checkpointDir)
+          .startStream(outputDir)
+
+      inputData.addData(1, 2, 3)
+
+      failAfter(streamingTimeout) {
+        query.processAllAvailable()
+      }
 
-    inputData.addData(1, 2, 3)
-    failAfter(streamingTimeout) { query.processAllAvailable() }
+      val outputDf = sqlContext.read.parquet(outputDir).as[Int]
+      checkDataset(outputDf, 1, 2, 3)
 
-    val outputDf = sqlContext.read.parquet(outputDir).as[Int]
-    checkDataset(
-      outputDf,
-      1, 2, 3)
+    } finally {
+      if (query != null) {
+        query.stop()
+      }
+    }
   }
+
+  test("FileStreamSink - partitioned writing and batch reading [IGNORES 
PARTITION COLUMN]") {
+    val inputData = MemoryStream[Int]
+    val ds = inputData.toDS()
+
+    val outputDir = Utils.createTempDir(namePrefix = 
"stream.output").getCanonicalPath
+    val checkpointDir = Utils.createTempDir(namePrefix = 
"stream.checkpoint").getCanonicalPath
+
+    var query: ContinuousQuery = null
+
+    try {
+       query =
+        ds.map(i => (i, i * 1000))
+          .toDF("id", "value")
+          .write
+          .format("parquet")
+          .partitionBy("id")
+          .option("checkpointLocation", checkpointDir)
+          .startStream(outputDir)
+
+      inputData.addData(1, 2, 3)
+      failAfter(streamingTimeout) {
+        query.processAllAvailable()
+      }
+
+      // TODO (tdas): Test partition column can be read or not
+      val outputDf = sqlContext.read.parquet(outputDir)
+      checkDataset(
+        outputDf.as[Int],
+        1000, 2000, 3000)
+
+    } finally {
+      if (query != null) {
+        query.stop()
+      }
+    }
+  }
+
+  test("FileStreamSink - supported formats") {
+    def testFormat(format: Option[String]): Unit = {
+      val inputData = MemoryStream[Int]
+      val ds = inputData.toDS()
+
+      val outputDir = Utils.createTempDir(namePrefix = 
"stream.output").getCanonicalPath
+      val checkpointDir = Utils.createTempDir(namePrefix = 
"stream.checkpoint").getCanonicalPath
+
+      var query: ContinuousQuery = null
+
+      try {
+        val writer =
+          ds.map(i => (i, i * 1000))
+            .toDF("id", "value")
+            .write
+        if (format.nonEmpty) {
+          writer.format(format.get)
+        }
+        query = writer
+            .option("checkpointLocation", checkpointDir)
+            .startStream(outputDir)
+      } finally {
+        if (query != null) {
+          query.stop()
+        }
+      }
+    }
+
+    testFormat(None) // should not throw error as default format parquet when 
not specified
+    testFormat(Some("parquet"))
+    val e = intercept[UnsupportedOperationException] {
+      testFormat(Some("text"))
+    }
+    Seq("text", "not support", "stream").foreach { s =>
+      assert(e.getMessage.contains(s))
+    }
+  }
+
+  private def checkFilesExist(dir: File, expectedFiles: Seq[String], msg: 
String): Unit = {
+    import scala.collection.JavaConverters._
+    val files =
+      FileUtils.listFiles(dir, new RegexFileFilter("[^.]+"), 
DirectoryFileFilter.DIRECTORY)
+        .asScala
+        .map(_.getCanonicalPath)
+        .toSet
+
+    expectedFiles.foreach { f =>
+      assert(files.contains(f),
+        s"\n$msg\nexpected file:\n\t$f\nfound 
files:\n${files.mkString("\n\t")}")
+    }
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/c5b7e1f7/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStressSuite.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStressSuite.scala 
b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStressSuite.scala
index 5b49a0a..50703e5 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStressSuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStressSuite.scala
@@ -41,7 +41,15 @@ import org.apache.spark.util.Utils
 class FileStressSuite extends StreamTest with SharedSQLContext {
   import testImplicits._
 
-  test("fault tolerance stress test") {
+  testQuietly("fault tolerance stress test - unpartitioned output") {
+    stressTest(partitionWrites = false)
+  }
+
+  testQuietly("fault tolerance stress test - partitioned output") {
+    stressTest(partitionWrites = true)
+  }
+
+  def stressTest(partitionWrites: Boolean): Unit = {
     val numRecords = 10000
     val inputDir = Utils.createTempDir(namePrefix = 
"stream.input").getCanonicalPath
     val stagingDir = Utils.createTempDir(namePrefix = 
"stream.staging").getCanonicalPath
@@ -93,18 +101,36 @@ class FileStressSuite extends StreamTest with 
SharedSQLContext {
     writer.start()
 
     val input = sqlContext.read.format("text").stream(inputDir)
-    def startStream(): ContinuousQuery = input
+
+    def startStream(): ContinuousQuery = {
+      val output = input
         .repartition(5)
         .as[String]
         .mapPartitions { iter =>
           val rand = Random.nextInt(100)
-          if (rand < 5) { sys.error("failure") }
+          if (rand < 10) {
+            sys.error("failure")
+          }
           iter.map(_.toLong)
         }
-        .write
-        .format("parquet")
-        .option("checkpointLocation", checkpoint)
-        .startStream(outputDir)
+        .map(x => (x % 400, x.toString))
+        .toDF("id", "data")
+
+      if (partitionWrites) {
+        output
+          .write
+          .partitionBy("id")
+          .format("parquet")
+          .option("checkpointLocation", checkpoint)
+          .startStream(outputDir)
+      } else {
+        output
+          .write
+          .format("parquet")
+          .option("checkpointLocation", checkpoint)
+          .startStream(outputDir)
+      }
+    }
 
     var failures = 0
     val streamThread = new Thread("stream runner") {


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to