This is an automated email from the ASF dual-hosted git repository.
hvanhovell pushed a commit to branch branch-3.5
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-3.5 by this push:
new b11022f042f [SPARK-44761][CONNECT] Support DataStreamWriter.foreachBatch(VoidFunction2)
b11022f042f is described below
commit b11022f042f3264e24e3c202315e3eb2b8be1b14
Author: Herman van Hovell <[email protected]>
AuthorDate: Fri Aug 11 14:36:22 2023 +0200
[SPARK-44761][CONNECT] Support DataStreamWriter.foreachBatch(VoidFunction2)
### What changes were proposed in this pull request?
This PR adds `DataStreamWriter.foreachBatch(VoidFunction2)`.
### Why are the changes needed?
To increase binary compatibility with the APIs in `sql/core`.
### Does this PR introduce _any_ user-facing change?
Yes. It adds a new method to `DataStreamWriter`.
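For illustration, a minimal sketch of calling the new overload from the Scala client (the `spark` session, the `rate` source, and the output path below are placeholders, not part of this change):

```scala
import org.apache.spark.api.java.function.VoidFunction2
import org.apache.spark.sql.{Dataset, Row}

// Assumes an existing SparkSession named `spark`.
val query = spark.readStream
  .format("rate")
  .load()
  .writeStream
  .foreachBatch(new VoidFunction2[Dataset[Row], java.lang.Long] {
    override def call(batch: Dataset[Row], batchId: java.lang.Long): Unit = {
      // The batchId can be used to make the external write idempotent across retries.
      batch.write.mode("append").parquet(s"/tmp/foreach-batch-demo/batch=$batchId")
    }
  })
  .start()
```

Writing the function as an explicit anonymous `VoidFunction2` avoids overload ambiguity with the existing Scala `foreachBatch((Dataset[T], Long) => Unit)` variant.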
### How was this patch tested?
I modified an existing test (`ClientStreamingQuerySuite`) to exercise the new overload.
Closes #42430 from hvanhovell/SPARK-44761.
Lead-authored-by: Herman van Hovell <[email protected]>
Co-authored-by: Hyukjin Kwon <[email protected]>
Signed-off-by: Herman van Hovell <[email protected]>
(cherry picked from commit 5a27dc9b6174fedefd08bcfe8a5b42bdfde2b7f6)
Signed-off-by: Herman van Hovell <[email protected]>
---
.../spark/sql/streaming/DataStreamWriter.scala | 22 ++++++++++++++++++++--
.../CheckConnectJvmClientCompatibility.scala | 3 ---
.../sql/streaming/ClientStreamingQuerySuite.scala | 9 ++++++---
.../apache/spark/sql/connect/common/UdfUtils.scala | 3 +++
4 files changed, 29 insertions(+), 8 deletions(-)
diff --git a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/streaming/DataStreamWriter.scala b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/streaming/DataStreamWriter.scala
index b9aa1f5bc58..54eb6e76140 100644
--- a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/streaming/DataStreamWriter.scala
+++ b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/streaming/DataStreamWriter.scala
@@ -25,13 +25,13 @@ import scala.collection.JavaConverters._
import com.google.protobuf.ByteString
import org.apache.spark.annotation.Evolving
+import org.apache.spark.api.java.function.VoidFunction2
import org.apache.spark.connect.proto
import org.apache.spark.connect.proto.Command
import org.apache.spark.connect.proto.WriteStreamOperationStart
import org.apache.spark.internal.Logging
import org.apache.spark.sql.{Dataset, ForeachWriter}
-import org.apache.spark.sql.connect.common.DataTypeProtoConverter
-import org.apache.spark.sql.connect.common.ForeachWriterPacket
+import org.apache.spark.sql.connect.common.{DataTypeProtoConverter, ForeachWriterPacket, UdfUtils}
import org.apache.spark.sql.execution.streaming.AvailableNowTrigger
import org.apache.spark.sql.execution.streaming.ContinuousTrigger
import org.apache.spark.sql.execution.streaming.OneTimeTrigger
@@ -247,6 +247,24 @@ final class DataStreamWriter[T] private[sql] (ds: Dataset[T]) extends Logging {
this
}
+  /**
+   * :: Experimental ::
+   *
+   * (Java-specific) Sets the output of the streaming query to be processed using the provided
+   * function. This is supported only in the micro-batch execution modes (that is, when the
+   * trigger is not continuous). In every micro-batch, the provided function will be called in
+   * every micro-batch with (i) the output rows as a Dataset and (ii) the batch identifier. The
+   * batchId can be used to deduplicate and transactionally write the output (that is, the
+   * provided Dataset) to external systems. The output Dataset is guaranteed to be exactly the
+   * same for the same batchId (assuming all operations are deterministic in the query).
+   *
+   * @since 3.5.0
+   */
+  @Evolving
+  def foreachBatch(function: VoidFunction2[Dataset[T], java.lang.Long]): DataStreamWriter[T] = {
+    foreachBatch(UdfUtils.foreachBatchFuncToScalaFunc(function))
+  }
+
  /**
   * Starts the execution of the streaming query, which will continually output results to the
   * given path as new data arrives. The returned [[StreamingQuery]] object can be used to
diff --git a/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/client/CheckConnectJvmClientCompatibility.scala b/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/client/CheckConnectJvmClientCompatibility.scala
index 3fc02d7c397..04b162eceec 100644
--- a/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/client/CheckConnectJvmClientCompatibility.scala
+++ b/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/client/CheckConnectJvmClientCompatibility.scala
@@ -234,9 +234,6 @@ object CheckConnectJvmClientCompatibility {
// DataStreamWriter
ProblemFilters.exclude[MissingClassProblem](
"org.apache.spark.sql.streaming.DataStreamWriter$"),
-    ProblemFilters.exclude[Problem](
-      "org.apache.spark.sql.streaming.DataStreamWriter.foreachBatch" // TODO(SPARK-42944)
-    ),
    ProblemFilters.exclude[Problem](
      "org.apache.spark.sql.streaming.DataStreamWriter.SOURCE*" // These are constant vals.
    ),
diff --git a/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/streaming/ClientStreamingQuerySuite.scala b/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/streaming/ClientStreamingQuerySuite.scala
index ab92431bc11..944a999a860 100644
--- a/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/streaming/ClientStreamingQuerySuite.scala
+++ b/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/streaming/ClientStreamingQuerySuite.scala
@@ -27,6 +27,7 @@ import org.scalatest.concurrent.Eventually.eventually
import org.scalatest.concurrent.Futures.timeout
import org.scalatest.time.SpanSugar._
+import org.apache.spark.api.java.function.VoidFunction2
import org.apache.spark.internal.Logging
import org.apache.spark.sql.{DataFrame, ForeachWriter, Row, SparkSession, SQLHelper}
import org.apache.spark.sql.connect.client.util.QueryTest
@@ -412,11 +413,13 @@ class EventCollector extends StreamingQueryListener {
}
}
-class ForeachBatchFn(val viewName: String) extends ((DataFrame, Long) => Unit) with Serializable {
-  override def apply(df: DataFrame, batchId: Long): Unit = {
+class ForeachBatchFn(val viewName: String)
+    extends VoidFunction2[DataFrame, java.lang.Long]
+    with Serializable {
+  override def call(df: DataFrame, batchId: java.lang.Long): Unit = {
val count = df.count()
df.sparkSession
- .createDataFrame(Seq((batchId, count)))
+ .createDataFrame(Seq((batchId.toLong, count)))
.createOrReplaceGlobalTempView(viewName)
}
}
diff --git a/connector/connect/common/src/main/scala/org/apache/spark/sql/connect/common/UdfUtils.scala b/connector/connect/common/src/main/scala/org/apache/spark/sql/connect/common/UdfUtils.scala
index ceacc595d15..16d5823f4a4 100644
--- a/connector/connect/common/src/main/scala/org/apache/spark/sql/connect/common/UdfUtils.scala
+++ b/connector/connect/common/src/main/scala/org/apache/spark/sql/connect/common/UdfUtils.scala
@@ -59,6 +59,9 @@ private[sql] object UdfUtils extends Serializable {
  def foreachPartitionFuncToScalaFunc[T](f: ForeachPartitionFunction[T]): Iterator[T] => Unit =
    x => f.call(x.asJava)
+  def foreachBatchFuncToScalaFunc[D](f: VoidFunction2[D, java.lang.Long]): (D, Long) => Unit =
+    (d, i) => f.call(d, i)
+
  def flatMapFuncToScalaFunc[T, U](f: FlatMapFunction[T, U]): T => TraversableOnce[U] = x =>
    f.call(x).asScala
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]