Repository: spark
Updated Branches:
  refs/heads/master eb004c662 -> a6aade004


[SPARK-15698][SQL][STREAMING] Add the ability to remove the old MetadataLog in 
FileStreamSource

## What changes were proposed in this pull request?

Currently, the `metadataLog` in `FileStreamSource` adds a checkpoint file in each
batch but has no way to remove or compact old ones, which leads to a large
number of small files when running for a long time. This patch proposes to compact
the old logs into one file. The approach is quite similar to `FileStreamSinkLog`
but simpler.

## How was this patch tested?

Unit test added.

Author: jerryshao <ss...@hortonworks.com>

Closes #13513 from jerryshao/SPARK-15698.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/a6aade00
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/a6aade00
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/a6aade00

Branch: refs/heads/master
Commit: a6aade0042d9c065669f46d2dac40ec6ce361e63
Parents: eb004c6
Author: jerryshao <ss...@hortonworks.com>
Authored: Tue Sep 20 10:24:12 2016 -0700
Committer: Shixiong Zhu <shixi...@databricks.com>
Committed: Tue Sep 20 10:24:12 2016 -0700

----------------------------------------------------------------------
 .../streaming/CompactibleFileStreamLog.scala    | 245 +++++++++++++++++++
 .../execution/streaming/FileStreamSink.scala    |   3 +-
 .../execution/streaming/FileStreamSinkLog.scala | 212 ++--------------
 .../execution/streaming/FileStreamSource.scala  |  20 +-
 .../streaming/FileStreamSourceLog.scala         | 132 ++++++++++
 .../streaming/MetadataLogFileCatalog.scala      |   3 +-
 .../org/apache/spark/sql/internal/SQLConf.scala |  23 +-
 .../streaming/FileStreamSinkLogSuite.scala      |  35 +--
 .../sql/streaming/FileStreamSourceSuite.scala   |  99 +++++++-
 9 files changed, 550 insertions(+), 222 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/a6aade00/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/CompactibleFileStreamLog.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/CompactibleFileStreamLog.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/CompactibleFileStreamLog.scala
