This is an automated email from the ASF dual-hosted git repository.
kabhwan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new 227e26202522 [SPARK-45178][SS] Fallback to execute a single batch for
Trigger.AvailableNow with unsupported sources rather than using wrapper
227e26202522 is described below
commit 227e262025229a67f43a8de452215053a9cbf662
Author: Jungtaek Lim <[email protected]>
AuthorDate: Wed Sep 20 11:05:06 2023 +0900
[SPARK-45178][SS] Fallback to execute a single batch for
Trigger.AvailableNow with unsupported sources rather than using wrapper
### What changes were proposed in this pull request?
This PR changes the behavior when a user runs a streaming query with
Trigger.AvailableNow and the query contains any source that does not support
Trigger.AvailableNow. Instead of using the wrapper implementation, Spark now
falls back to executing a single batch (a.k.a. Trigger.Once).
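As a rough illustration of the fallback described above, the following
self-contained Scala sketch models the decision with simplified stand-in
types. The trigger and executor names mirror those in the diff, but
`chooseExecutor`, `LegacySource`, `ModernSource` and the `wrapperEnabled`
parameter are hypothetical and not part of Spark.

```scala
// Simplified stand-ins for Spark's internal types. This is a sketch of the
// decision logic only, not the actual MicroBatchExecution code.
sealed trait Trigger
case object OneTimeTrigger extends Trigger
case object AvailableNowTrigger extends Trigger

sealed trait TriggerExecutor
case object SingleBatchExecutor extends TriggerExecutor
case object MultiBatchExecutor extends TriggerExecutor

trait SparkDataStream { def name: String }
trait SupportsTriggerAvailableNow extends SparkDataStream

// Hypothetical sources: one without and one with Trigger.AvailableNow support.
final case class LegacySource(name: String) extends SparkDataStream
final case class ModernSource(name: String) extends SupportsTriggerAvailableNow

object TriggerSelection {
  def chooseExecutor(
      trigger: Trigger,
      sources: Seq[SparkDataStream],
      wrapperEnabled: Boolean): TriggerExecutor = trigger match {
    case OneTimeTrigger => SingleBatchExecutor
    // Flag enabled: keep multi-batch execution and rely on the wrapper.
    case AvailableNowTrigger if wrapperEnabled => MultiBatchExecutor
    // Flag disabled: any unsupported source forces a single batch.
    case AvailableNowTrigger =>
      val unsupported =
        sources.distinct.filterNot(_.isInstanceOf[SupportsTriggerAvailableNow])
      unsupported.foreach { s =>
        println(s"source [${s.name}] does not support Trigger.AvailableNow; " +
          "falling back to single batch execution")
      }
      if (unsupported.isEmpty) MultiBatchExecutor else SingleBatchExecutor
  }
}
```

Unlike the real change, this sketch reports every unsupported source rather
than stopping at the first one; it is meant only to show the shape of the
decision.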
This PR introduces a new flag
`spark.sql.streaming.triggerAvailableNowWrapper.enabled` (default: false) to
retain the previous behavior for advanced users. The flag is marked as internal
since it is intended only for users who are concerned about the behavioral
change.
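For users who do want to keep the previous behavior, a minimal sketch of
opting in might look like the following. It assumes a local SparkSession (the
app name and master are placeholders) and that the flag is set before the
streaming query is started. Because the flag is internal, it is not a
documented public setting.

```scala
import org.apache.spark.sql.SparkSession

// Placeholder session settings, for illustration only.
val spark = SparkSession.builder()
  .appName("available-now-wrapper-opt-in")
  .master("local[2]")
  .getOrCreate()

// Opt back in to the wrapper implementation (the default is false per this commit).
// Sources without Trigger.AvailableNow support may then show duplication or data loss.
spark.conf.set("spark.sql.streaming.triggerAvailableNowWrapper.enabled", "true")
```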
Minor details are as follows:
* This PR does not use Trigger.Once, hence users won't see the deprecation
warning for Trigger.Once.
* This PR logs a warning naming the source(s) that do not support
Trigger.AvailableNow, so that users can identify which source(s) prevent them
from enjoying the benefits of Trigger.AvailableNow.
### Why are the changes needed?
We have observed a data duplication issue with a 3rd party data source when
it was used with Trigger.AvailableNow. The source didn't support
Trigger.AvailableNow, and unfortunately it also did not play well with the
wrapper implementation.
We care more about possible correctness issues than about broader coverage of
Trigger.AvailableNow, hence we want to stop using the wrapper implementation by
default. We also care about not breaking existing queries, so we fall back to
single batch execution rather than failing the query.
### Does this PR introduce _any_ user-facing change?
Yes, this introduces a behavioral change for streaming queries with
Trigger.AvailableNow that contain any source not supporting
Trigger.AvailableNow.
### How was this patch tested?
Modified unit tests.
### Was this patch authored or co-authored using generative AI tooling?
No.
Closes #42940 from HeartSaVioR/SPARK-45178.
Authored-by: Jungtaek Lim <[email protected]>
Signed-off-by: Jungtaek Lim <[email protected]>
---
docs/ss-migration-guide.md | 4 ++
docs/structured-streaming-programming-guide.md | 2 +
.../org/apache/spark/sql/internal/SQLConf.scala | 11 ++++
.../AsyncProgressTrackingMicroBatchExecution.scala | 7 +-
.../streaming/AvailableNowDataStreamWrapper.scala | 6 ++
.../execution/streaming/MicroBatchExecution.scala | 50 +++++++++++++--
.../spark/sql/execution/streaming/memory.scala | 19 +++++-
.../streaming/StreamingQueryListenerSuite.scala | 6 +-
.../spark/sql/streaming/StreamingQuerySuite.scala | 6 +-
.../sql/streaming/TriggerAvailableNowSuite.scala | 74 +++++++++++++++++++++-
10 files changed, 167 insertions(+), 18 deletions(-)
diff --git a/docs/ss-migration-guide.md b/docs/ss-migration-guide.md
index 57fe3a84e12c..3247866206ee 100644
--- a/docs/ss-migration-guide.md
+++ b/docs/ss-migration-guide.md
@@ -26,6 +26,10 @@ Note that this migration guide describes the items specific
to Structured Stream
Many items of SQL migration can be applied when migrating Structured Streaming
to higher versions.
Please refer [Migration Guide: SQL, Datasets and
DataFrame](sql-migration-guide.html).
+## Upgrading from Structured Streaming 3.5 to 4.0
+
+- Since Spark 4.0, Spark falls back to single batch execution if any source in
the query does not support `Trigger.AvailableNow`. This is to avoid any
possible correctness, duplication, and dataloss issue due to incompatibility
between source and wrapper implementation. (See
[SPARK-45178](https://issues.apache.org/jira/browse/SPARK-45178) for more
details.)
+
## Upgrading from Structured Streaming 3.3 to 3.4
- Since Spark 3.4, `Trigger.Once` is deprecated, and users are encouraged to
migrate from `Trigger.Once` to `Trigger.AvailableNow`. Please refer
[SPARK-39805](https://issues.apache.org/jira/browse/SPARK-39805) for more
details.
diff --git a/docs/structured-streaming-programming-guide.md
b/docs/structured-streaming-programming-guide.md
index b84877d67c01..70e763be0d70 100644
--- a/docs/structured-streaming-programming-guide.md
+++ b/docs/structured-streaming-programming-guide.md
@@ -3265,6 +3265,8 @@ Here are the different kinds of triggers that are
supported.
if the last batch advances the watermark. This helps to
maintain smaller and predictable
state size and smaller latency on the output of stateful
operators.</li>
</ul>
+ NOTE: this trigger will be deactivated when there is any source which
does not support Trigger.AvailableNow.
+ Spark will perform one-time micro-batch as a fall-back. Check the
above differences for a risk of fallback.
</td>
</tr>
<tr>
diff --git
a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index 49a4b0bf98bb..cce85da29b62 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -2180,6 +2180,17 @@ object SQLConf {
.booleanConf
.createWithDefault(true)
+ val STREAMING_TRIGGER_AVAILABLE_NOW_WRAPPER_ENABLED =
+ buildConf("spark.sql.streaming.triggerAvailableNowWrapper.enabled")
+ .internal()
+ .doc("Whether to use the wrapper implementation of Trigger.AvailableNow
if the source " +
+ "does not support Trigger.AvailableNow. Enabling this allows the
benefits of " +
+ "Trigger.AvailableNow with sources which don't support it, but some
sources " +
+ "may show unexpected behavior including duplication, data loss, etc.
So use with " +
+ "extreme care! The ideal direction is to persuade developers of
source(s) to " +
+ "support Trigger.AvailableNow.")
+ .booleanConf
+ .createWithDefault(false)
val VARIABLE_SUBSTITUTE_ENABLED =
buildConf("spark.sql.variable.substitute")
diff --git
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/AsyncProgressTrackingMicroBatchExecution.scala
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/AsyncProgressTrackingMicroBatchExecution.scala
index 56cdba881753..206efb9a5450 100644
---
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/AsyncProgressTrackingMicroBatchExecution.scala
+++
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/AsyncProgressTrackingMicroBatchExecution.scala
@@ -50,8 +50,6 @@ class AsyncProgressTrackingMicroBatchExecution(
// to cache the batch id of the last batch written to storage
private val lastBatchPersistedToDurableStorage = new AtomicLong(-1)
- override val triggerExecutor: TriggerExecutor = validateAndGetTrigger()
-
// used to check during the first batch if the pipeline is stateful
private var isFirstBatch: Boolean = true
@@ -94,6 +92,9 @@ class AsyncProgressTrackingMicroBatchExecution(
override val commitLog =
new AsyncCommitLog(sparkSession, checkpointFile("commits"),
asyncWritesExecutorService)
+ // perform quick validation to fail faster
+ validateAndGetTrigger()
+
override def validateOffsetLogAndGetPrevOffset(latestBatchId: Long):
Option[OffsetSeq] = {
/* Initialize committed offsets to a committed batch, which at this
* is the second latest batch id in the offset log.
@@ -228,6 +229,8 @@ class AsyncProgressTrackingMicroBatchExecution(
asyncWritesExecutorService.getQueue.size() > 0 ||
asyncWritesExecutorService.getActiveCount > 0
}
+ override protected def getTrigger(): TriggerExecutor =
validateAndGetTrigger()
+
private def validateAndGetTrigger(): TriggerExecutor = {
// validate that the pipeline is using a supported sink
if (!extraOptions
diff --git
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/AvailableNowDataStreamWrapper.scala
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/AvailableNowDataStreamWrapper.scala
index 0dc510476279..18dd2eba083a 100644
---
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/AvailableNowDataStreamWrapper.scala
+++
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/AvailableNowDataStreamWrapper.scala
@@ -28,6 +28,12 @@ import org.apache.spark.sql.connector.read.streaming
class AvailableNowDataStreamWrapper(val delegate: SparkDataStream)
extends SparkDataStream with SupportsTriggerAvailableNow with Logging {
+ // See SPARK-45178 for more details.
+ logWarning("Activating the wrapper implementation of Trigger.AvailableNow
for source " +
+ s"[$delegate]. Note that this might introduce possibility of
deduplication, dataloss, " +
+ "correctness issue. Enable the config with extreme care. We strongly
recommend to contact " +
+ "the data source developer to support Trigger.AvailableNow.")
+
private var fetchedOffset: streaming.Offset = _
override def initialOffset(): streaming.Offset = delegate.initialOffset()
diff --git
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala
index 010ac75a73da..8edbfea3eb2c 100644
---
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala
+++
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala
@@ -52,11 +52,46 @@ class MicroBatchExecution(
@volatile protected var sources: Seq[SparkDataStream] = Seq.empty
- protected val triggerExecutor: TriggerExecutor = trigger match {
- case t: ProcessingTimeTrigger => ProcessingTimeExecutor(t, triggerClock)
- case OneTimeTrigger => SingleBatchExecutor()
- case AvailableNowTrigger => MultiBatchExecutor()
- case _ => throw new IllegalStateException(s"Unknown type of trigger:
$trigger")
+ @volatile protected[sql] var triggerExecutor: TriggerExecutor = _
+
+ protected def getTrigger(): TriggerExecutor = {
+ assert(sources.nonEmpty, "sources should have been retrieved from the
plan!")
+ trigger match {
+ case t: ProcessingTimeTrigger => ProcessingTimeExecutor(t, triggerClock)
+ case OneTimeTrigger => SingleBatchExecutor()
+ case AvailableNowTrigger =>
+ // When the flag is enabled, Spark will wrap sources which do not
support
+ // Trigger.AvailableNow with wrapper implementation, so that
Trigger.AvailableNow can
+ // take effect.
+ // When the flag is disabled, Spark will fall back to single batch
execution, whenever
+ // it figures out any source does not support Trigger.AvailableNow.
+ // See SPARK-45178 for more details.
+ if (sparkSession.sqlContext.conf.getConf(
+ SQLConf.STREAMING_TRIGGER_AVAILABLE_NOW_WRAPPER_ENABLED)) {
+ logInfo("Configured to use the wrapper of Trigger.AvailableNow for
query " +
+ s"$prettyIdString.")
+ MultiBatchExecutor()
+ } else {
+ val supportsTriggerAvailableNow = sources.distinct.forall { src =>
+ val supports = src.isInstanceOf[SupportsTriggerAvailableNow]
+ if (!supports) {
+ logWarning(s"source [$src] does not support
Trigger.AvailableNow. Falling back to " +
+ "single batch execution. Note that this may not guarantee
processing new data if " +
+ "there is an uncommitted batch. Please consult with data
source developer to " +
+ "support Trigger.AvailableNow.")
+ }
+
+ supports
+ }
+
+ if (supportsTriggerAvailableNow) {
+ MultiBatchExecutor()
+ } else {
+ SingleBatchExecutor()
+ }
+ }
+ case _ => throw new IllegalStateException(s"Unknown type of trigger:
$trigger")
+ }
}
protected var watermarkTracker: WatermarkTracker = _
@@ -130,6 +165,11 @@ class MicroBatchExecution(
// v2 source
case r: StreamingDataSourceV2Relation => r.stream
}
+
+ // Initializing TriggerExecutor relies on `sources`, hence calling this
after initializing
+ // sources.
+ triggerExecutor = getTrigger()
+
uniqueSources = triggerExecutor match {
case _: SingleBatchExecutor =>
sources.distinct.map {
diff --git
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/memory.scala
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/memory.scala
index 732eaa8d783d..fa0744dc19b2 100644
---
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/memory.scala
+++
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/memory.scala
@@ -34,7 +34,7 @@ import
org.apache.spark.sql.catalyst.types.DataTypeUtils.toAttributes
import org.apache.spark.sql.catalyst.util.truncatedString
import org.apache.spark.sql.connector.catalog.{SupportsRead, Table,
TableCapability}
import org.apache.spark.sql.connector.read.{InputPartition, PartitionReader,
PartitionReaderFactory, Scan, ScanBuilder}
-import org.apache.spark.sql.connector.read.streaming.{ContinuousStream,
MicroBatchStream, Offset => OffsetV2, SparkDataStream}
+import org.apache.spark.sql.connector.read.streaming.{ContinuousStream,
MicroBatchStream, Offset => OffsetV2, ReadLimit, SparkDataStream,
SupportsTriggerAvailableNow}
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.internal.connector.SimpleTableProvider
import org.apache.spark.sql.types.StructType
@@ -155,7 +155,10 @@ case class MemoryStream[A : Encoder](
id: Int,
sqlContext: SQLContext,
numPartitions: Option[Int] = None)
- extends MemoryStreamBase[A](sqlContext) with MicroBatchStream with Logging {
+ extends MemoryStreamBase[A](sqlContext)
+ with MicroBatchStream
+ with SupportsTriggerAvailableNow
+ with Logging {
protected val output = logicalPlan.output
@@ -175,6 +178,9 @@ case class MemoryStream[A : Encoder](
@GuardedBy("this")
private var endOffset = new LongOffset(-1)
+ @GuardedBy("this")
+ private var availableNowEndOffset: OffsetV2 = _
+
/**
* Last offset that was discarded, or -1 if no commits have occurred. Note
that the value
* -1 is used in calculations below and isn't just an arbitrary constant.
@@ -201,7 +207,15 @@ case class MemoryStream[A : Encoder](
override def initialOffset: OffsetV2 = LongOffset(-1)
+ override def prepareForTriggerAvailableNow(): Unit = synchronized {
+ availableNowEndOffset = latestOffset(initialOffset,
ReadLimit.allAvailable())
+ }
+
override def latestOffset(): OffsetV2 = {
+ throw new IllegalStateException("Should not reach here!")
+ }
+
+ override def latestOffset(startOffset: OffsetV2, limit: ReadLimit): OffsetV2
= {
if (currentOffset.offset == -1) null else currentOffset
}
@@ -277,6 +291,7 @@ case class MemoryStream[A : Encoder](
endOffset = LongOffset(-1)
currentOffset = new LongOffset(-1)
lastOffsetCommitted = new LongOffset(-1)
+ availableNowEndOffset = null
}
}
diff --git
a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryListenerSuite.scala
b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryListenerSuite.scala
index 52b740bc5c34..861e4e83ceff 100644
---
a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryListenerSuite.scala
+++
b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryListenerSuite.scala
@@ -29,7 +29,7 @@ import org.scalatest.concurrent.Waiters.Waiter
import org.apache.spark.SparkException
import org.apache.spark.scheduler._
import org.apache.spark.sql.{Encoder, Row, SparkSession}
-import org.apache.spark.sql.connector.read.streaming.{Offset => OffsetV2}
+import org.apache.spark.sql.connector.read.streaming.{Offset => OffsetV2,
ReadLimit}
import org.apache.spark.sql.execution.streaming._
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.streaming.StreamingQueryListener._
@@ -314,9 +314,9 @@ class StreamingQueryListenerSuite extends StreamTest with
BeforeAndAfter {
try {
var numTriggers = 0
val input = new MemoryStream[Int](0, sqlContext) {
- override def latestOffset(): OffsetV2 = {
+ override def latestOffset(startOffset: OffsetV2, limit: ReadLimit):
OffsetV2 = {
numTriggers += 1
- super.latestOffset()
+ super.latestOffset(startOffset, limit)
}
}
val clock = new StreamManualClock()
diff --git
a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala
b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala
index c3729d50ed09..9444db3e10fa 100644
---
a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala
+++
b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala
@@ -43,7 +43,7 @@ import
org.apache.spark.sql.catalyst.plans.logical.LocalRelation
import org.apache.spark.sql.catalyst.streaming.InternalOutputModes.Complete
import org.apache.spark.sql.catalyst.types.DataTypeUtils.toAttributes
import org.apache.spark.sql.connector.read.InputPartition
-import org.apache.spark.sql.connector.read.streaming.{Offset => OffsetV2}
+import org.apache.spark.sql.connector.read.streaming.{Offset => OffsetV2,
ReadLimit}
import org.apache.spark.sql.execution.exchange.ReusedExchangeExec
import org.apache.spark.sql.execution.streaming._
import org.apache.spark.sql.execution.streaming.sources.{MemorySink,
TestForeachWriter}
@@ -230,9 +230,9 @@ class StreamingQuerySuite extends StreamTest with
BeforeAndAfter with Logging wi
private def dataAdded: Boolean = currentOffset.offset != -1
// latestOffset should take 50 ms the first time it is called after data
is added
- override def latestOffset(): OffsetV2 = synchronized {
+ override def latestOffset(startOffset: OffsetV2, limit: ReadLimit):
OffsetV2 = synchronized {
if (dataAdded) clock.waitTillTime(1050)
- super.latestOffset()
+ super.latestOffset(startOffset, limit)
}
// getBatch should take 100 ms the first time it is called
diff --git
a/sql/core/src/test/scala/org/apache/spark/sql/streaming/TriggerAvailableNowSuite.scala
b/sql/core/src/test/scala/org/apache/spark/sql/streaming/TriggerAvailableNowSuite.scala
index 65deca222073..defd5fd110de 100644
---
a/sql/core/src/test/scala/org/apache/spark/sql/streaming/TriggerAvailableNowSuite.scala
+++
b/sql/core/src/test/scala/org/apache/spark/sql/streaming/TriggerAvailableNowSuite.scala
@@ -24,7 +24,8 @@ import org.apache.spark.sql.catalyst.plans.logical.Range
import org.apache.spark.sql.catalyst.util.stringToFile
import org.apache.spark.sql.connector.read.streaming
import org.apache.spark.sql.connector.read.streaming.{ReadLimit,
SupportsAdmissionControl}
-import org.apache.spark.sql.execution.streaming.{LongOffset, MemoryStream,
Offset, SerializedOffset, Source, StreamingExecutionRelation}
+import org.apache.spark.sql.execution.streaming.{LongOffset, MemoryStream,
MicroBatchExecution, MultiBatchExecutor, Offset, SerializedOffset,
SingleBatchExecutor, Source, StreamingExecutionRelation, StreamingQueryWrapper}
+import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types.{LongType, StructType}
import org.apache.spark.tags.SlowSQLTest
@@ -41,6 +42,10 @@ class TriggerAvailableNowSuite extends FileStreamSourceTest {
def incrementAvailableOffset(numNewRows: Int): Unit
def sourceName: String
+
+ def reset(): Unit = {
+ currentOffset = 0L
+ }
}
class TestSource extends TestDataFrameProvider with Source {
@@ -103,6 +108,22 @@ class TriggerAvailableNowSuite extends
FileStreamSourceTest {
// remove the trailing `$` in the class name
override def sourceName: String =
MemoryStream.getClass.getSimpleName.dropRight(1)
+
+ override def reset(): Unit = {
+ super.reset()
+ memoryStream.reset()
+ }
+ }
+
+ def testWithConfigMatrix(testName: String)(testFun: => Any): Unit = {
+ Seq(true, false).foreach { useWrapper =>
+ test(testName + s" (using wrapper: $useWrapper)") {
+ withSQLConf(
+ SQLConf.STREAMING_TRIGGER_AVAILABLE_NOW_WRAPPER_ENABLED.key ->
useWrapper.toString) {
+ testFun
+ }
+ }
+ }
}
Seq(
@@ -110,7 +131,9 @@ class TriggerAvailableNowSuite extends FileStreamSourceTest
{
new TestSourceWithAdmissionControl,
new TestMicroBatchStream
).foreach { testSource =>
- test(s"TriggerAvailableNow for multiple sources with
${testSource.getClass}") {
+ testWithConfigMatrix(s"TriggerAvailableNow for multiple sources with
${testSource.getClass}") {
+ testSource.reset()
+
withTempDirs { (src, target) =>
val checkpoint = new File(target, "chk").getCanonicalPath
val targetDir = new File(target, "data").getCanonicalPath
@@ -154,6 +177,7 @@ class TriggerAvailableNowSuite extends FileStreamSourceTest
{
q.recentProgress.foreach { p =>
assert(p.sources.exists(_.description.startsWith(testSource.sourceName)))
}
+ assertQueryUsingRightBatchExecutor(testSource, q)
checkAnswer(sql(s"SELECT * from parquet.`$targetDir`"),
Seq(1, 2, 3, 7, 8, 9).map(_.toString).toDF())
} finally {
@@ -174,6 +198,7 @@ class TriggerAvailableNowSuite extends FileStreamSourceTest
{
q2.recentProgress.foreach { p =>
assert(p.sources.exists(_.description.startsWith(testSource.sourceName)))
}
+ assertQueryUsingRightBatchExecutor(testSource, q2)
checkAnswer(sql(s"SELECT * from parquet.`$targetDir`"), (1 to
12).map(_.toString).toDF())
} finally {
q2.stop()
@@ -187,7 +212,9 @@ class TriggerAvailableNowSuite extends FileStreamSourceTest
{
new TestSourceWithAdmissionControl,
new TestMicroBatchStream
).foreach { testSource =>
- test(s"TriggerAvailableNow for single source with ${testSource.getClass}")
{
+ testWithConfigMatrix(s"TriggerAvailableNow for single source with
${testSource.getClass}") {
+ testSource.reset()
+
val tableName = "trigger_available_now_test_table"
withTable(tableName) {
val df = testSource.toDF
@@ -210,6 +237,7 @@ class TriggerAvailableNowSuite extends FileStreamSourceTest
{
q.recentProgress.foreach { p =>
assert(p.sources.exists(_.description.startsWith(testSource.sourceName)))
}
+ assertQueryUsingRightBatchExecutor(testSource, q)
checkAnswer(spark.table(tableName), (1 to 3).toDF())
} finally {
q.stop()
@@ -225,6 +253,7 @@ class TriggerAvailableNowSuite extends FileStreamSourceTest
{
q2.recentProgress.foreach { p =>
assert(p.sources.exists(_.description.startsWith(testSource.sourceName)))
}
+ assertQueryUsingRightBatchExecutor(testSource, q2)
checkAnswer(spark.table(tableName), (1 to 6).toDF())
} finally {
q2.stop()
@@ -232,4 +261,43 @@ class TriggerAvailableNowSuite extends
FileStreamSourceTest {
}
}
}
+
+ private def assertQueryUsingRightBatchExecutor(
+ testSource: TestDataFrameProvider,
+ query: StreamingQuery): Unit = {
+ val useWrapper = query.sparkSession.conf.get(
+ SQLConf.STREAMING_TRIGGER_AVAILABLE_NOW_WRAPPER_ENABLED)
+
+ if (useWrapper) {
+ assertQueryUsingMultiBatchExecutor(query)
+ } else {
+ testSource match {
+ case _: TestMicroBatchStream =>
+ // Trigger.AvailableNow should take effect because all sources
support
+ // Trigger.AvailableNow.
+ assertQueryUsingMultiBatchExecutor(query)
+
+ case _ =>
+ // We fall back to single batch executor because there is a source
which doesn't
+ // support Trigger.AvailableNow.
+ assertQueryUsingSingleBatchExecutor(query)
+ }
+ }
+ }
+
+ private def assertQueryUsingSingleBatchExecutor(query: StreamingQuery): Unit
= {
+
assert(getMicroBatchExecution(query).triggerExecutor.isInstanceOf[SingleBatchExecutor])
+ }
+
+ private def assertQueryUsingMultiBatchExecutor(query: StreamingQuery): Unit
= {
+
assert(getMicroBatchExecution(query).triggerExecutor.isInstanceOf[MultiBatchExecutor])
+ }
+
+ private def getMicroBatchExecution(query: StreamingQuery):
MicroBatchExecution = {
+ if (query.isInstanceOf[StreamingQueryWrapper]) {
+
query.asInstanceOf[StreamingQueryWrapper].streamingQuery.asInstanceOf[MicroBatchExecution]
+ } else {
+ query.asInstanceOf[MicroBatchExecution]
+ }
+ }
}