This is an automated email from the ASF dual-hosted git repository.

brkyvz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 940510c  [SPARK-30669][SS] Introduce AdmissionControl APIs for 
StructuredStreaming
940510c is described below

commit 940510cb1e43a4166d2fe7d7eb4ace8561d24c9b
Author: Burak Yavuz <brk...@gmail.com>
AuthorDate: Thu Jan 30 22:01:53 2020 -0800

    [SPARK-30669][SS] Introduce AdmissionControl APIs for StructuredStreaming
    
    ### What changes were proposed in this pull request?
    
    We propose to add two new interfaces, `SupportsAdmissionControl` and 
`ReadLimit`. A ReadLimit defines how much data should be read in the next 
micro-batch. `SupportsAdmissionControl` specifies that a source can rate limit 
its ingest into the system. The source can tell the system what the user 
specified as a read limit, and the system can enforce this limit within each 
micro-batch or impose its own limit if the Trigger is Trigger.Once() for 
example.
    
    We then use this interface in FileStreamSource, KafkaSource, and 
KafkaMicroBatchStream.
    
    ### Why are the changes needed?
    
    Sources currently have no information around execution semantics such as 
whether the stream is being executed in Trigger.Once() mode. This interface 
will pass this information into the sources as part of planning. With a trigger 
like Trigger.Once(), the semantics are to process all the data available to the 
datasource in a single micro-batch. However, this semantic can be broken when 
data source options such as `maxOffsetsPerTrigger` (in the Kafka source) rate 
limit the amount of data [...]
    
    ### Does this PR introduce any user-facing change?
    
    DataSource developers can extend this interface for their streaming sources 
to add admission control into their system and correctly support Trigger.Once().
    
    ### How was this patch tested?
    
    Existing tests, as this API is mostly internal
    
    Closes #27380 from brkyvz/rateLimit.
    
    Lead-authored-by: Burak Yavuz <brk...@gmail.com>
    Co-authored-by: Burak Yavuz <bu...@databricks.com>
    Signed-off-by: Burak Yavuz <brk...@gmail.com>
---
 .../spark/sql/kafka010/KafkaMicroBatchStream.scala | 25 ++++++----
 .../apache/spark/sql/kafka010/KafkaSource.scala    | 29 +++++++----
 .../sql/kafka010/KafkaMicroBatchSourceSuite.scala  | 22 +++++++++
 .../read/streaming/ReadAllAvailable.java}          | 28 +++++++----
 .../sql/connector/read/streaming/ReadLimit.java}   | 25 ++++++----
 .../sql/connector/read/streaming/ReadMaxFiles.java | 55 +++++++++++++++++++++
 .../sql/connector/read/streaming/ReadMaxRows.java  | 55 +++++++++++++++++++++
 .../read/streaming/SupportsAdmissionControl.java   | 56 ++++++++++++++++++++++
 .../sql/execution/streaming/FileStreamSource.scala | 25 ++++++++--
 .../execution/streaming/MicroBatchExecution.scala  | 49 +++++++++++++------
 .../sql/execution/streaming/StreamExecution.scala  |  6 +--
 .../streaming/continuous/ContinuousExecution.scala |  4 +-
 .../sql/streaming/FileStreamSourceSuite.scala      | 56 ++++++++++++++++++++++
 13 files changed, 376 insertions(+), 59 deletions(-)

diff --git 
a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchStream.scala
 
b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchStream.scala
index 844c963..6599e7e 100644
--- 
a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchStream.scala
+++ 
b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchStream.scala
@@ -27,8 +27,7 @@ import 
org.apache.spark.internal.config.Network.NETWORK_TIMEOUT
 import org.apache.spark.scheduler.ExecutorCacheTaskLocation
 import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.connector.read.{InputPartition, 
PartitionReaderFactory}
-import org.apache.spark.sql.connector.read.streaming.{MicroBatchStream, Offset}
-import 
org.apache.spark.sql.execution.streaming.sources.RateControlMicroBatchStream
+import org.apache.spark.sql.connector.read.streaming.{MicroBatchStream, 
Offset, ReadAllAvailable, ReadLimit, ReadMaxRows, SupportsAdmissionControl}
 import org.apache.spark.sql.kafka010.KafkaSourceProvider._
 import org.apache.spark.sql.util.CaseInsensitiveStringMap
 import org.apache.spark.util.UninterruptibleThread
