This is an automated email from the ASF dual-hosted git repository.
dongjoon pushed a commit to branch branch-4.1
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-4.1 by this push:
new c60254e7baba [SPARK-54452] Fix empty response from SparkConnect server
for `spark.sql(...)` inside FlowFunction
c60254e7baba is described below
commit c60254e7baba1526c8af0e5cfca46009f182e081
Author: Yuheng Chang <[email protected]>
AuthorDate: Sun Nov 23 06:04:56 2025 -0800
[SPARK-54452] Fix empty response from SparkConnect server for
`spark.sql(...)` inside FlowFunction
### What changes were proposed in this pull request?
In PR #53024, we added SDP support for `spark.sql(...)` inside a
FlowFunction. For these calls, instead of eagerly executing the SQL, the Spark
Connect server should return the raw logical plan to the client and defer
execution to the flow function.
However, in that PR we constructed the response object but forgot to
actually return it to the Spark Connect client, so the client received an empty
response.
This went unnoticed in tests because, when the client sees an empty
`spark.sql(...)` response, [it falls back to creating an empty DataFrame
holding the raw logical
plan](https://github.com/apache/spark/blob/master/python/pyspark/sql/connect/session.py#L829-L835),
which happens to match the desired behavior. This PR fixes the bug by
returning the proper response instead of relying on that implicit fallback.
### Why are the changes needed?
### Does this PR introduce _any_ user-facing change?
This PR fixes a bug introduced in #53024 where the server did not return
the constructed `spark.sql(...)` response to the client.
### How was this patch tested?
New tests
### Was this patch authored or co-authored using generative AI tooling?
No
Closes #53156 from SCHJonathan/jonathan-chang_data/fix-spark-sql-bug.
Authored-by: Yuheng Chang <[email protected]>
Signed-off-by: Dongjoon Hyun <[email protected]>
(cherry picked from commit 997525cae9b00e47626ce09f00e779b279551ace)
Signed-off-by: Dongjoon Hyun <[email protected]>
---
.../sql/connect/planner/SparkConnectPlanner.scala | 8 ++
.../SparkDeclarativePipelinesServerSuite.scala | 94 ++++++++++++++++++++++
2 files changed, 102 insertions(+)
diff --git a/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala b/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala
index 644784fa3db6..9af2e7cb4661 100644
--- a/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala
+++ b/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala
@@ -2992,6 +2992,14 @@ class SparkConnectPlanner(
     // the SQL command and defer the actual analysis and execution to the flow function.
if (insidePipelineFlowFunction) {
result.setRelation(relation)
+ executeHolder.eventsManager.postFinished()
+ responseObserver.onNext(
+ ExecutePlanResponse
+ .newBuilder()
+ .setSessionId(sessionHolder.sessionId)
+ .setServerSideSessionId(sessionHolder.serverSessionId)
+ .setSqlCommandResult(result)
+ .build)
return
}
diff --git a/sql/connect/server/src/test/scala/org/apache/spark/sql/connect/pipelines/SparkDeclarativePipelinesServerSuite.scala b/sql/connect/server/src/test/scala/org/apache/spark/sql/connect/pipelines/SparkDeclarativePipelinesServerSuite.scala
index c9551646385c..3cb45fa6e172 100644
--- a/sql/connect/server/src/test/scala/org/apache/spark/sql/connect/pipelines/SparkDeclarativePipelinesServerSuite.scala
+++ b/sql/connect/server/src/test/scala/org/apache/spark/sql/connect/pipelines/SparkDeclarativePipelinesServerSuite.scala
@@ -850,4 +850,98 @@ class SparkDeclarativePipelinesServerSuite
}
}
}
+
+  test(
+    "SPARK-54452: spark.sql() inside a pipeline flow function should return a sql_command_result") {
+ withRawBlockingStub { implicit stub =>
+ val graphId = createDataflowGraph
+ val pipelineAnalysisContext = proto.PipelineAnalysisContext
+ .newBuilder()
+ .setDataflowGraphId(graphId)
+ .setFlowName("flow1")
+ .build()
+ val userContext = proto.UserContext
+ .newBuilder()
+ .addExtensions(com.google.protobuf.Any.pack(pipelineAnalysisContext))
+ .setUserId("test_user")
+ .build()
+
+ val relation = proto.Plan
+ .newBuilder()
+ .setCommand(
+ proto.Command
+ .newBuilder()
+ .setSqlCommand(
+ proto.SqlCommand
+ .newBuilder()
+ .setInput(
+ proto.Relation
+ .newBuilder()
+ .setRead(proto.Read
+ .newBuilder()
+ .setNamedTable(
+                proto.Read.NamedTable.newBuilder().setUnparsedIdentifier("table"))
+ .build())
+ .build()))
+ .build())
+ .build()
+
+ val sparkSqlRequest = proto.ExecutePlanRequest
+ .newBuilder()
+ .setUserContext(userContext)
+ .setPlan(relation)
+ .setSessionId(UUID.randomUUID().toString)
+ .build()
+ val sparkSqlResponse = stub.executePlan(sparkSqlRequest).next()
+ assert(sparkSqlResponse.hasSqlCommandResult)
+ assert(
+ sparkSqlResponse.getSqlCommandResult.getRelation ==
+ relation.getCommand.getSqlCommand.getInput)
+ }
+ }
+
+  test(
+    "SPARK-54452: spark.sql() outside a pipeline flow function should return a " +
+      "sql_command_result") {
+ withRawBlockingStub { implicit stub =>
+ val graphId = createDataflowGraph
+ val pipelineAnalysisContext = proto.PipelineAnalysisContext
+ .newBuilder()
+ .setDataflowGraphId(graphId)
+ .build()
+ val userContext = proto.UserContext
+ .newBuilder()
+ .addExtensions(com.google.protobuf.Any.pack(pipelineAnalysisContext))
+ .setUserId("test_user")
+ .build()
+
+ val relation = proto.Plan
+ .newBuilder()
+ .setCommand(
+ proto.Command
+ .newBuilder()
+ .setSqlCommand(
+ proto.SqlCommand
+ .newBuilder()
+ .setInput(proto.Relation
+ .newBuilder()
+              .setSql(proto.SQL.newBuilder().setQuery("SELECT * FROM RANGE(5)"))
+ .build())
+ .build())
+ .build())
+ .build()
+
+ val sparkSqlRequest = proto.ExecutePlanRequest
+ .newBuilder()
+ .setUserContext(userContext)
+ .setPlan(relation)
+ .setSessionId(UUID.randomUUID().toString)
+ .build()
+ val sparkSqlResponse = stub.executePlan(sparkSqlRequest).next()
+ assert(sparkSqlResponse.hasSqlCommandResult)
+ assert(
+ sparkSqlResponse.getSqlCommandResult.getRelation ==
+ relation.getCommand.getSqlCommand.getInput)
+ }
+ }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]