This is an automated email from the ASF dual-hosted git repository.

kabhwan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 122a88c4941 [SPARK-42968][SS] Add option to skip commit coordinator as part of StreamingWrite API for DSv2 sources/sinks
122a88c4941 is described below

commit 122a88c4941d8ce1b8344c425fed455b79298afa
Author: Anish Shrigondekar <anish.shrigonde...@databricks.com>
AuthorDate: Thu Mar 30 21:48:47 2023 +0900

    [SPARK-42968][SS] Add option to skip commit coordinator as part of StreamingWrite API for DSv2 sources/sinks
    
    ### What changes were proposed in this pull request?
    Add an option to skip the commit coordinator as part of the StreamingWrite API for DSv2 sources/sinks. This option was already present in the BatchWrite API.
    
    ### Why are the changes needed?
    Sinks such as the following are at-least-once, so we do not need to go through the commit coordinator on the driver to ensure that only a single task commits for each partition. The coordinator is even less useful for streaming use cases, where batches can be replayed from the checkpoint directory (see the sketch after this list).
    
    - memory sink
    - console sink
    - no-op sink
    - Kafka v2 sink
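
    For intuition, a hypothetical sketch (not part of this change) of how an at-least-once sink can tolerate replayed batches locally, for example by remembering the last committed epoch, instead of relying on driver-side coordination:
    ```
    import org.apache.spark.sql.connector.write.WriterCommitMessage

    class DedupingAtLeastOnceSink {
      @volatile private var lastCommittedEpoch: Long = -1L

      // A batch replayed from the checkpoint may re-commit an epoch that was already
      // written; skipping it locally removes any need for a commit coordinator.
      def commit(epochId: Long, messages: Array[WriterCommitMessage]): Unit = {
        if (epochId > lastCommittedEpoch) {
          // ... write out the rows carried by `messages` for this epoch ...
          lastCommittedEpoch = epochId
        }
      }
    }
    ```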
    
    ### Does this PR introduce _any_ user-facing change?
    No
    
    ### How was this patch tested?
    Added a unit test for the change:
    ```
    [info] ReportSinkMetricsSuite:
    22:23:01.276 WARN org.apache.hadoop.util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
    22:23:03.139 WARN org.apache.spark.sql.execution.streaming.ResolveWriteToStream: spark.sql.adaptive.enabled is not supported in streaming DataFrames/Datasets and will be disabled.
    [info] - test ReportSinkMetrics with useCommitCoordinator=true (2 seconds, 709 milliseconds)
    22:23:04.522 WARN org.apache.spark.sql.execution.streaming.ResolveWriteToStream: spark.sql.adaptive.enabled is not supported in streaming DataFrames/Datasets and will be disabled.
    [info] - test ReportSinkMetrics with useCommitCoordinator=false (373 milliseconds)
    22:23:04.941 WARN org.apache.spark.sql.streaming.ReportSinkMetricsSuite:
    
    ===== POSSIBLE THREAD LEAK IN SUITE o.a.s.sql.streaming.ReportSinkMetricsSuite, threads: ForkJoinPool.commonPool-worker-19 (daemon=true), rpc-boss-3-1 (daemon=true), shuffle-boss-6-1 (daemon=true) =====
    [info] Run completed in 4 seconds, 934 milliseconds.
    [info] Total number of tests run: 2
    [info] Suites: completed 1, aborted 0
    [info] Tests: succeeded 2, failed 0, canceled 0, ignored 0, pending 0
    [info] All tests passed.
    ```
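
    For reference, one way to run this suite locally, assuming a standard Apache Spark source checkout with the bundled sbt launcher:
    ```
    build/sbt "sql/testOnly org.apache.spark.sql.streaming.ReportSinkMetricsSuite"
    ```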
    
    Closes #40600 from anishshri-db/task/SPARK-42968.
    
    Authored-by: Anish Shrigondekar <anish.shrigonde...@databricks.com>
    Signed-off-by: Jungtaek Lim <kabhwan.opensou...@gmail.com>
---
 .../spark/sql/kafka010/KafkaStreamingWrite.scala   |  2 +
 .../connector/write/streaming/StreamingWrite.java  | 10 +++
 .../datasources/noop/NoopDataSource.scala          |  1 +
 .../streaming/sources/ConsoleStreamingWrite.scala  |  2 +
 .../streaming/sources/MicroBatchWrite.scala        |  4 +
 .../sql/execution/streaming/sources/memory.scala   |  2 +
 .../sql/streaming/ReportSinkMetricsSuite.scala     | 85 ++++++++++++----------
 7 files changed, 67 insertions(+), 39 deletions(-)

diff --git a/connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaStreamingWrite.scala b/connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaStreamingWrite.scala
index bcf9e3416f8..db719966267 100644
--- a/connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaStreamingWrite.scala
+++ b/connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaStreamingWrite.scala
@@ -45,6 +45,8 @@ private[kafka010] class KafkaStreamingWrite(
       info: PhysicalWriteInfo): KafkaStreamWriterFactory =
     KafkaStreamWriterFactory(topic, producerParams, schema)
 
+  override def useCommitCoordinator(): Boolean = false
+
   override def commit(epochId: Long, messages: Array[WriterCommitMessage]): Unit = {}
   override def abort(epochId: Long, messages: Array[WriterCommitMessage]): Unit = {}
 }
diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/write/streaming/StreamingWrite.java b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/write/streaming/StreamingWrite.java
index 20694f0b051..ab98bc01b3a 100644
--- a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/write/streaming/StreamingWrite.java
+++ b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/write/streaming/StreamingWrite.java
@@ -58,6 +58,16 @@ public interface StreamingWrite {
    */
   StreamingDataWriterFactory createStreamingWriterFactory(PhysicalWriteInfo info);
 
+  /**
+   * Returns whether Spark should use the commit coordinator to ensure that at most one task for
+   * each partition commits.
+   *
+   * @return true if commit coordinator should be used, false otherwise.
+   */
+  default boolean useCommitCoordinator() {
+    return true;
+  }
+
   /**
   * Commits this writing job for the specified epoch with a list of commit messages. The commit
    * messages are collected from successful data writers and are produced by
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/noop/NoopDataSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/noop/NoopDataSource.scala
index 1455a5516d0..a662ea3b8d2 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/noop/NoopDataSource.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/noop/NoopDataSource.scala
@@ -83,6 +83,7 @@ private[noop] object NoopWriter extends DataWriter[InternalRow] {
 private[noop] object NoopStreamingWrite extends StreamingWrite {
   override def createStreamingWriterFactory(
       info: PhysicalWriteInfo): StreamingDataWriterFactory = NoopStreamingDataWriterFactory
+  override def useCommitCoordinator(): Boolean = false
   override def commit(epochId: Long, messages: Array[WriterCommitMessage]): Unit = {}
   override def abort(epochId: Long, messages: Array[WriterCommitMessage]): Unit = {}
 }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/ConsoleStreamingWrite.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/ConsoleStreamingWrite.scala
index dc25289aa1e..5cb11b9280c 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/ConsoleStreamingWrite.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/ConsoleStreamingWrite.scala
@@ -41,6 +41,8 @@ class ConsoleWrite(schema: StructType, options: CaseInsensitiveStringMap)
   def createStreamingWriterFactory(info: PhysicalWriteInfo): StreamingDataWriterFactory =
     PackedRowWriterFactory
 
+  override def useCommitCoordinator(): Boolean = false
+
   override def commit(epochId: Long, messages: Array[WriterCommitMessage]): Unit = {
     // We have to print a "Batch" label for the epoch for compatibility with the pre-data source V2
     // behavior.
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/MicroBatchWrite.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/MicroBatchWrite.scala
index 3f474ea533c..e610a8dd127 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/MicroBatchWrite.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/MicroBatchWrite.scala
@@ -31,6 +31,10 @@ class MicroBatchWrite(epochId: Long, val writeSupport: StreamingWrite) extends B
     s"MicroBatchWrite[epoch: $epochId, writer: $writeSupport]"
   }
 
+  override def useCommitCoordinator(): Boolean = {
+    writeSupport.useCommitCoordinator()
+  }
+
   override def commit(messages: Array[WriterCommitMessage]): Unit = {
     writeSupport.commit(epochId, messages)
   }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/memory.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/memory.scala
index 7037bcf05c1..27a39bccfdd 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/memory.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/memory.scala
@@ -143,6 +143,8 @@ class MemoryStreamingWrite(
     MemoryWriterFactory(schema)
   }
 
+  override def useCommitCoordinator(): Boolean = false
+
   override def commit(epochId: Long, messages: Array[WriterCommitMessage]): Unit = {
     val newRows = messages.flatMap {
       case message: MemoryWriterCommitMessage => message.data
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/ReportSinkMetricsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/ReportSinkMetricsSuite.scala
index c5bd389eeb9..7407ee71618 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/ReportSinkMetricsSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/ReportSinkMetricsSuite.scala
@@ -36,53 +36,56 @@ class ReportSinkMetricsSuite extends StreamTest {
 
   import testImplicits._
 
-  test("test ReportSinkMetrics") {
-    val inputData = MemoryStream[Int]
-    val df = inputData.toDF()
-    var query: StreamingQuery = null
+  Seq("true", "false").foreach { useCommitCoordinator =>
+    test(s"test ReportSinkMetrics with useCommitCoordinator=$useCommitCoordinator") {
+      val inputData = MemoryStream[Int]
+      val df = inputData.toDF()
+      var query: StreamingQuery = null
 
-    var metricsMap: java.util.Map[String, String] = null
+      var metricsMap: java.util.Map[String, String] = null
 
-    val listener = new StreamingQueryListener {
+      val listener = new StreamingQueryListener {
 
-      override def onQueryStarted(event: StreamingQueryListener.QueryStartedEvent): Unit = {}
+        override def onQueryStarted(event: StreamingQueryListener.QueryStartedEvent): Unit = {}
 
-      override def onQueryProgress(event: StreamingQueryListener.QueryProgressEvent): Unit = {
-        metricsMap = event.progress.sink.metrics
+        override def onQueryProgress(event: StreamingQueryListener.QueryProgressEvent): Unit = {
+          metricsMap = event.progress.sink.metrics
+        }
+
+        override def onQueryTerminated(
+          event: StreamingQueryListener.QueryTerminatedEvent): Unit = {}
       }
 
-      override def onQueryTerminated(
-        event: StreamingQueryListener.QueryTerminatedEvent): Unit = {}
-    }
+      spark.streams.addListener(listener)
 
-    spark.streams.addListener(listener)
+      withTempDir { dir =>
+        try {
+          query =
+            df.writeStream
+              .outputMode("append")
+              .format("org.apache.spark.sql.streaming.TestSinkProvider")
+              .option("useCommitCoordinator", useCommitCoordinator)
+              .option("checkPointLocation", dir.toString)
+              .start()
 
-    withTempDir { dir =>
-      try {
-        query =
-          df.writeStream
-            .outputMode("append")
-            .format("org.apache.spark.sql.streaming.TestSinkProvider")
-            .option("checkPointLocation", dir.toString)
-            .start()
+          inputData.addData(1, 2, 3)
 
-        inputData.addData(1, 2, 3)
+          failAfter(streamingTimeout) {
+            query.processAllAvailable()
+          }
 
-        failAfter(streamingTimeout) {
-          query.processAllAvailable()
-        }
+          spark.sparkContext.listenerBus.waitUntilEmpty()
 
-        spark.sparkContext.listenerBus.waitUntilEmpty()
+          assertResult(metricsMap) {
+            Map("metrics-1" -> "value-1", "metrics-2" -> "value-2").asJava
+          }
+        } finally {
+          if (query != null) {
+            query.stop()
+          }
 
-        assertResult(metricsMap) {
-          Map("metrics-1" -> "value-1", "metrics-2" -> "value-2").asJava
-        }
-      } finally {
-        if (query != null) {
-          query.stop()
+          spark.streams.removeListener(listener)
         }
-
-        spark.streams.removeListener(listener)
       }
     }
   }
@@ -98,7 +101,8 @@ class ReportSinkMetricsSuite extends StreamTest {
     with CreatableRelationProvider with Logging {
 
     override def getTable(options: CaseInsensitiveStringMap): Table = {
-      TestSinkTable
+      val useCommitCoordinator = options.getBoolean("useCommitCoordinator", false)
+      new TestSinkTable(useCommitCoordinator)
     }
 
     def createRelation(
@@ -113,7 +117,8 @@ class ReportSinkMetricsSuite extends StreamTest {
     def shortName(): String = "test"
   }
 
-  object TestSinkTable extends Table with SupportsWrite with ReportsSinkMetrics with Logging {
+  class TestSinkTable(useCommitCoordinator: Boolean)
+    extends Table with SupportsWrite with ReportsSinkMetrics with Logging {
 
     override def name(): String = "test"
 
@@ -131,7 +136,7 @@ class ReportSinkMetricsSuite extends StreamTest {
         override def build(): Write = {
           new Write {
             override def toStreaming: StreamingWrite = {
-              new TestSinkWrite()
+              new TestSinkWrite(useCommitCoordinator)
             }
           }
         }
@@ -143,13 +148,15 @@ class ReportSinkMetricsSuite extends StreamTest {
     }
   }
 
-  class TestSinkWrite()
+  class TestSinkWrite(useCommitCoordinator: Boolean)
     extends StreamingWrite with Logging with Serializable {
 
     def createStreamingWriterFactory(info: PhysicalWriteInfo): StreamingDataWriterFactory =
       PackedRowWriterFactory
 
+    override def useCommitCoordinator(): Boolean = useCommitCoordinator
+
     override def commit(epochId: Long, messages: Array[WriterCommitMessage]): Unit = {}
 
     def abort(epochId: Long, messages: Array[WriterCommitMessage]): Unit = {}
-}
+  }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org
