This is an automated email from the ASF dual-hosted git repository.
kabhwan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new e359f210c45 [SPARK-45588][PROTOBUF][CONNECT][MINOR] Scaladoc
improvement for StreamingForeachBatchHelper
e359f210c45 is described below
commit e359f210c45abc17f0bcd32c9a86faf678caff75
Author: Raghu Angadi <[email protected]>
AuthorDate: Thu Oct 19 14:15:43 2023 +0900
[SPARK-45588][PROTOBUF][CONNECT][MINOR] Scaladoc improvement for
StreamingForeachBatchHelper
### What changes were proposed in this pull request?
Couple of minor improvements to `StreamingForeachBatchHelper`:
* Make `RunnerCleaner` private and add ScalaDoc.
* Update the contract for `pythonForeachBatchWrapper()` to inform that the caller
should eventually `close()` the `AutoCloseable` returned.
In addition, it also fixes a flake in Protobuf unit test.
### Why are the changes needed?
- Code readability improvement.
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
- Existing tests.
- For protobuf suite, verified with seed set to '399'. It fails before
this PR and passes after.
### Was this patch authored or co-authored using generative AI tooling?
No.
Closes #43424 from rangadi/feb-scaladoc.
Authored-by: Raghu Angadi <[email protected]>
Signed-off-by: Jungtaek Lim <[email protected]>
---
.../apache/spark/sql/connect/planner/SparkConnectPlanner.scala | 2 +-
.../spark/sql/connect/planner/StreamingForeachBatchHelper.scala | 9 ++++++---
.../sql/connect/service/SparkConnectSessionHodlerSuite.scala | 4 +++-
.../spark/sql/protobuf/ProtobufCatalystDataConversionSuite.scala | 2 +-
4 files changed, 11 insertions(+), 6 deletions(-)
diff --git
a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala
b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala
index fa964c02a25..299f4f8830a 100644
---
a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala
+++
b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala
@@ -2927,7 +2927,7 @@ class SparkConnectPlanner(
}
// This is filled when a foreach batch runner started for Python.
- var foreachBatchRunnerCleaner:
Option[StreamingForeachBatchHelper.RunnerCleaner] = None
+ var foreachBatchRunnerCleaner: Option[AutoCloseable] = None
if (writeOp.hasForeachBatch) {
val foreachBatchFn = writeOp.getForeachBatch.getFunctionCase match {
diff --git
a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/StreamingForeachBatchHelper.scala
b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/StreamingForeachBatchHelper.scala
index b8097b23550..ce75ba3eb59 100644
---
a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/StreamingForeachBatchHelper.scala
+++
b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/StreamingForeachBatchHelper.scala
@@ -40,7 +40,9 @@ object StreamingForeachBatchHelper extends Logging {
type ForeachBatchFnType = (DataFrame, Long) => Unit
- case class RunnerCleaner(runner: StreamingPythonRunner) extends
AutoCloseable {
+ // Visible for testing.
+ /** An AutoClosable to clean up resources on query termination. Stops Python
worker. */
+ private[connect] case class RunnerCleaner(runner: StreamingPythonRunner)
extends AutoCloseable {
override def close(): Unit = {
try runner.stop()
catch {
@@ -98,11 +100,12 @@ object StreamingForeachBatchHelper extends Logging {
/**
* Starts up Python worker and initializes it with Python function. Returns
a foreachBatch
* function that sets up the session and Dataframe cache and and interacts
with the Python
- * worker to execute user's function.
+ * worker to execute user's function. In addition, it returns an
AutoClosable. The caller must
+ * ensure it is closed so that worker process and related resources are
released.
*/
def pythonForeachBatchWrapper(
pythonFn: SimplePythonFunction,
- sessionHolder: SessionHolder): (ForeachBatchFnType, RunnerCleaner) = {
+ sessionHolder: SessionHolder): (ForeachBatchFnType, AutoCloseable) = {
val port = SparkConnectService.localPort
val connectUrl = s"sc://localhost:$port/;user_id=${sessionHolder.userId}"
diff --git
a/connector/connect/server/src/test/scala/org/apache/spark/sql/connect/service/SparkConnectSessionHodlerSuite.scala
b/connector/connect/server/src/test/scala/org/apache/spark/sql/connect/service/SparkConnectSessionHodlerSuite.scala
index a6451de8fc2..910c2a2650c 100644
---
a/connector/connect/server/src/test/scala/org/apache/spark/sql/connect/service/SparkConnectSessionHodlerSuite.scala
+++
b/connector/connect/server/src/test/scala/org/apache/spark/sql/connect/service/SparkConnectSessionHodlerSuite.scala
@@ -31,6 +31,7 @@ import org.apache.spark.api.python.SimplePythonFunction
import org.apache.spark.sql.IntegratedUDFTestUtils
import org.apache.spark.sql.connect.common.InvalidPlanInput
import org.apache.spark.sql.connect.planner.{PythonStreamingQueryListener,
StreamingForeachBatchHelper}
+import
org.apache.spark.sql.connect.planner.StreamingForeachBatchHelper.RunnerCleaner
import org.apache.spark.sql.test.SharedSparkSession
class SparkConnectSessionHolderSuite extends SharedSparkSession {
@@ -206,7 +207,8 @@ class SparkConnectSessionHolderSuite extends
SharedSparkSession {
sessionHolder.streamingForeachBatchRunnerCleanerCache
.registerCleanerForQuery(query2, cleaner2)
- val (runner1, runner2) = (cleaner1.runner, cleaner2.runner)
+ val (runner1, runner2) =
+ (cleaner1.asInstanceOf[RunnerCleaner].runner,
cleaner2.asInstanceOf[RunnerCleaner].runner)
// assert both python processes are running
assert(!runner1.isWorkerStopped().get)
diff --git
a/connector/protobuf/src/test/scala/org/apache/spark/sql/protobuf/ProtobufCatalystDataConversionSuite.scala
b/connector/protobuf/src/test/scala/org/apache/spark/sql/protobuf/ProtobufCatalystDataConversionSuite.scala
index d3e63a11a66..6135cb2d592 100644
---
a/connector/protobuf/src/test/scala/org/apache/spark/sql/protobuf/ProtobufCatalystDataConversionSuite.scala
+++
b/connector/protobuf/src/test/scala/org/apache/spark/sql/protobuf/ProtobufCatalystDataConversionSuite.scala
@@ -137,7 +137,7 @@ class ProtobufCatalystDataConversionSuite
while (
data != null &&
(data.get(0) == defaultValue ||
- (dt == BinaryType &&
+ (dt.fields(0).dataType == BinaryType &&
data.get(0).asInstanceOf[Array[Byte]].isEmpty)))
data = generator().asInstanceOf[Row]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]