@@ -55,7 +54,7 @@ private[kafka010] class KafkaMicroBatchStream(
     options: CaseInsensitiveStringMap,
     metadataPath: String,
     startingOffsets: KafkaOffsetRangeLimit,
-    failOnDataLoss: Boolean) extends RateControlMicroBatchStream with Logging {
+    failOnDataLoss: Boolean) extends SupportsAdmissionControl with 
MicroBatchStream with Logging {
 
   private[kafka010] val pollTimeoutMs = options.getLong(
     KafkaSourceProvider.CONSUMER_POLL_TIMEOUT,
@@ -77,13 +76,23 @@ private[kafka010] class KafkaMicroBatchStream(
     KafkaSourceOffset(getOrCreateInitialPartitionOffsets())
   }
 
-  override def latestOffset(start: Offset): Offset = {
+  override def getDefaultReadLimit: ReadLimit = {
+    
maxOffsetsPerTrigger.map(ReadLimit.maxRows).getOrElse(super.getDefaultReadLimit)
+  }
+
+  override def latestOffset(): Offset = {
+    throw new UnsupportedOperationException(
+      "latestOffset(Offset, ReadLimit) should be called instead of this 
method")
+  }
+
+  override def latestOffset(start: Offset, readLimit: ReadLimit): Offset = {
     val startPartitionOffsets = 
start.asInstanceOf[KafkaSourceOffset].partitionToOffsets
     val latestPartitionOffsets = 
kafkaOffsetReader.fetchLatestOffsets(Some(startPartitionOffsets))
-    endPartitionOffsets = KafkaSourceOffset(maxOffsetsPerTrigger.map { 
maxOffsets =>
-      rateLimit(maxOffsets, startPartitionOffsets, latestPartitionOffsets)
-    }.getOrElse {
-      latestPartitionOffsets
+    endPartitionOffsets = KafkaSourceOffset(readLimit match {
+      case rows: ReadMaxRows =>
+        rateLimit(rows.maxRows(), startPartitionOffsets, 
latestPartitionOffsets)
+      case _: ReadAllAvailable =>
+        latestPartitionOffsets
     })
     endPartitionOffsets
   }
diff --git 
a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSource.scala
 
b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSource.scala
index f0b3bf1..57879c7 100644
--- 
a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSource.scala
+++ 
b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSource.scala
@@ -32,6 +32,8 @@ import org.apache.spark.scheduler.ExecutorCacheTaskLocation
 import org.apache.spark.sql._
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap
+import org.apache.spark.sql.connector.read.streaming
+import org.apache.spark.sql.connector.read.streaming.{ReadAllAvailable, 
ReadLimit, ReadMaxRows, SupportsAdmissionControl}
 import org.apache.spark.sql.execution.streaming._
 import org.apache.spark.sql.kafka010.KafkaSource._
 import org.apache.spark.sql.kafka010.KafkaSourceProvider._
@@ -79,7 +81,7 @@ private[kafka010] class KafkaSource(
     metadataPath: String,
     startingOffsets: KafkaOffsetRangeLimit,
     failOnDataLoss: Boolean)
-  extends Source with Logging {
+  extends SupportsAdmissionControl with Source with Logging {
 
   private val sc = sqlContext.sparkContext
 
@@ -114,6 +116,10 @@ private[kafka010] class KafkaSource(
     }.partitionToOffsets
   }
 
+  override def getDefaultReadLimit: ReadLimit = {
+    
maxOffsetsPerTrigger.map(ReadLimit.maxRows).getOrElse(super.getDefaultReadLimit)
+  }
+
   private var currentPartitionOffsets: Option[Map[TopicPartition, Long]] = None
 
   private val converter = new KafkaRecordToRowConverter()
@@ -122,23 +128,30 @@ private[kafka010] class KafkaSource(
 
   /** Returns the maximum available offset for this source. */
   override def getOffset: Option[Offset] = {
+    throw new UnsupportedOperationException(
+      "latestOffset(Offset, ReadLimit) should be called instead of this 
method")
+  }
+
+  override def latestOffset(startOffset: streaming.Offset, limit: ReadLimit): 
streaming.Offset = {
     // Make sure initialPartitionOffsets is initialized
     initialPartitionOffsets
 
     val latest = kafkaReader.fetchLatestOffsets(
       currentPartitionOffsets.orElse(Some(initialPartitionOffsets)))
-    val offsets = maxOffsetsPerTrigger match {
-      case None =>
+    val offsets = limit match {
+      case rows: ReadMaxRows =>
+        if (currentPartitionOffsets.isEmpty) {
+          rateLimit(rows.maxRows(), initialPartitionOffsets, latest)
+        } else {
+          rateLimit(rows.maxRows(), currentPartitionOffsets.get, latest)
+        }
+      case _: ReadAllAvailable =>
         latest
-      case Some(limit) if currentPartitionOffsets.isEmpty =>
-        rateLimit(limit, initialPartitionOffsets, latest)
-      case Some(limit) =>
-        rateLimit(limit, currentPartitionOffsets.get, latest)
     }
 
     currentPartitionOffsets = Some(offsets)
     logDebug(s"GetOffset: ${offsets.toSeq.map(_.toString).sorted}")
-    Some(KafkaSourceOffset(offsets))
+    KafkaSourceOffset(offsets)
   }
 
   /** Proportionally distribute limit number of offsets among topicpartitions 
*/
diff --git 
a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala
 
b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala
index 468b21c..a4601b9 100644
--- 
a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala
+++ 
b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala
@@ -297,6 +297,28 @@ abstract class KafkaMicroBatchSourceSuiteBase extends 
KafkaSourceSuiteBase {
         13, 126, 127, 128, 129, 130, 131, 132, 133, 134
       )
     )
+
+    // When Trigger.Once() is used, the read limit should be ignored
+    val allData = Seq(1) ++ (10 to 20) ++ (100 to 200)
+    withTempDir { dir =>
+      testStream(mapped)(
+        StartStream(Trigger.Once(), checkpointLocation = dir.getCanonicalPath),
+        AssertOnQuery { q =>
+          q.processAllAvailable()
+          true
+        },
+        CheckAnswer(allData: _*),
+        StopStream,
+
+        AddKafkaData(Set(topic), 1000 to 1010: _*),
+        StartStream(Trigger.Once(), checkpointLocation = dir.getCanonicalPath),
+        AssertOnQuery { q =>
+          q.processAllAvailable()
+          true
+        },
+        CheckAnswer((allData ++ 1000.to(1010)): _*)
+      )
+    }
   }
 
   test("input row metrics") {
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/RateControlMicroBatchStream.scala
 
b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/read/streaming/ReadAllAvailable.java
similarity index 51%
copy from 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/RateControlMicroBatchStream.scala
copy to 
sql/catalyst/src/main/java/org/apache/spark/sql/connector/read/streaming/ReadAllAvailable.java
index fb46f76..5a946ad 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/RateControlMicroBatchStream.scala
+++ 
b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/read/streaming/ReadAllAvailable.java
@@ -15,17 +15,27 @@
  * limitations under the License.
  */
 
-package org.apache.spark.sql.execution.streaming.sources
+package org.apache.spark.sql.connector.read.streaming;
 
-import org.apache.spark.sql.connector.read.streaming.{MicroBatchStream, Offset}
+import org.apache.spark.annotation.Evolving;
 
-// A special `MicroBatchStream` that can get latestOffset with a start offset.
-trait RateControlMicroBatchStream extends MicroBatchStream {
+/**
+ * Represents a {@link ReadLimit} where the {@link MicroBatchStream} must scan 
all the data
+ * available at the streaming source. This is meant to be a hard specification 
as being able
+ * to return all available data is necessary for Trigger.Once() to work 
correctly.
+ * If a source is unable to scan all available data, then it must throw an 
error.
+ *
+ * @see SupportsAdmissionControl#latestOffset(Offset, ReadLimit)
+ * @since 3.0.0
+ */
+@Evolving
+public final class ReadAllAvailable implements ReadLimit {
+  static final ReadAllAvailable INSTANCE = new ReadAllAvailable();
 
-  override def latestOffset(): Offset = {
-    throw new IllegalAccessException(
-      "latestOffset should not be called for RateControlMicroBatchReadSupport")
-  }
+  private ReadAllAvailable() {}
 
-  def latestOffset(start: Offset): Offset
+  @Override
+  public String toString() {
+    return "All Available";
+  }
 }
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/RateControlMicroBatchStream.scala
 
b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/read/streaming/ReadLimit.java
similarity index 53%
rename from 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/RateControlMicroBatchStream.scala
rename to 
sql/catalyst/src/main/java/org/apache/spark/sql/connector/read/streaming/ReadLimit.java
index fb46f76..121ed1a 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/RateControlMicroBatchStream.scala
+++ 
b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/read/streaming/ReadLimit.java
@@ -15,17 +15,24 @@
  * limitations under the License.
  */
 
-package org.apache.spark.sql.execution.streaming.sources
+package org.apache.spark.sql.connector.read.streaming;
 
-import org.apache.spark.sql.connector.read.streaming.{MicroBatchStream, Offset}
+import org.apache.spark.annotation.Evolving;
 
-// A special `MicroBatchStream` that can get latestOffset with a start offset.
-trait RateControlMicroBatchStream extends MicroBatchStream {
+/**
+ * Interface representing limits on how much to read from a {@link 
MicroBatchStream} when it
+ * implements {@link SupportsAdmissionControl}. There are several 
implementing classes representing
+ * various kinds of limits.
+ *
+ * @see SupportsAdmissionControl#latestOffset(Offset, ReadLimit)
+ * @see ReadAllAvailable
+ * @see ReadMaxRows
+ */
+@Evolving
+public interface ReadLimit {
+  static ReadLimit maxRows(long rows) { return new ReadMaxRows(rows); }
 
-  override def latestOffset(): Offset = {
-    throw new IllegalAccessException(
-      "latestOffset should not be called for RateControlMicroBatchReadSupport")
-  }
+  static ReadLimit maxFiles(int files) { return new ReadMaxFiles(files); }
 
-  def latestOffset(start: Offset): Offset
+  static ReadLimit allAvailable() { return ReadAllAvailable.INSTANCE; }
 }
diff --git 
a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/read/streaming/ReadMaxFiles.java
 
b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/read/streaming/ReadMaxFiles.java
new file mode 100644
index 0000000..441a6c8
--- /dev/null
+++ 
b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/read/streaming/ReadMaxFiles.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.connector.read.streaming;
+
+import org.apache.spark.annotation.Evolving;
+
+/**
+ * Represents a {@link ReadLimit} where the {@link MicroBatchStream} should 
scan approximately the
+ * given maximum number of files.
+ *
+ * @see SupportsAdmissionControl#latestOffset(Offset, ReadLimit)
+ * @since 3.0.0
+ */
+@Evolving
+public class ReadMaxFiles implements ReadLimit {
+  private int files;
+
+  ReadMaxFiles(int maxFiles) {
+    this.files = maxFiles;
+  }
+
+  /** Approximate maximum files to scan. */
+  public int maxFiles() { return this.files; }
+
+  @Override
+  public String toString() {
+    return "MaxFiles: " + maxFiles();
+  }
+
+  @Override
+  public boolean equals(Object o) {
+    if (this == o) return true;
+    if (o == null || getClass() != o.getClass()) return false;
+    ReadMaxFiles other = (ReadMaxFiles) o;
+    return other.maxFiles() == maxFiles();
+  }
+
+  @Override
+  public int hashCode() { return files; }
+}
diff --git 
a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/read/streaming/ReadMaxRows.java
 
b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/read/streaming/ReadMaxRows.java
new file mode 100644
index 0000000..65a68c5
--- /dev/null
+++ 
b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/read/streaming/ReadMaxRows.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.connector.read.streaming;
+
+import org.apache.spark.annotation.Evolving;
+
+/**
+ * Represents a {@link ReadLimit} where the {@link MicroBatchStream} should 
scan approximately the
+ * given maximum number of rows.
+ *
+ * @see SupportsAdmissionControl#latestOffset(Offset, ReadLimit)
+ * @since 3.0.0
+ */
+@Evolving
+public final class ReadMaxRows implements ReadLimit {
+  private long rows;
+
+  ReadMaxRows(long rows) {
+    this.rows = rows;
+  }
+
+  /** Approximate maximum rows to scan. */
+  public long maxRows() { return this.rows; }
+
+  @Override
+  public String toString() {
+    return "MaxRows: " + maxRows();
+  }
+
+  @Override
+  public boolean equals(Object o) {
+    if (this == o) return true;
+    if (o == null || getClass() != o.getClass()) return false;
+    ReadMaxRows other = (ReadMaxRows) o;
+    return other.maxRows() == maxRows();
+  }
+
+  @Override
+  public int hashCode() { return Long.hashCode(this.rows); }
+}
diff --git 
a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/read/streaming/SupportsAdmissionControl.java
 
b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/read/streaming/SupportsAdmissionControl.java
new file mode 100644
index 0000000..027763c
--- /dev/null
+++ 
b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/read/streaming/SupportsAdmissionControl.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.connector.read.streaming;
+
+import org.apache.spark.annotation.Evolving;
+
+/**
+ * A mix-in interface for {@link SparkDataStream} streaming sources to signal 
that they can control
+ * the rate of data ingested into the system. These rate limits can come 
implicitly from the
+ * contract of triggers, e.g. Trigger.Once() requires that a micro-batch 
process all data
+ * available to the system at the start of the micro-batch. Alternatively, 
sources can decide to
+ * limit ingest through data source options.
+ *
+ * Through this interface, a MicroBatchStream should be able to return the 
next offset up to which it
+ * will process data, given a {@link ReadLimit}.
+ *
+ * @since 3.0.0
+ */
+@Evolving
+public interface SupportsAdmissionControl extends SparkDataStream {
+
+  /**
+   * Returns the read limits potentially passed to the data source through 
options when creating
+   * the data source.
+   */
+  default ReadLimit getDefaultReadLimit() { return ReadLimit.allAvailable(); }
+
+  /**
+   * Returns the most recent offset available given a read limit. The start 
offset can be used
+   * to figure out how much new data should be read given the limit. Users 
should implement this
+   * method instead of latestOffset for a MicroBatchStream or getOffset for 
Source.
+   *
+   * When this method is called on a `Source`, the source can return `null` if 
there is no
+   * data to process. In addition, for the very first micro-batch, the 
`startOffset` will be
+   * null as well.
+   *
+   * When this method is called on a MicroBatchStream, the `startOffset` will 
be `initialOffset`
+   * for the very first micro-batch. The source can return `null` if there is 
no data to process.
+   */
+  Offset latestOffset(Offset startOffset, ReadLimit limit);
+}
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala
index 36f7002..e8ce8e1 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala
@@ -30,6 +30,8 @@ import org.apache.spark.deploy.SparkHadoopUtil
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.{DataFrame, Dataset, SparkSession}
 import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap
+import org.apache.spark.sql.connector.read.streaming
+import org.apache.spark.sql.connector.read.streaming.{ReadAllAvailable, 
ReadLimit, ReadMaxFiles, SupportsAdmissionControl}
 import org.apache.spark.sql.execution.datasources.{DataSource, 
InMemoryFileIndex, LogicalRelation}
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.types.StructType
@@ -45,7 +47,7 @@ class FileStreamSource(
     override val schema: StructType,
     partitionColumns: Seq[String],
     metadataPath: String,
-    options: Map[String, String]) extends Source with Logging {
+    options: Map[String, String]) extends SupportsAdmissionControl with Source 
with Logging {
 
   import FileStreamSource._
 
@@ -115,15 +117,17 @@ class FileStreamSource(
    * `synchronized` on this method is for solving race conditions in tests. In 
the normal usage,
    * there is no race here, so the cost of `synchronized` should be rare.
    */
-  private def fetchMaxOffset(): FileStreamSourceOffset = synchronized {
+  private def fetchMaxOffset(limit: ReadLimit): FileStreamSourceOffset = 
synchronized {
     // All the new files found - ignore aged files and files that we have seen.
     val newFiles = fetchAllFiles().filter {
       case (path, timestamp) => seenFiles.isNewFile(path, timestamp)
     }
 
     // Obey user's setting to limit the number of files in this batch trigger.
-    val batchFiles =
-      if (maxFilesPerBatch.nonEmpty) newFiles.take(maxFilesPerBatch.get) else 
newFiles
+    val batchFiles = limit match {
+      case files: ReadMaxFiles => newFiles.take(files.maxFiles())
+      case _: ReadAllAvailable => newFiles
+    }
 
     batchFiles.foreach { file =>
       seenFiles.add(file._1, file._2)
@@ -150,6 +154,10 @@ class FileStreamSource(
     FileStreamSourceOffset(metadataLogCurrentOffset)
   }
 
+  override def getDefaultReadLimit: ReadLimit = {
+    
maxFilesPerBatch.map(ReadLimit.maxFiles).getOrElse(super.getDefaultReadLimit)
+  }
+
   /**
    * For test only. Run `func` with the internal lock to make sure when `func` 
is running,
    * the current offset won't be changed and no new batch will be emitted.
@@ -269,7 +277,14 @@ class FileStreamSource(
     files
   }
 
-  override def getOffset: Option[Offset] = 
Some(fetchMaxOffset()).filterNot(_.logOffset == -1)
+  override def getOffset: Option[Offset] = {
+    throw new UnsupportedOperationException(
+      "latestOffset(Offset, ReadLimit) should be called instead of this 
method")
+  }
+
+  override def latestOffset(startOffset: streaming.Offset, limit: ReadLimit): 
streaming.Offset = {
+    Some(fetchMaxOffset(limit)).filterNot(_.logOffset == -1).orNull
+  }
 
   override def toString: String = s"FileStreamSource[$qualifiedBasePath]"
 
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala
index 872c367..83bc347 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala
@@ -25,10 +25,10 @@ import org.apache.spark.sql.catalyst.expressions.{Alias, 
Attribute, CurrentBatch
 import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, LocalRelation, 
LogicalPlan, Project}
 import org.apache.spark.sql.catalyst.util.truncatedString
 import org.apache.spark.sql.connector.catalog.{SupportsRead, SupportsWrite, 
Table, TableCapability}
-import org.apache.spark.sql.connector.read.streaming.{MicroBatchStream, Offset 
=> OffsetV2, SparkDataStream}
+import org.apache.spark.sql.connector.read.streaming.{MicroBatchStream, Offset 
=> OffsetV2, ReadLimit, SparkDataStream, SupportsAdmissionControl}
 import org.apache.spark.sql.execution.SQLExecution
 import 
org.apache.spark.sql.execution.datasources.v2.{StreamingDataSourceV2Relation, 
StreamWriterCommitProgress, WriteToDataSourceV2Exec}
-import 
org.apache.spark.sql.execution.streaming.sources.{RateControlMicroBatchStream, 
WriteToMicroBatchDataSource}
+import 
org.apache.spark.sql.execution.streaming.sources.WriteToMicroBatchDataSource
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.streaming.{OutputMode, Trigger}
 import org.apache.spark.util.Clock
@@ -79,7 +79,7 @@ class MicroBatchExecution(
 
     import 
org.apache.spark.sql.execution.datasources.v2.DataSourceV2Implicits._
     val _logicalPlan = analyzedPlan.transform {
-      case streamingRelation@StreamingRelation(dataSourceV1, sourceName, 
output) =>
+      case streamingRelation @ StreamingRelation(dataSourceV1, sourceName, 
output) =>
         toExecutionRelationMap.getOrElseUpdate(streamingRelation, {
           // Materialize source to avoid creating it in every batch
           val metadataPath = s"$resolvedCheckpointRoot/sources/$nextSourceId"
@@ -122,7 +122,18 @@ class MicroBatchExecution(
       // v2 source
       case r: StreamingDataSourceV2Relation => r.stream
     }
-    uniqueSources = sources.distinct
+    uniqueSources = sources.distinct.map {
+      case source: SupportsAdmissionControl =>
+        val limit = source.getDefaultReadLimit
+        if (trigger == OneTimeTrigger && limit != ReadLimit.allAvailable()) {
+          logWarning(s"The read limit $limit for $source is ignored when 
Trigger.Once() is used.")
+          source -> ReadLimit.allAvailable()
+        } else {
+          source -> limit
+        }
+      case other =>
+        other -> ReadLimit.allAvailable()
+    }.toMap
 
     // TODO (SPARK-27484): we should add the writing node before the plan is 
analyzed.
     sink match {
@@ -354,25 +365,33 @@ class MicroBatchExecution(
 
     // Generate a map from each unique source to the next available offset.
     val latestOffsets: Map[SparkDataStream, Option[OffsetV2]] = 
uniqueSources.map {
-      case s: Source =>
+      case (s: SupportsAdmissionControl, limit) =>
         updateStatusMessage(s"Getting offsets from $s")
-        reportTimeTaken("getOffset") {
-          (s, s.getOffset)
+        reportTimeTaken("latestOffset") {
+          val startOffsetOpt = availableOffsets.get(s)
+          val startOffset = s match {
+            case _: Source =>
+              startOffsetOpt.orNull
+            case v2: MicroBatchStream =>
+              startOffsetOpt.map(offset => v2.deserializeOffset(offset.json))
+                .getOrElse(v2.initialOffset())
+          }
+          (s, Option(s.latestOffset(startOffset, limit)))
         }
-      case s: RateControlMicroBatchStream =>
+      case (s: Source, _) =>
         updateStatusMessage(s"Getting offsets from $s")
-        reportTimeTaken("latestOffset") {
-          val startOffset = availableOffsets
-            .get(s).map(off => s.deserializeOffset(off.json))
-            .getOrElse(s.initialOffset())
-          (s, Option(s.latestOffset(startOffset)))
+        reportTimeTaken("getOffset") {
+          (s, s.getOffset)
         }
-      case s: MicroBatchStream =>
+      case (s: MicroBatchStream, _) =>
         updateStatusMessage(s"Getting offsets from $s")
         reportTimeTaken("latestOffset") {
           (s, Option(s.latestOffset()))
         }
-    }.toMap
+      case (s, _) =>
+        // for some reason, the compiler is unhappy and thinks the match is 
not exhaustive
+        throw new IllegalStateException(s"Unexpected source: $s")
+    }
     availableOffsets ++= latestOffsets.filter { case (_, o) => o.nonEmpty 
}.mapValues(_.get)
 
     // Update the query metadata
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
index ed908a8..8b3534b 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
@@ -37,7 +37,7 @@ import org.apache.spark.sql._
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.sql.catalyst.streaming.InternalOutputModes._
 import org.apache.spark.sql.connector.catalog.{SupportsWrite, Table}
-import org.apache.spark.sql.connector.read.streaming.{Offset => OffsetV2, 
SparkDataStream}
+import org.apache.spark.sql.connector.read.streaming.{Offset => OffsetV2, 
ReadLimit, SparkDataStream}
 import org.apache.spark.sql.connector.write.{LogicalWriteInfoImpl, 
SupportsTruncate}
 import org.apache.spark.sql.connector.write.streaming.StreamingWrite
 import org.apache.spark.sql.execution.QueryExecution
@@ -206,7 +206,7 @@ abstract class StreamExecution(
   /**
    * A list of unique sources in the query plan. This will be set when 
generating logical plan.
    */
-  @volatile protected var uniqueSources: Seq[SparkDataStream] = Seq.empty
+  @volatile protected var uniqueSources: Map[SparkDataStream, ReadLimit] = 
Map.empty
 
   /** Defines the internal state of execution */
   protected val state = new AtomicReference[State](INITIALIZING)
@@ -425,7 +425,7 @@ abstract class StreamExecution(
 
   /** Stops all streaming sources safely. */
   protected def stopSources(): Unit = {
-    uniqueSources.foreach { source =>
+    uniqueSources.foreach { case (source, _) =>
       try {
         source.stop()
       } catch {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousExecution.scala
index 481552a..a9b724a 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousExecution.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousExecution.scala
@@ -29,7 +29,7 @@ import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.catalyst.expressions.{CurrentDate, 
CurrentTimestamp}
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.sql.connector.catalog.{SupportsRead, SupportsWrite, 
TableCapability}
-import org.apache.spark.sql.connector.read.streaming.{ContinuousStream, Offset 
=> OffsetV2, PartitionOffset}
+import org.apache.spark.sql.connector.read.streaming.{ContinuousStream, Offset 
=> OffsetV2, PartitionOffset, ReadLimit}
 import org.apache.spark.sql.execution.SQLExecution
 import 
org.apache.spark.sql.execution.datasources.v2.StreamingDataSourceV2Relation
 import org.apache.spark.sql.execution.streaming.{StreamingRelationV2, _}
@@ -84,7 +84,7 @@ class ContinuousExecution(
     sources = _logicalPlan.collect {
       case r: StreamingDataSourceV2Relation => 
r.stream.asInstanceOf[ContinuousStream]
     }
-    uniqueSources = sources.distinct
+    uniqueSources = sources.distinct.map(s => s -> 
ReadLimit.allAvailable()).toMap
 
     // TODO (SPARK-27484): we should add the writing node before the plan is analyzed.
     WriteToContinuousDataSource(
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala
index 632e007..fa32033 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala
@@ -1174,6 +1174,62 @@ class FileStreamSourceSuite extends FileStreamSourceTest {
     }
   }
 
+  test("SPARK-30669: maxFilesPerTrigger - ignored when using Trigger.Once") {
+    withTempDirs { (src, target) =>
+      val checkpoint = new File(target, "chk").getCanonicalPath
+      val targetDir = new File(target, "data").getCanonicalPath
+      var lastFileModTime: Option[Long] = None
+
+      /** Create `<data>.txt` in `src` holding `data`, last-modified 1000 ms after the prior file. */
+      def createFile(data: Int): File = {
+        val file = stringToFile(new File(src, s"$data.txt"), data.toString)
+        if (lastFileModTime.nonEmpty) file.setLastModified(lastFileModTime.get 
+ 1000)
+        lastFileModTime = Some(file.lastModified)
+        file
+      }
+
+      createFile(1)
+      createFile(2)
+      createFile(3)
+
+      // Source is capped at one file per trigger; Trigger.Once below is expected to ignore the cap
+      val df = spark
+        .readStream
+        .option("maxFilesPerTrigger", 1)
+        .text(src.getCanonicalPath)
+
+      def startQuery(): StreamingQuery = {
+        df.writeStream
+          .format("parquet")
+          .trigger(Trigger.Once)
+          .option("checkpointLocation", checkpoint)
+          .start(targetDir)
+      }
+      val q = startQuery()
+
+      try {
+        assert(q.awaitTermination(streamingTimeout.toMillis))
+        assert(q.recentProgress.count(_.numInputRows != 0) == 1) // only one 
trigger was run
+        checkAnswer(sql(s"SELECT * from parquet.`$targetDir`"), (1 to 
3).map(_.toString).toDF)
+      } finally {
+        q.stop()
+      }
+
+      createFile(4)
+      createFile(5)
+
+      // Restart with the same checkpoint; the new files should be processed in a single batch
+      val q2 = startQuery()
+      try {
+        assert(q2.awaitTermination(streamingTimeout.toMillis))
+        assert(q2.recentProgress.count(_.numInputRows != 0) == 1) // only one 
trigger was run
+        checkAnswer(sql(s"SELECT * from parquet.`$targetDir`"), (1 to 
5).map(_.toString).toDF)
+      } finally {
+        q2.stop()
+      }
+    }
+  }
+
   test("explain") {
     withTempDirs { case (src, tmp) =>
       src.mkdirs()


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to