new file mode 100644
index 0000000..027b5bb
--- /dev/null
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/CompactibleFileStreamLog.scala
@@ -0,0 +1,245 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.streaming
+
+import java.io.IOException
+import java.nio.charset.StandardCharsets.UTF_8
+
+import scala.reflect.ClassTag
+
+import org.apache.hadoop.fs.{Path, PathFilter}
+
+import org.apache.spark.sql.SparkSession
+
+/**
+ * An abstract class for compactible metadata logs. It will write one log file 
for each batch.
+ * The first line of the log file is the version number, and there are 
multiple serialized
+ * metadata lines following.
+ *
+ * As reading from many small files is usually pretty slow, also too many
+ * small files in one folder will mess the FS, [[CompactibleFileStreamLog]] 
will
+ * compact log files every 10 batches by default into a big file. When
+ * doing a compaction, it will read all old log files and merge them with the 
new batch.
+ */
+abstract class CompactibleFileStreamLog[T: ClassTag](
+    metadataLogVersion: String,
+    sparkSession: SparkSession,
+    path: String)
+  extends HDFSMetadataLog[Array[T]](sparkSession, path) {
+
+  import CompactibleFileStreamLog._
+
+  /**
+   * If we delete the old files after compaction at once, there is a race condition in S3: other
+   * processes may see the old files are deleted but still cannot see the compaction file using
+   * "list". `allFiles` handles this by looking for the next compaction file directly; however,
+   * a live lock may happen if the compaction happens too frequently: one process keeps deleting
+   * old files while another one keeps retrying. Setting a reasonable cleanup delay could avoid it.
+   */
+  protected def fileCleanupDelayMs: Long
+
+  protected def isDeletingExpiredLog: Boolean
+
+  protected def compactInterval: Int
+
+  /**
+   * Serialize the data into encoded string.
+   */
+  protected def serializeData(t: T): String
+
+  /**
+   * Deserialize the string into data object.
+   */
+  protected def deserializeData(encodedString: String): T
+
+  /**
+   * Filter out the obsolete logs.
+   */
+  def compactLogs(logs: Seq[T]): Seq[T]
+
+  override def batchIdToPath(batchId: Long): Path = {
+    if (isCompactionBatch(batchId, compactInterval)) {
+      new Path(metadataPath, s"$batchId$COMPACT_FILE_SUFFIX")
+    } else {
+      new Path(metadataPath, batchId.toString)
+    }
+  }
+
+  override def pathToBatchId(path: Path): Long = {
+    getBatchIdFromFileName(path.getName)
+  }
+
+  override def isBatchFile(path: Path): Boolean = {
+    try {
+      getBatchIdFromFileName(path.getName)
+      true
+    } catch {
+      case _: NumberFormatException => false
+    }
+  }
+
+  override def serialize(logData: Array[T]): Array[Byte] = {
+    (metadataLogVersion +: 
logData.map(serializeData)).mkString("\n").getBytes(UTF_8)
+  }
+
+  override def deserialize(bytes: Array[Byte]): Array[T] = {
+    val lines = new String(bytes, UTF_8).split("\n")
+    if (lines.length == 0) {
+      throw new IllegalStateException("Incomplete log file")
+    }
+    val version = lines(0)
+    if (version != metadataLogVersion) {
+      throw new IllegalStateException(s"Unknown log version: ${version}")
+    }
+    lines.slice(1, lines.length).map(deserializeData)
+  }
+
+  override def add(batchId: Long, logs: Array[T]): Boolean = {
+    if (isCompactionBatch(batchId, compactInterval)) {
+      compact(batchId, logs)
+    } else {
+      super.add(batchId, logs)
+    }
+  }
+
+  /**
+   * Compacts all logs before `batchId` plus the provided `logs`, and writes 
them into the
+   * corresponding `batchId` file. It will delete expired files as well if 
enabled.
+   */
+  private def compact(batchId: Long, logs: Array[T]): Boolean = {
+    val validBatches = getValidBatchesBeforeCompactionBatch(batchId, 
compactInterval)
+    val allLogs = validBatches.flatMap(batchId => super.get(batchId)).flatten 
++ logs
+    if (super.add(batchId, compactLogs(allLogs).toArray)) {
+      if (isDeletingExpiredLog) {
+        deleteExpiredLog(batchId)
+      }
+      true
+    } else {
+      // Return false as there is another writer.
+      false
+    }
+  }
+
+  /**
+   * Returns all files except the deleted ones.
+   */
+  def allFiles(): Array[T] = {
+    var latestId = getLatest().map(_._1).getOrElse(-1L)
+    // There is a race condition when `FileStreamSink` is deleting old files 
and `StreamFileCatalog`
+    // is calling this method. This loop will retry the reading to deal with 
the
+    // race condition.
+    while (true) {
+      if (latestId >= 0) {
+        try {
+          val logs =
+            getAllValidBatches(latestId, compactInterval).flatMap(id => 
super.get(id)).flatten
+          return compactLogs(logs).toArray
+        } catch {
+          case e: IOException =>
+            // Another process using `CompactibleFileStreamLog` may delete the 
batch files when
+            // `StreamFileCatalog` are reading. However, it only happens when 
a compaction is
+            // deleting old files. If so, let's try the next compaction batch 
and we should find it.
+            // Otherwise, this is a real IO issue and we should throw it.
+            latestId = nextCompactionBatchId(latestId, compactInterval)
+            super.get(latestId).getOrElse {
+              throw e
+            }
+        }
+      } else {
+        return Array.empty
+      }
+    }
+    Array.empty
+  }
+
+  /**
+   * Since all logs before `compactionBatchId` are compacted and written into 
the
+   * `compactionBatchId` log file, they can be removed. However, due to the 
eventual consistency of
+   * S3, the compaction file may not be seen by other processes at once. So we 
only delete files
+   * created `fileCleanupDelayMs` milliseconds ago.
+   */
+  private def deleteExpiredLog(compactionBatchId: Long): Unit = {
+    val expiredTime = System.currentTimeMillis() - fileCleanupDelayMs
+    fileManager.list(metadataPath, new PathFilter {
+      override def accept(path: Path): Boolean = {
+        try {
+          val batchId = getBatchIdFromFileName(path.getName)
+          batchId < compactionBatchId
+        } catch {
+          case _: NumberFormatException =>
+            false
+        }
+      }
+    }).foreach { f =>
+      if (f.getModificationTime <= expiredTime) {
+        fileManager.delete(f.getPath)
+      }
+    }
+  }
+}
+
+object CompactibleFileStreamLog {
+  val COMPACT_FILE_SUFFIX = ".compact"
+
+  def getBatchIdFromFileName(fileName: String): Long = {
+    fileName.stripSuffix(COMPACT_FILE_SUFFIX).toLong
+  }
+
+  /**
+   * Returns whether `batchId` is a compaction batch, i.e. whether `batchId + 1` is a multiple of
+   * `compactInterval`.
+   *
+   * E.g., if `compactInterval` is 3, then 2, 5, 8, ... are all compaction batches.
+   */
+  def isCompactionBatch(batchId: Long, compactInterval: Int): Boolean = {
+    (batchId + 1) % compactInterval == 0
+  }
+
+  /**
+   * Returns all valid batches before the specified `compactionBatchId`. They contain all logs we
+   * need to do a new compaction. Asserts that `compactionBatchId` is a compaction batch.
+   *
+   * E.g., if `compactInterval` is 3 and `compactionBatchId` is 5, this method returns
+   * `Seq(2, 3, 4)` (Note: it includes the previous compaction batch 2).
+   */
+  def getValidBatchesBeforeCompactionBatch(
+      compactionBatchId: Long,
+      compactInterval: Int): Seq[Long] = {
+    assert(isCompactionBatch(compactionBatchId, compactInterval),
+      s"$compactionBatchId is not a compaction batch")
+    (math.max(0, compactionBatchId - compactInterval)) until compactionBatchId
+  }
+
+  /**
+   * Returns all necessary logs before `batchId` (inclusive). If `batchId` is 
a compaction, just
+   * return itself. Otherwise, it will find the previous compaction batch and 
return all batches
+   * between it and `batchId`.
+   */
+  def getAllValidBatches(batchId: Long, compactInterval: Long): Seq[Long] = {
+    assert(batchId >= 0)
+    val start = math.max(0, (batchId + 1) / compactInterval * compactInterval 
- 1)
+    start to batchId
+  }
+
+  /**
+   * Returns the next compaction batch id after `batchId`.
+   */
+  def nextCompactionBatchId(batchId: Long, compactInterval: Long): Long = {
+    (batchId + compactInterval + 1) / compactInterval * compactInterval - 1
+  }
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/a6aade00/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSink.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSink.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSink.scala
index 0f7d958..02c5b85 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSink.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSink.scala
@@ -56,7 +56,8 @@ class FileStreamSink(
 
   private val basePath = new Path(path)
   private val logPath = new Path(basePath, FileStreamSink.metadataDir)
-  private val fileLog = new FileStreamSinkLog(sparkSession, 
logPath.toUri.toString)
+  private val fileLog =
+    new FileStreamSinkLog(FileStreamSinkLog.VERSION, sparkSession, 
logPath.toUri.toString)
   private val hadoopConf = sparkSession.sessionState.newHadoopConf()
   private val fs = basePath.getFileSystem(hadoopConf)
 

http://git-wip-us.apache.org/repos/asf/spark/blob/a6aade00/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSinkLog.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSinkLog.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSinkLog.scala
index 6f9f7c1..64f2f00 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSinkLog.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSinkLog.scala
@@ -17,10 +17,7 @@
 
 package org.apache.spark.sql.execution.streaming
 
-import java.io.IOException
-import java.nio.charset.StandardCharsets.UTF_8
-
-import org.apache.hadoop.fs.{FileStatus, Path, PathFilter}
+import org.apache.hadoop.fs.{FileStatus, Path}
 import org.json4s.NoTypeHints
 import org.json4s.jackson.Serialization
 import org.json4s.jackson.Serialization.{read, write}
@@ -79,213 +76,46 @@ object SinkFileStatus {
  * When the reader uses `allFiles` to list all files, this method only returns 
the visible files
  * (drops the deleted files).
  */
-class FileStreamSinkLog(sparkSession: SparkSession, path: String)
-  extends HDFSMetadataLog[Array[SinkFileStatus]](sparkSession, path) {
-
-  import FileStreamSinkLog._
+class FileStreamSinkLog(
+    metadataLogVersion: String,
+    sparkSession: SparkSession,
+    path: String)
+  extends CompactibleFileStreamLog[SinkFileStatus](metadataLogVersion, 
sparkSession, path) {
 
   private implicit val formats = Serialization.formats(NoTypeHints)
 
-  /**
-   * If we delete the old files after compaction at once, there is a race 
condition in S3: other
-   * processes may see the old files are deleted but still cannot see the 
compaction file using
-   * "list". The `allFiles` handles this by looking for the next compaction 
file directly, however,
-   * a live lock may happen if the compaction happens too frequently: one 
processing keeps deleting
-   * old files while another one keeps retrying. Setting a reasonable cleanup 
delay could avoid it.
-   */
-  private val fileCleanupDelayMs = 
sparkSession.sessionState.conf.fileSinkLogCleanupDelay
+  protected override val fileCleanupDelayMs =
+    sparkSession.conf.get(SQLConf.FILE_SINK_LOG_CLEANUP_DELAY)
 
-  private val isDeletingExpiredLog = 
sparkSession.sessionState.conf.fileSinkLogDeletion
+  protected override val isDeletingExpiredLog =
+    sparkSession.conf.get(SQLConf.FILE_SINK_LOG_DELETION)
 
-  private val compactInterval = 
sparkSession.sessionState.conf.fileSinkLogCompatInterval
+  protected override val compactInterval =
+    sparkSession.conf.get(SQLConf.FILE_SINK_LOG_COMPACT_INTERVAL)
   require(compactInterval > 0,
     s"Please set ${SQLConf.FILE_SINK_LOG_COMPACT_INTERVAL.key} (was 
$compactInterval) " +
       "to a positive value.")
 
-  override def batchIdToPath(batchId: Long): Path = {
-    if (isCompactionBatch(batchId, compactInterval)) {
-      new Path(metadataPath, s"$batchId$COMPACT_FILE_SUFFIX")
-    } else {
-      new Path(metadataPath, batchId.toString)
-    }
-  }
-
-  override def pathToBatchId(path: Path): Long = {
-    getBatchIdFromFileName(path.getName)
-  }
-
-  override def isBatchFile(path: Path): Boolean = {
-    try {
-      getBatchIdFromFileName(path.getName)
-      true
-    } catch {
-      case _: NumberFormatException => false
-    }
-  }
-
-  override def serialize(logData: Array[SinkFileStatus]): Array[Byte] = {
-    (VERSION +: logData.map(write(_))).mkString("\n").getBytes(UTF_8)
+  protected override def serializeData(data: SinkFileStatus): String = {
+    write(data)
   }
 
-  override def deserialize(bytes: Array[Byte]): Array[SinkFileStatus] = {
-    val lines = new String(bytes, UTF_8).split("\n")
-    if (lines.length == 0) {
-      throw new IllegalStateException("Incomplete log file")
-    }
-    val version = lines(0)
-    if (version != VERSION) {
-      throw new IllegalStateException(s"Unknown log version: ${version}")
-    }
-    lines.slice(1, lines.length).map(read[SinkFileStatus](_))
-  }
-
-  override def add(batchId: Long, logs: Array[SinkFileStatus]): Boolean = {
-    if (isCompactionBatch(batchId, compactInterval)) {
-      compact(batchId, logs)
-    } else {
-      super.add(batchId, logs)
-    }
+  protected override def deserializeData(encodedString: String): 
SinkFileStatus = {
+    read[SinkFileStatus](encodedString)
   }
 
-  /**
-   * Returns all files except the deleted ones.
-   */
-  def allFiles(): Array[SinkFileStatus] = {
-    var latestId = getLatest().map(_._1).getOrElse(-1L)
-    // There is a race condition when `FileStreamSink` is deleting old files 
and `StreamFileCatalog`
-    // is calling this method. This loop will retry the reading to deal with 
the
-    // race condition.
-    while (true) {
-      if (latestId >= 0) {
-        val startId = getAllValidBatches(latestId, compactInterval)(0)
-        try {
-          val logs = get(Some(startId), Some(latestId)).flatMap(_._2)
-          return compactLogs(logs).toArray
-        } catch {
-          case e: IOException =>
-            // Another process using `FileStreamSink` may delete the batch 
files when
-            // `StreamFileCatalog` are reading. However, it only happens when 
a compaction is
-            // deleting old files. If so, let's try the next compaction batch 
and we should find it.
-            // Otherwise, this is a real IO issue and we should throw it.
-            latestId = nextCompactionBatchId(latestId, compactInterval)
-            get(latestId).getOrElse {
-              throw e
-            }
-        }
-      } else {
-        return Array.empty
-      }
-    }
-    Array.empty
-  }
-
-  /**
-   * Compacts all logs before `batchId` plus the provided `logs`, and writes 
them into the
-   * corresponding `batchId` file. It will delete expired files as well if 
enabled.
-   */
-  private def compact(batchId: Long, logs: Seq[SinkFileStatus]): Boolean = {
-    val validBatches = getValidBatchesBeforeCompactionBatch(batchId, 
compactInterval)
-    val allLogs = validBatches.flatMap(batchId => get(batchId)).flatten ++ logs
-    if (super.add(batchId, compactLogs(allLogs).toArray)) {
-      if (isDeletingExpiredLog) {
-        deleteExpiredLog(batchId)
-      }
-      true
+  override def compactLogs(logs: Seq[SinkFileStatus]): Seq[SinkFileStatus] = {
+    val deletedFiles = logs.filter(_.action == 
FileStreamSinkLog.DELETE_ACTION).map(_.path).toSet
+    if (deletedFiles.isEmpty) {
+      logs
     } else {
-      // Return false as there is another writer.
-      false
-    }
-  }
-
-  /**
-   * Since all logs before `compactionBatchId` are compacted and written into 
the
-   * `compactionBatchId` log file, they can be removed. However, due to the 
eventual consistency of
-   * S3, the compaction file may not be seen by other processes at once. So we 
only delete files
-   * created `fileCleanupDelayMs` milliseconds ago.
-   */
-  private def deleteExpiredLog(compactionBatchId: Long): Unit = {
-    val expiredTime = System.currentTimeMillis() - fileCleanupDelayMs
-    fileManager.list(metadataPath, new PathFilter {
-      override def accept(path: Path): Boolean = {
-        try {
-          val batchId = getBatchIdFromFileName(path.getName)
-          batchId < compactionBatchId
-        } catch {
-          case _: NumberFormatException =>
-            false
-        }
-      }
-    }).foreach { f =>
-      if (f.getModificationTime <= expiredTime) {
-        fileManager.delete(f.getPath)
-      }
+      logs.filter(f => !deletedFiles.contains(f.path))
     }
   }
 }
 
 object FileStreamSinkLog {
   val VERSION = "v1"
-  val COMPACT_FILE_SUFFIX = ".compact"
   val DELETE_ACTION = "delete"
   val ADD_ACTION = "add"
-
-  def getBatchIdFromFileName(fileName: String): Long = {
-    fileName.stripSuffix(COMPACT_FILE_SUFFIX).toLong
-  }
-
-  /**
-   * Returns if this is a compaction batch. FileStreamSinkLog will compact old 
logs every
-   * `compactInterval` commits.
-   *
-   * E.g., if `compactInterval` is 3, then 2, 5, 8, ... are all compaction 
batches.
-   */
-  def isCompactionBatch(batchId: Long, compactInterval: Int): Boolean = {
-    (batchId + 1) % compactInterval == 0
-  }
-
-  /**
-   * Returns all valid batches before the specified `compactionBatchId`. They 
contain all logs we
-   * need to do a new compaction.
-   *
-   * E.g., if `compactInterval` is 3 and `compactionBatchId` is 5, this method 
should returns
-   * `Seq(2, 3, 4)` (Note: it includes the previous compaction batch 2).
-   */
-  def getValidBatchesBeforeCompactionBatch(
-      compactionBatchId: Long,
-      compactInterval: Int): Seq[Long] = {
-    assert(isCompactionBatch(compactionBatchId, compactInterval),
-      s"$compactionBatchId is not a compaction batch")
-    (math.max(0, compactionBatchId - compactInterval)) until compactionBatchId
-  }
-
-  /**
-   * Returns all necessary logs before `batchId` (inclusive). If `batchId` is 
a compaction, just
-   * return itself. Otherwise, it will find the previous compaction batch and 
return all batches
-   * between it and `batchId`.
-   */
-  def getAllValidBatches(batchId: Long, compactInterval: Long): Seq[Long] = {
-    assert(batchId >= 0)
-    val start = math.max(0, (batchId + 1) / compactInterval * compactInterval 
- 1)
-    start to batchId
-  }
-
-  /**
-   * Removes all deleted files from logs. It assumes once one file is deleted, 
it won't be added to
-   * the log in future.
-   */
-  def compactLogs(logs: Seq[SinkFileStatus]): Seq[SinkFileStatus] = {
-    val deletedFiles = logs.filter(_.action == DELETE_ACTION).map(_.path).toSet
-    if (deletedFiles.isEmpty) {
-      logs
-    } else {
-      logs.filter(f => !deletedFiles.contains(f.path))
-    }
-  }
-
-  /**
-   * Returns the next compaction batch id after `batchId`.
-   */
-  def nextCompactionBatchId(batchId: Long, compactInterval: Long): Long = {
-    (batchId + compactInterval + 1) / compactInterval * compactInterval - 1
-  }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/a6aade00/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala
index 42fb454..0dc08b1 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala
@@ -29,8 +29,6 @@ import org.apache.spark.sql.types.StructType
 
 /**
  * A very simple source that reads files from the given directory as they 
appear.
- *
- * TODO: Clean up the metadata log files periodically.
  */
 class FileStreamSource(
     sparkSession: SparkSession,
@@ -49,8 +47,8 @@ class FileStreamSource(
     fs.makeQualified(new Path(path))  // can contains glob patterns
   }
 
-  private val metadataLog = new 
HDFSMetadataLog[Array[FileEntry]](sparkSession, metadataPath)
-
+  private val metadataLog =
+    new FileStreamSourceLog(FileStreamSourceLog.VERSION, sparkSession, 
metadataPath)
   private var maxBatchId = metadataLog.getLatest().map(_._1).getOrElse(-1L)
 
   /** Maximum number of new files to be considered in each batch */
@@ -60,11 +58,10 @@ class FileStreamSource(
   // Visible for testing and debugging in production.
   val seenFiles = new SeenFilesMap(sourceOptions.maxFileAgeMs)
 
-  metadataLog.get(None, Some(maxBatchId)).foreach { case (batchId, entry) =>
-    entry.foreach(seenFiles.add)
-    // TODO: move purge call out of the loop once we truncate logs.
-    seenFiles.purge()
+  metadataLog.allFiles().foreach { entry =>
+    seenFiles.add(entry)
   }
+  seenFiles.purge()
 
   logInfo(s"maxFilesPerBatch = $maxFilesPerBatch, maxFileAge = 
${sourceOptions.maxFileAgeMs}")
 
@@ -98,7 +95,7 @@ class FileStreamSource(
 
     if (batchFiles.nonEmpty) {
       maxBatchId += 1
-      metadataLog.add(maxBatchId, batchFiles.toArray)
+      metadataLog.add(maxBatchId, batchFiles.map(_.copy(batchId = 
maxBatchId)).toArray)
       logInfo(s"Max batch id increased to $maxBatchId with ${batchFiles.size} 
new files")
     }
 
@@ -174,7 +171,10 @@ object FileStreamSource {
   /** Timestamp for file modification time, in ms since January 1, 1970 UTC. */
   type Timestamp = Long
 
-  case class FileEntry(path: String, timestamp: Timestamp) extends Serializable
+  val NOT_SET = -1L
+
+  case class FileEntry(path: String, timestamp: Timestamp, batchId: Long = 
NOT_SET)
+    extends Serializable
 
   /**
    * A custom hash map used to track the list of files seen. This map is not 
thread-safe.

http://git-wip-us.apache.org/repos/asf/spark/blob/a6aade00/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSourceLog.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSourceLog.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSourceLog.scala
new file mode 100644
index 0000000..8103309
--- /dev/null
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSourceLog.scala
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.streaming
+
+import java.util.{LinkedHashMap => JLinkedHashMap}
+import java.util.Map.Entry
+
+import scala.collection.mutable
+
+import org.json4s.NoTypeHints
+import org.json4s.jackson.Serialization
+
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.execution.streaming.FileStreamSource.FileEntry
+import org.apache.spark.sql.internal.SQLConf
+
+class FileStreamSourceLog(
+    metadataLogVersion: String,
+    sparkSession: SparkSession,
+    path: String)
+  extends CompactibleFileStreamLog[FileEntry](metadataLogVersion, 
sparkSession, path) {
+
+  import CompactibleFileStreamLog._
+
+  // Configurations about metadata compaction
+  protected override val compactInterval =
+  sparkSession.conf.get(SQLConf.FILE_SOURCE_LOG_COMPACT_INTERVAL)
+  require(compactInterval > 0,
+    s"Please set ${SQLConf.FILE_SOURCE_LOG_COMPACT_INTERVAL.key} (was 
$compactInterval) to a " +
+      s"positive value.")
+
+  protected override val fileCleanupDelayMs =
+    sparkSession.conf.get(SQLConf.FILE_SOURCE_LOG_CLEANUP_DELAY)
+
+  protected override val isDeletingExpiredLog =
+    sparkSession.conf.get(SQLConf.FILE_SOURCE_LOG_DELETION)
+
+  private implicit val formats = Serialization.formats(NoTypeHints)
+
+  // A fixed-size cache holding the file entries that belong to each compaction batch itself. It
+  // is used to avoid scanning the compacted log file to retrieve its own batch data.
+  private val cacheSize = compactInterval
+  private val fileEntryCache = new JLinkedHashMap[Long, Array[FileEntry]] {
+    override def removeEldestEntry(eldest: Entry[Long, Array[FileEntry]]): 
Boolean = {
+      size() > cacheSize
+    }
+  }
+
+  protected override def serializeData(data: FileEntry): String = {
+    Serialization.write(data)
+  }
+
+  protected override def deserializeData(encodedString: String): FileEntry = {
+    Serialization.read[FileEntry](encodedString)
+  }
+
+  def compactLogs(logs: Seq[FileEntry]): Seq[FileEntry] = {
+    logs
+  }
+
+  override def add(batchId: Long, logs: Array[FileEntry]): Boolean = {
+    if (super.add(batchId, logs)) {
+      if (isCompactionBatch(batchId, compactInterval)) {
+        fileEntryCache.put(batchId, logs)
+      }
+      true
+    } else {
+      false
+    }
+  }
+
+  /**
+   * Returns the entries of every batch from `startId` to `endId` (both inclusive), sorted by
+   * batch id. `startId` defaults to 0; `endId` defaults to the latest batch in the log (or 0 if
+   * the log is empty).
+   *
+   * Entries read from a log file are filtered by their `batchId` field so that only the entries
+   * recorded for the requested batch are returned. For a compaction batch, the entries are served
+   * from `fileEntryCache` when it holds them. Batches for which `super.get` returns nothing are
+   * looked up in the latest compacted log file; batches not found there are omitted.
+   */
+  override def get(startId: Option[Long], endId: Option[Long]): Array[(Long, Array[FileEntry])] = {
+    val startBatchId = startId.getOrElse(0L)
+    // Honor the caller-supplied upper bound; only fall back to the latest batch when it is absent.
+    val endBatchId = endId.orElse(getLatest().map(_._1)).getOrElse(0L)
+
+    val (existedBatches, removedBatches) = (startBatchId to endBatchId).map { id =>
+      if (isCompactionBatch(id, compactInterval) && fileEntryCache.containsKey(id)) {
+        (id, Some(fileEntryCache.get(id)))
+      } else {
+        val logs = super.get(id).map(_.filter(_.batchId == id))
+        (id, logs)
+      }
+    }.partition(_._2.isDefined)
+
+    // The code below should only run when an original metadata log file has been removed, so the
+    // batch has to be recovered from the latest compacted log file. This is quite time-consuming
+    // and is not expected in the current FileStreamSource code path, since it only fetches the
+    // latest metadata log file.
+    val searchKeys = removedBatches.map(_._1)
+    val retrievedBatches = if (searchKeys.nonEmpty) {
+      logWarning(s"Get batches from removed files, this is unexpected in the current code path!!!")
+      val latestBatchId = getLatest().map(_._1).getOrElse(-1L)
+      if (latestBatchId < 0) {
+        Map.empty[Long, Option[Array[FileEntry]]]
+      } else {
+        val latestCompactedBatchId = getAllValidBatches(latestBatchId, compactInterval)(0)
+        val allLogs = new mutable.HashMap[Long, mutable.ArrayBuffer[FileEntry]]
+
+        // Group the entries of the compacted file by the batch they were originally recorded in.
+        super.get(latestCompactedBatchId).foreach { entries =>
+          entries.foreach { e =>
+            allLogs.put(e.batchId, allLogs.getOrElse(e.batchId, mutable.ArrayBuffer()) += e)
+          }
+        }
+
+        searchKeys.map(id => id -> allLogs.get(id).map(_.toArray)).filter(_._2.isDefined).toMap
+      }
+    } else {
+      Map.empty[Long, Option[Array[FileEntry]]]
+    }
+
+    // Both collections only contain defined options at this point, so `.get` is safe.
+    (existedBatches ++ retrievedBatches).map(i => i._1 -> i._2.get).toArray.sortBy(_._1)
+  }
+}
+
+object FileStreamSourceLog {
+  val VERSION = "v1"
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/a6aade00/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MetadataLogFileCatalog.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MetadataLogFileCatalog.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MetadataLogFileCatalog.scala
index 20ade12..a32c467 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MetadataLogFileCatalog.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MetadataLogFileCatalog.scala
@@ -34,7 +34,8 @@ class MetadataLogFileCatalog(sparkSession: SparkSession, 
path: Path)
 
   private val metadataDirectory = new Path(path, FileStreamSink.metadataDir)
   logInfo(s"Reading streaming file log from $metadataDirectory")
-  private val metadataLog = new FileStreamSinkLog(sparkSession, 
metadataDirectory.toUri.toString)
+  private val metadataLog =
+    new FileStreamSinkLog(FileStreamSinkLog.VERSION, sparkSession, 
metadataDirectory.toUri.toString)
   private val allFilesFromLog = 
metadataLog.allFiles().map(_.toFileStatus).filterNot(_.isDirectory)
   private var cachedPartitionSpec: PartitionSpec = _
 

http://git-wip-us.apache.org/repos/asf/spark/blob/a6aade00/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index 428032b..f8b7a7f 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -544,7 +544,28 @@ object SQLConf {
       .internal()
       .doc("How long that a file is guaranteed to be visible for all readers.")
       .timeConf(TimeUnit.MILLISECONDS)
-      .createWithDefault(60 * 1000L) // 10 minutes
+      .createWithDefault(TimeUnit.MINUTES.toMillis(10)) // 10 minutes
+
+  val FILE_SOURCE_LOG_DELETION = 
SQLConfigBuilder("spark.sql.streaming.fileSource.log.deletion")
+    .internal()
+    .doc("Whether to delete the expired log files in file stream source.")
+    .booleanConf
+    .createWithDefault(true)
+
+  val FILE_SOURCE_LOG_COMPACT_INTERVAL =
+    SQLConfigBuilder("spark.sql.streaming.fileSource.log.compactInterval")
+      .internal()
+      .doc("Number of log files after which all the previous files " +
+        "are compacted into the next log file.")
+      .intConf
+      .createWithDefault(10)
+
+  val FILE_SOURCE_LOG_CLEANUP_DELAY =
+    SQLConfigBuilder("spark.sql.streaming.fileSource.log.cleanupDelay")
+      .internal()
+      .doc("How long in milliseconds a file is guaranteed to be visible for 
all readers.")
+      .timeConf(TimeUnit.MILLISECONDS)
+      .createWithDefault(TimeUnit.MINUTES.toMillis(10)) // 10 minutes
 
   val STREAMING_SCHEMA_INFERENCE =
     SQLConfigBuilder("spark.sql.streaming.schemaInference")

http://git-wip-us.apache.org/repos/asf/spark/blob/a6aade00/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/FileStreamSinkLogSuite.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/FileStreamSinkLogSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/FileStreamSinkLogSuite.scala
index 26f8b98..41a8cc2 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/FileStreamSinkLogSuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/FileStreamSinkLogSuite.scala
@@ -25,13 +25,14 @@ import org.apache.spark.sql.test.SharedSQLContext
 
 class FileStreamSinkLogSuite extends SparkFunSuite with SharedSQLContext {
 
+  import CompactibleFileStreamLog._
   import FileStreamSinkLog._
 
   test("getBatchIdFromFileName") {
     assert(1234L === getBatchIdFromFileName("1234"))
     assert(1234L === getBatchIdFromFileName("1234.compact"))
     intercept[NumberFormatException] {
-      FileStreamSinkLog.getBatchIdFromFileName("1234a")
+      getBatchIdFromFileName("1234a")
     }
   }
 
@@ -83,17 +84,19 @@ class FileStreamSinkLogSuite extends SparkFunSuite with 
SharedSQLContext {
   }
 
   test("compactLogs") {
-    val logs = Seq(
-      newFakeSinkFileStatus("/a/b/x", FileStreamSinkLog.ADD_ACTION),
-      newFakeSinkFileStatus("/a/b/y", FileStreamSinkLog.ADD_ACTION),
-      newFakeSinkFileStatus("/a/b/z", FileStreamSinkLog.ADD_ACTION))
-    assert(logs === compactLogs(logs))
+    withFileStreamSinkLog { sinkLog =>
+      val logs = Seq(
+        newFakeSinkFileStatus("/a/b/x", FileStreamSinkLog.ADD_ACTION),
+        newFakeSinkFileStatus("/a/b/y", FileStreamSinkLog.ADD_ACTION),
+        newFakeSinkFileStatus("/a/b/z", FileStreamSinkLog.ADD_ACTION))
+      assert(logs === sinkLog.compactLogs(logs))
 
-    val logs2 = Seq(
-      newFakeSinkFileStatus("/a/b/m", FileStreamSinkLog.ADD_ACTION),
-      newFakeSinkFileStatus("/a/b/n", FileStreamSinkLog.ADD_ACTION),
-      newFakeSinkFileStatus("/a/b/z", FileStreamSinkLog.DELETE_ACTION))
-    assert(logs.dropRight(1) ++ logs2.dropRight(1) === compactLogs(logs ++ 
logs2))
+      val logs2 = Seq(
+        newFakeSinkFileStatus("/a/b/m", FileStreamSinkLog.ADD_ACTION),
+        newFakeSinkFileStatus("/a/b/n", FileStreamSinkLog.ADD_ACTION),
+        newFakeSinkFileStatus("/a/b/z", FileStreamSinkLog.DELETE_ACTION))
+      assert(logs.dropRight(1) ++ logs2.dropRight(1) === 
sinkLog.compactLogs(logs ++ logs2))
+    }
   }
 
   test("serialize") {
@@ -125,21 +128,21 @@ class FileStreamSinkLogSuite extends SparkFunSuite with 
SharedSQLContext {
           action = FileStreamSinkLog.ADD_ACTION))
 
       // scalastyle:off
-      val expected = s"""${FileStreamSinkLog.VERSION}
+      val expected = s"""$VERSION
           
|{"path":"/a/b/x","size":100,"isDir":false,"modificationTime":1000,"blockReplication":1,"blockSize":10000,"action":"add"}
           
|{"path":"/a/b/y","size":200,"isDir":false,"modificationTime":2000,"blockReplication":2,"blockSize":20000,"action":"delete"}
           
|{"path":"/a/b/z","size":300,"isDir":false,"modificationTime":3000,"blockReplication":3,"blockSize":30000,"action":"add"}""".stripMargin
       // scalastyle:on
       assert(expected === new String(sinkLog.serialize(logs), UTF_8))
 
-      assert(FileStreamSinkLog.VERSION === new 
String(sinkLog.serialize(Array()), UTF_8))
+      assert(VERSION === new String(sinkLog.serialize(Array()), UTF_8))
     }
   }
 
   test("deserialize") {
     withFileStreamSinkLog { sinkLog =>
       // scalastyle:off
-      val logs = s"""${FileStreamSinkLog.VERSION}
+      val logs = s"""$VERSION
           
|{"path":"/a/b/x","size":100,"isDir":false,"modificationTime":1000,"blockReplication":1,"blockSize":10000,"action":"add"}
           
|{"path":"/a/b/y","size":200,"isDir":false,"modificationTime":2000,"blockReplication":2,"blockSize":20000,"action":"delete"}
           
|{"path":"/a/b/z","size":300,"isDir":false,"modificationTime":3000,"blockReplication":3,"blockSize":30000,"action":"add"}""".stripMargin
@@ -173,7 +176,7 @@ class FileStreamSinkLogSuite extends SparkFunSuite with 
SharedSQLContext {
 
       assert(expected === sinkLog.deserialize(logs.getBytes(UTF_8)))
 
-      assert(Nil === 
sinkLog.deserialize(FileStreamSinkLog.VERSION.getBytes(UTF_8)))
+      assert(Nil === sinkLog.deserialize(VERSION.getBytes(UTF_8)))
     }
   }
 
@@ -263,7 +266,7 @@ class FileStreamSinkLogSuite extends SparkFunSuite with 
SharedSQLContext {
 
   private def withFileStreamSinkLog(f: FileStreamSinkLog => Unit): Unit = {
     withTempDir { file =>
-      val sinkLog = new FileStreamSinkLog(spark, file.getCanonicalPath)
+      val sinkLog = new FileStreamSinkLog(FileStreamSinkLog.VERSION, spark, 
file.getCanonicalPath)
       f(sinkLog)
     }
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/a6aade00/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala
index a02a36c..55c95ae 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala
@@ -19,7 +19,7 @@ package org.apache.spark.sql.streaming
 
 import java.io.File
 
-import org.scalatest.concurrent.Eventually._
+import org.scalatest.PrivateMethodTester
 import org.scalatest.time.SpanSugar._
 
 import org.apache.spark.sql._
@@ -30,7 +30,7 @@ import org.apache.spark.sql.test.SharedSQLContext
 import org.apache.spark.sql.types._
 import org.apache.spark.util.Utils
 
-class FileStreamSourceTest extends StreamTest with SharedSQLContext {
+class FileStreamSourceTest extends StreamTest with SharedSQLContext with 
PrivateMethodTester {
 
   import testImplicits._
 
@@ -804,6 +804,101 @@ class FileStreamSourceSuite extends FileStreamSourceTest {
       )
     }
   }
+
+  test("compacat metadata log") {
+    val _sources = PrivateMethod[Seq[Source]]('sources)
+    val _metadataLog = PrivateMethod[FileStreamSourceLog]('metadataLog)
+
+    def verify(execution: StreamExecution)
+      (batchId: Long, expectedBatches: Int): Boolean = {
+      import CompactibleFileStreamLog._
+
+      val fileSource = (execution invokePrivate 
_sources()).head.asInstanceOf[FileStreamSource]
+      val metadataLog = fileSource invokePrivate _metadataLog()
+
+      if (isCompactionBatch(batchId, 2)) {
+        val path = metadataLog.batchIdToPath(batchId)
+
+        // Assert that the path name ends with the compact suffix.
+        assert(path.getName.endsWith(COMPACT_FILE_SUFFIX))
+
+        // A compacted batch should include all entries from the start.
+        val entries = metadataLog.get(batchId)
+        assert(entries.isDefined)
+        assert(entries.get.length === metadataLog.allFiles().length)
+        assert(metadataLog.get(None, Some(batchId)).flatMap(_._2).length === 
entries.get.length)
+      }
+
+      assert(metadataLog.allFiles().sortBy(_.batchId) ===
+        metadataLog.get(None, Some(batchId)).flatMap(_._2).sortBy(_.batchId))
+
+      metadataLog.get(None, Some(batchId)).flatMap(_._2).length === 
expectedBatches
+    }
+
+    withTempDirs { case (src, tmp) =>
+      withSQLConf(
+        SQLConf.FILE_SOURCE_LOG_COMPACT_INTERVAL.key -> "2"
+      ) {
+        val fileStream = createFileStream("text", src.getCanonicalPath)
+        val filtered = fileStream.filter($"value" contains "keep")
+
+        testStream(filtered)(
+          AddTextFileData("drop1\nkeep2\nkeep3", src, tmp),
+          CheckAnswer("keep2", "keep3"),
+          AssertOnQuery(verify(_)(0L, 1)),
+          AddTextFileData("drop4\nkeep5\nkeep6", src, tmp),
+          CheckAnswer("keep2", "keep3", "keep5", "keep6"),
+          AssertOnQuery(verify(_)(1L, 2)),
+          AddTextFileData("drop7\nkeep8\nkeep9", src, tmp),
+          CheckAnswer("keep2", "keep3", "keep5", "keep6", "keep8", "keep9"),
+          AssertOnQuery(verify(_)(2L, 3)),
+          StopStream,
+          StartStream(),
+          AssertOnQuery(verify(_)(2L, 3)),
+          AddTextFileData("drop10\nkeep11", src, tmp),
+          CheckAnswer("keep2", "keep3", "keep5", "keep6", "keep8", "keep9", 
"keep11"),
+          AssertOnQuery(verify(_)(3L, 4)),
+          AddTextFileData("drop12\nkeep13", src, tmp),
+          CheckAnswer("keep2", "keep3", "keep5", "keep6", "keep8", "keep9", 
"keep11", "keep13"),
+          AssertOnQuery(verify(_)(4L, 5))
+        )
+      }
+    }
+  }
+
+  test("get arbitrary batch from FileStreamSource") {
+    withTempDirs { case (src, tmp) =>
+      withSQLConf(
+        SQLConf.FILE_SOURCE_LOG_COMPACT_INTERVAL.key -> "2",
+        // Force deleting the old logs
+        SQLConf.FILE_SOURCE_LOG_CLEANUP_DELAY.key -> "1"
+      ) {
+        val fileStream = createFileStream("text", src.getCanonicalPath)
+        val filtered = fileStream.filter($"value" contains "keep")
+
+        testStream(filtered)(
+          AddTextFileData("keep1", src, tmp),
+          CheckAnswer("keep1"),
+          AddTextFileData("keep2", src, tmp),
+          CheckAnswer("keep1", "keep2"),
+          AddTextFileData("keep3", src, tmp),
+          CheckAnswer("keep1", "keep2", "keep3"),
+          AssertOnQuery("check getBatch") { execution: StreamExecution =>
+            val _sources = PrivateMethod[Seq[Source]]('sources)
+            val fileSource =
+              (execution invokePrivate 
_sources()).head.asInstanceOf[FileStreamSource]
+            assert(fileSource.getBatch(None, 
LongOffset(2)).as[String].collect() ===
+              List("keep1", "keep2", "keep3"))
+            assert(fileSource.getBatch(Some(LongOffset(0)), 
LongOffset(2)).as[String].collect() ===
+              List("keep2", "keep3"))
+            assert(fileSource.getBatch(Some(LongOffset(1)), 
LongOffset(2)).as[String].collect() ===
+              List("keep3"))
+            true
+          }
+        )
+      }
+    }
+  }
 }
 
 class FileStreamSourceStressTestSuite extends FileStreamSourceTest {


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to