This is an automated email from the ASF dual-hosted git repository.

kabhwan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
     new 122a88c4941 [SPARK-42968][SS] Add option to skip commit coordinator as part of StreamingWrite API for DSv2 sources/sinks
122a88c4941 is described below

commit 122a88c4941d8ce1b8344c425fed455b79298afa
Author: Anish Shrigondekar <anish.shrigonde...@databricks.com>
AuthorDate: Thu Mar 30 21:48:47 2023 +0900

[SPARK-42968][SS] Add option to skip commit coordinator as part of StreamingWrite API for DSv2 sources/sinks

### What changes were proposed in this pull request?

Add an option to skip the commit coordinator as part of the StreamingWrite API for DSv2 sources/sinks. This option was already available as part of the BatchWrite API.

### Why are the changes needed?

The sinks listed below are at-least-once, so they do not need to go through the commit coordinator on the driver to ensure that only one task per partition commits. The coordinator is even less useful for streaming use cases, where batches can be replayed from the checkpoint directory.
- memory sink
- console sink
- no-op sink
- Kafka v2 sink

### Does this PR introduce _any_ user-facing change?

No

### How was this patch tested?

Added a unit test for the change.

```
[info] ReportSinkMetricsSuite:
22:23:01.276 WARN org.apache.hadoop.util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
22:23:03.139 WARN org.apache.spark.sql.execution.streaming.ResolveWriteToStream: spark.sql.adaptive.enabled is not supported in streaming DataFrames/Datasets and will be disabled.
[info] - test ReportSinkMetrics with useCommitCoordinator=true (2 seconds, 709 milliseconds)
22:23:04.522 WARN org.apache.spark.sql.execution.streaming.ResolveWriteToStream: spark.sql.adaptive.enabled is not supported in streaming DataFrames/Datasets and will be disabled.
[info] - test ReportSinkMetrics with useCommitCoordinator=false (373 milliseconds)
22:23:04.941 WARN org.apache.spark.sql.streaming.ReportSinkMetricsSuite: ===== POSSIBLE THREAD LEAK IN SUITE o.a.s.sql.streaming.ReportSinkMetricsSuite, threads: ForkJoinPool.commonPool-worker-19 (daemon=true), rpc-boss-3-1 (daemon=true), shuffle-boss-6-1 (daemon=true) =====
[info] Run completed in 4 seconds, 934 milliseconds.
[info] Total number of tests run: 2
[info] Suites: completed 1, aborted 0
[info] Tests: succeeded 2, failed 0, canceled 0, ignored 0, pending 0
[info] All tests passed.
```

Closes #40600 from anishshri-db/task/SPARK-42968.
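For illustration only (this sketch is not part of the patch), an at-least-once DSv2 streaming sink can opt out of the coordinator by overriding the new default method. The class name and constructor parameter below are hypothetical; the method signatures mirror the ones used in the diff.

```scala
import org.apache.spark.sql.connector.write.{PhysicalWriteInfo, WriterCommitMessage}
import org.apache.spark.sql.connector.write.streaming.{StreamingDataWriterFactory, StreamingWrite}

// Hypothetical at-least-once sink: skipping the commit coordinator avoids a round trip
// to the driver per partition, since duplicate partition commits are harmless here.
class ExampleAtLeastOnceWrite(factory: StreamingDataWriterFactory) extends StreamingWrite {

  override def createStreamingWriterFactory(info: PhysicalWriteInfo): StreamingDataWriterFactory =
    factory

  // New in this change: the default implementation returns true, so existing sinks keep
  // using the coordinator unless they explicitly opt out like this.
  override def useCommitCoordinator(): Boolean = false

  override def commit(epochId: Long, messages: Array[WriterCommitMessage]): Unit = {}

  override def abort(epochId: Long, messages: Array[WriterCommitMessage]): Unit = {}
}
```

At runtime, MicroBatchWrite simply forwards this flag from the wrapped StreamingWrite, as the diff below shows.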
Authored-by: Anish Shrigondekar <anish.shrigonde...@databricks.com>
Signed-off-by: Jungtaek Lim <kabhwan.opensou...@gmail.com>
---
 .../spark/sql/kafka010/KafkaStreamingWrite.scala   |  2 +
 .../connector/write/streaming/StreamingWrite.java  | 10 +++
 .../datasources/noop/NoopDataSource.scala          |  1 +
 .../streaming/sources/ConsoleStreamingWrite.scala  |  2 +
 .../streaming/sources/MicroBatchWrite.scala        |  4 +
 .../sql/execution/streaming/sources/memory.scala   |  2 +
 .../sql/streaming/ReportSinkMetricsSuite.scala     | 85 ++++++++++++----------
 7 files changed, 67 insertions(+), 39 deletions(-)

diff --git a/connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaStreamingWrite.scala b/connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaStreamingWrite.scala
index bcf9e3416f8..db719966267 100644
--- a/connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaStreamingWrite.scala
+++ b/connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaStreamingWrite.scala
@@ -45,6 +45,8 @@ private[kafka010] class KafkaStreamingWrite(
       info: PhysicalWriteInfo): KafkaStreamWriterFactory =
     KafkaStreamWriterFactory(topic, producerParams, schema)
 
+  override def useCommitCoordinator(): Boolean = false
+
   override def commit(epochId: Long, messages: Array[WriterCommitMessage]): Unit = {}
   override def abort(epochId: Long, messages: Array[WriterCommitMessage]): Unit = {}
 }
diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/write/streaming/StreamingWrite.java b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/write/streaming/StreamingWrite.java
index 20694f0b051..ab98bc01b3a 100644
--- a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/write/streaming/StreamingWrite.java
+++ b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/write/streaming/StreamingWrite.java
@@ -58,6 +58,16 @@ public interface StreamingWrite {
    */
   StreamingDataWriterFactory createStreamingWriterFactory(PhysicalWriteInfo info);
 
+  /**
+   * Returns whether Spark should use the commit coordinator to ensure that at most one task for
+   * each partition commits.
+   *
+   * @return true if commit coordinator should be used, false otherwise.
+   */
+  default boolean useCommitCoordinator() {
+    return true;
+  }
+
   /**
    * Commits this writing job for the specified epoch with a list of commit messages. The commit
    * messages are collected from successful data writers and are produced by
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/noop/NoopDataSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/noop/NoopDataSource.scala
index 1455a5516d0..a662ea3b8d2 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/noop/NoopDataSource.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/noop/NoopDataSource.scala
@@ -83,6 +83,7 @@ private[noop] object NoopWriter extends DataWriter[InternalRow] {
 private[noop] object NoopStreamingWrite extends StreamingWrite {
   override def createStreamingWriterFactory(
       info: PhysicalWriteInfo): StreamingDataWriterFactory = NoopStreamingDataWriterFactory
+  override def useCommitCoordinator(): Boolean = false
   override def commit(epochId: Long, messages: Array[WriterCommitMessage]): Unit = {}
   override def abort(epochId: Long, messages: Array[WriterCommitMessage]): Unit = {}
 }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/ConsoleStreamingWrite.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/ConsoleStreamingWrite.scala
index dc25289aa1e..5cb11b9280c 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/ConsoleStreamingWrite.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/ConsoleStreamingWrite.scala
@@ -41,6 +41,8 @@ class ConsoleWrite(schema: StructType, options: CaseInsensitiveStringMap)
   def createStreamingWriterFactory(info: PhysicalWriteInfo): StreamingDataWriterFactory =
     PackedRowWriterFactory
 
+  override def useCommitCoordinator(): Boolean = false
+
   override def commit(epochId: Long, messages: Array[WriterCommitMessage]): Unit = {
     // We have to print a "Batch" label for the epoch for compatibility with the pre-data source V2
     // behavior.
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/MicroBatchWrite.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/MicroBatchWrite.scala
index 3f474ea533c..e610a8dd127 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/MicroBatchWrite.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/MicroBatchWrite.scala
@@ -31,6 +31,10 @@ class MicroBatchWrite(epochId: Long, val writeSupport: StreamingWrite) extends B
     s"MicroBatchWrite[epoch: $epochId, writer: $writeSupport]"
   }
 
+  override def useCommitCoordinator(): Boolean = {
+    writeSupport.useCommitCoordinator()
+  }
+
   override def commit(messages: Array[WriterCommitMessage]): Unit = {
     writeSupport.commit(epochId, messages)
   }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/memory.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/memory.scala
index 7037bcf05c1..27a39bccfdd 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/memory.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/memory.scala
@@ -143,6 +143,8 @@ class MemoryStreamingWrite(
     MemoryWriterFactory(schema)
   }
 
+  override def useCommitCoordinator(): Boolean = false
+
   override def commit(epochId: Long, messages: Array[WriterCommitMessage]): Unit = {
     val newRows = messages.flatMap {
       case message: MemoryWriterCommitMessage => message.data
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/ReportSinkMetricsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/ReportSinkMetricsSuite.scala
index c5bd389eeb9..7407ee71618 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/ReportSinkMetricsSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/ReportSinkMetricsSuite.scala
@@ -36,53 +36,56 @@ class ReportSinkMetricsSuite extends StreamTest {
 
   import testImplicits._
 
-  test("test ReportSinkMetrics") {
-    val inputData = MemoryStream[Int]
-    val df = inputData.toDF()
-    var query: StreamingQuery = null
+  Seq("true", "false").foreach { useCommitCoordinator =>
+    test(s"test ReportSinkMetrics with useCommitCoordinator=$useCommitCoordinator") {
+      val inputData = MemoryStream[Int]
+      val df = inputData.toDF()
+      var query: StreamingQuery = null
 
-    var metricsMap: java.util.Map[String, String] = null
+      var metricsMap: java.util.Map[String, String] = null
 
-    val listener = new StreamingQueryListener {
+      val listener = new StreamingQueryListener {
 
-      override def onQueryStarted(event: StreamingQueryListener.QueryStartedEvent): Unit = {}
+        override def onQueryStarted(event: StreamingQueryListener.QueryStartedEvent): Unit = {}
 
-      override def onQueryProgress(event: StreamingQueryListener.QueryProgressEvent): Unit = {
-        metricsMap = event.progress.sink.metrics
+        override def onQueryProgress(event: StreamingQueryListener.QueryProgressEvent): Unit = {
+          metricsMap = event.progress.sink.metrics
+        }
+
+        override def onQueryTerminated(
+          event: StreamingQueryListener.QueryTerminatedEvent): Unit = {}
       }
 
-      override def onQueryTerminated(
-        event: StreamingQueryListener.QueryTerminatedEvent): Unit = {}
-    }
+      spark.streams.addListener(listener)
 
-    spark.streams.addListener(listener)
+      withTempDir { dir =>
+        try {
+          query =
+            df.writeStream
+              .outputMode("append")
+              .format("org.apache.spark.sql.streaming.TestSinkProvider")
+              .option("useCommitCoordinator", useCommitCoordinator)
+              .option("checkPointLocation", dir.toString)
+              .start()
 
-    withTempDir { dir =>
-      try {
-        query =
-          df.writeStream
-            .outputMode("append")
-            .format("org.apache.spark.sql.streaming.TestSinkProvider")
-            .option("checkPointLocation", dir.toString)
-            .start()
+          inputData.addData(1, 2, 3)
 
-        inputData.addData(1, 2, 3)
+          failAfter(streamingTimeout) {
+            query.processAllAvailable()
+          }
 
-        failAfter(streamingTimeout) {
-          query.processAllAvailable()
-        }
+          spark.sparkContext.listenerBus.waitUntilEmpty()
 
-        spark.sparkContext.listenerBus.waitUntilEmpty()
+          assertResult(metricsMap) {
+            Map("metrics-1" -> "value-1", "metrics-2" -> "value-2").asJava
+          }
+        } finally {
+          if (query != null) {
+            query.stop()
+          }
 
-        assertResult(metricsMap) {
-          Map("metrics-1" -> "value-1", "metrics-2" -> "value-2").asJava
-        }
-      } finally {
-        if (query != null) {
-          query.stop()
+          spark.streams.removeListener(listener)
         }
-
-        spark.streams.removeListener(listener)
       }
     }
   }
@@ -98,7 +101,8 @@ class ReportSinkMetricsSuite extends StreamTest {
     with CreatableRelationProvider with Logging {
 
     override def getTable(options: CaseInsensitiveStringMap): Table = {
-      TestSinkTable
+      val useCommitCoordinator = options.getBoolean("useCommitCoordinator", false)
+      new TestSinkTable(useCommitCoordinator)
     }
 
     def createRelation(
@@ -113,7 +117,8 @@ class ReportSinkMetricsSuite extends StreamTest {
     def shortName(): String = "test"
   }
 
-  object TestSinkTable extends Table with SupportsWrite with ReportsSinkMetrics with Logging {
+  class TestSinkTable(useCommitCoordinator: Boolean)
+    extends Table with SupportsWrite with ReportsSinkMetrics with Logging {
 
     override def name(): String = "test"
@@ -131,7 +136,7 @@ class ReportSinkMetricsSuite extends StreamTest {
     override def build(): Write = {
       new Write {
         override def toStreaming: StreamingWrite = {
-          new TestSinkWrite()
+          new TestSinkWrite(useCommitCoordinator)
         }
       }
     }
@@ -143,13 +148,15 @@ class ReportSinkMetricsSuite extends StreamTest {
     }
   }
 
-  class TestSinkWrite()
+  class TestSinkWrite(useCommitCoordinator: Boolean)
     extends StreamingWrite with Logging with Serializable {
 
     def createStreamingWriterFactory(info: PhysicalWriteInfo): StreamingDataWriterFactory =
       PackedRowWriterFactory
 
+    override def useCommitCoordinator(): Boolean = useCommitCoordinator
+
     override def commit(epochId: Long, messages: Array[WriterCommitMessage]): Unit = {}
 
     def abort(epochId: Long, messages: Array[WriterCommitMessage]): Unit = {}
-}
+  }

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org