This is an automated email from the ASF dual-hosted git repository.
sandy pushed a commit to branch branch-4.1
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-4.1 by this push:
new b0036509e0fd [SPARK-54191][SDP] Add once to Defineflow Proto
b0036509e0fd is described below
commit b0036509e0fd84deaa9b6e4b2e0952b2121536be
Author: Jacky Wang <[email protected]>
AuthorDate: Fri Nov 7 16:00:03 2025 -0800
[SPARK-54191][SDP] Add once to Defineflow Proto
### What changes were proposed in this pull request?
Add `once` to Defineflow Proto, to allow creating a one-time back-fill flow
### Why are the changes needed?
Allow new API argument for SDP flow.
### Does this PR introduce _any_ user-facing change?
No, no API change yet
### How was this patch tested?
Proto changes
Closes #52890 from JiaqiWang18/SPARK-54191-StandaloneFlowDetails-proto.
Authored-by: Jacky Wang <[email protected]>
Signed-off-by: Sandy Ryza <[email protected]>
(cherry picked from commit 1a724ba6feab1142e66ae5fa12a15f2115eb8e5e)
Signed-off-by: Sandy Ryza <[email protected]>
---
.../src/main/resources/error/error-conditions.json | 6 ++
python/pyspark/sql/connect/proto/pipelines_pb2.py | 70 +++++++++++-----------
python/pyspark/sql/connect/proto/pipelines_pb2.pyi | 21 +++++++
.../main/protobuf/spark/connect/pipelines.proto | 7 +++
.../sql/connect/pipelines/PipelinesHandler.scala | 5 ++
.../SparkDeclarativePipelinesServerSuite.scala | 18 ++++++
6 files changed, 92 insertions(+), 35 deletions(-)
diff --git a/common/utils/src/main/resources/error/error-conditions.json
b/common/utils/src/main/resources/error/error-conditions.json
index 49a217652791..e061df065ed5 100644
--- a/common/utils/src/main/resources/error/error-conditions.json
+++ b/common/utils/src/main/resources/error/error-conditions.json
@@ -1407,6 +1407,12 @@
],
"sqlState" : "42623"
},
+ "DEFINE_FLOW_ONCE_OPTION_NOT_SUPPORTED" : {
+ "message" : [
+ "Defining a one-time flow <flowName> with the 'once' option is not
supported."
+ ],
+ "sqlState" : "0A000"
+ },
"DESCRIBE_JSON_NOT_EXTENDED" : {
"message" : [
"DESCRIBE TABLE ... AS JSON only supported when [EXTENDED|FORMATTED] is
specified.",
diff --git a/python/pyspark/sql/connect/proto/pipelines_pb2.py
b/python/pyspark/sql/connect/proto/pipelines_pb2.py
index 139de83dc1aa..0eb77c84b5b5 100644
--- a/python/pyspark/sql/connect/proto/pipelines_pb2.py
+++ b/python/pyspark/sql/connect/proto/pipelines_pb2.py
@@ -42,7 +42,7 @@ from pyspark.sql.connect.proto import types_pb2 as
spark_dot_connect_dot_types__
DESCRIPTOR = _descriptor_pool.Default().AddSerializedFile(
-
b'\n\x1dspark/connect/pipelines.proto\x12\rspark.connect\x1a\x19google/protobuf/any.proto\x1a\x1fgoogle/protobuf/timestamp.proto\x1a\x1aspark/connect/common.proto\x1a\x1dspark/connect/relations.proto\x1a\x19spark/connect/types.proto"\xcb"\n\x0fPipelineCommand\x12h\n\x15\x63reate_dataflow_graph\x18\x01
\x01(\x0b\x32\x32.spark.connect.PipelineCommand.CreateDataflowGraphH\x00R\x13\x63reateDataflowGraph\x12R\n\rdefine_output\x18\x02
\x01(\x0b\x32+.spark.connect.PipelineCommand.DefineOutp [...]
+
b'\n\x1dspark/connect/pipelines.proto\x12\rspark.connect\x1a\x19google/protobuf/any.proto\x1a\x1fgoogle/protobuf/timestamp.proto\x1a\x1aspark/connect/common.proto\x1a\x1dspark/connect/relations.proto\x1a\x19spark/connect/types.proto"\xed"\n\x0fPipelineCommand\x12h\n\x15\x63reate_dataflow_graph\x18\x01
\x01(\x0b\x32\x32.spark.connect.PipelineCommand.CreateDataflowGraphH\x00R\x13\x63reateDataflowGraph\x12R\n\rdefine_output\x18\x02
\x01(\x0b\x32+.spark.connect.PipelineCommand.DefineOutp [...]
)
_globals = globals()
@@ -69,10 +69,10 @@ if not _descriptor._USE_C_DESCRIPTORS:
]._serialized_options = b"8\001"
_globals["_PIPELINECOMMAND_DEFINEFLOW_SQLCONFENTRY"]._loaded_options = None
_globals["_PIPELINECOMMAND_DEFINEFLOW_SQLCONFENTRY"]._serialized_options =
b"8\001"
- _globals["_OUTPUTTYPE"]._serialized_start = 6105
- _globals["_OUTPUTTYPE"]._serialized_end = 6210
+ _globals["_OUTPUTTYPE"]._serialized_start = 6139
+ _globals["_OUTPUTTYPE"]._serialized_end = 6244
_globals["_PIPELINECOMMAND"]._serialized_start = 195
- _globals["_PIPELINECOMMAND"]._serialized_end = 4622
+ _globals["_PIPELINECOMMAND"]._serialized_end = 4656
_globals["_PIPELINECOMMAND_CREATEDATAFLOWGRAPH"]._serialized_start = 1129
_globals["_PIPELINECOMMAND_CREATEDATAFLOWGRAPH"]._serialized_end = 1437
_globals["_PIPELINECOMMAND_CREATEDATAFLOWGRAPH_SQLCONFENTRY"]._serialized_start
= 1338
@@ -94,37 +94,37 @@ if not _descriptor._USE_C_DESCRIPTORS:
_globals["_PIPELINECOMMAND_DEFINEOUTPUT_SINKDETAILS_OPTIONSENTRY"]._serialized_start
= 2659
_globals["_PIPELINECOMMAND_DEFINEOUTPUT_SINKDETAILS_OPTIONSENTRY"]._serialized_end
= 2717
_globals["_PIPELINECOMMAND_DEFINEFLOW"]._serialized_start = 2833
- _globals["_PIPELINECOMMAND_DEFINEFLOW"]._serialized_end = 3694
+ _globals["_PIPELINECOMMAND_DEFINEFLOW"]._serialized_end = 3728
_globals["_PIPELINECOMMAND_DEFINEFLOW_SQLCONFENTRY"]._serialized_start =
1338
_globals["_PIPELINECOMMAND_DEFINEFLOW_SQLCONFENTRY"]._serialized_end = 1396
-
_globals["_PIPELINECOMMAND_DEFINEFLOW_WRITERELATIONFLOWDETAILS"]._serialized_start
= 3427
-
_globals["_PIPELINECOMMAND_DEFINEFLOW_WRITERELATIONFLOWDETAILS"]._serialized_end
= 3524
- _globals["_PIPELINECOMMAND_DEFINEFLOW_RESPONSE"]._serialized_start = 3526
- _globals["_PIPELINECOMMAND_DEFINEFLOW_RESPONSE"]._serialized_end = 3584
- _globals["_PIPELINECOMMAND_STARTRUN"]._serialized_start = 3697
- _globals["_PIPELINECOMMAND_STARTRUN"]._serialized_end = 4019
- _globals["_PIPELINECOMMAND_DEFINESQLGRAPHELEMENTS"]._serialized_start =
4022
- _globals["_PIPELINECOMMAND_DEFINESQLGRAPHELEMENTS"]._serialized_end = 4221
-
_globals["_PIPELINECOMMAND_GETQUERYFUNCTIONEXECUTIONSIGNALSTREAM"]._serialized_start
= 4224
-
_globals["_PIPELINECOMMAND_GETQUERYFUNCTIONEXECUTIONSIGNALSTREAM"]._serialized_end
= 4382
-
_globals["_PIPELINECOMMAND_DEFINEFLOWQUERYFUNCTIONRESULT"]._serialized_start =
4385
- _globals["_PIPELINECOMMAND_DEFINEFLOWQUERYFUNCTIONRESULT"]._serialized_end
= 4606
- _globals["_PIPELINECOMMANDRESULT"]._serialized_start = 4625
- _globals["_PIPELINECOMMANDRESULT"]._serialized_end = 5377
-
_globals["_PIPELINECOMMANDRESULT_CREATEDATAFLOWGRAPHRESULT"]._serialized_start
= 4994
-
_globals["_PIPELINECOMMANDRESULT_CREATEDATAFLOWGRAPHRESULT"]._serialized_end =
5092
- _globals["_PIPELINECOMMANDRESULT_DEFINEOUTPUTRESULT"]._serialized_start =
5095
- _globals["_PIPELINECOMMANDRESULT_DEFINEOUTPUTRESULT"]._serialized_end =
5228
- _globals["_PIPELINECOMMANDRESULT_DEFINEFLOWRESULT"]._serialized_start =
5231
- _globals["_PIPELINECOMMANDRESULT_DEFINEFLOWRESULT"]._serialized_end = 5362
- _globals["_PIPELINEEVENTRESULT"]._serialized_start = 5379
- _globals["_PIPELINEEVENTRESULT"]._serialized_end = 5452
- _globals["_PIPELINEEVENT"]._serialized_start = 5454
- _globals["_PIPELINEEVENT"]._serialized_end = 5570
- _globals["_SOURCECODELOCATION"]._serialized_start = 5573
- _globals["_SOURCECODELOCATION"]._serialized_end = 5814
- _globals["_PIPELINEQUERYFUNCTIONEXECUTIONSIGNAL"]._serialized_start = 5816
- _globals["_PIPELINEQUERYFUNCTIONEXECUTIONSIGNAL"]._serialized_end = 5885
- _globals["_PIPELINEANALYSISCONTEXT"]._serialized_start = 5888
- _globals["_PIPELINEANALYSISCONTEXT"]._serialized_end = 6103
+
_globals["_PIPELINECOMMAND_DEFINEFLOW_WRITERELATIONFLOWDETAILS"]._serialized_start
= 3452
+
_globals["_PIPELINECOMMAND_DEFINEFLOW_WRITERELATIONFLOWDETAILS"]._serialized_end
= 3549
+ _globals["_PIPELINECOMMAND_DEFINEFLOW_RESPONSE"]._serialized_start = 3551
+ _globals["_PIPELINECOMMAND_DEFINEFLOW_RESPONSE"]._serialized_end = 3609
+ _globals["_PIPELINECOMMAND_STARTRUN"]._serialized_start = 3731
+ _globals["_PIPELINECOMMAND_STARTRUN"]._serialized_end = 4053
+ _globals["_PIPELINECOMMAND_DEFINESQLGRAPHELEMENTS"]._serialized_start =
4056
+ _globals["_PIPELINECOMMAND_DEFINESQLGRAPHELEMENTS"]._serialized_end = 4255
+
_globals["_PIPELINECOMMAND_GETQUERYFUNCTIONEXECUTIONSIGNALSTREAM"]._serialized_start
= 4258
+
_globals["_PIPELINECOMMAND_GETQUERYFUNCTIONEXECUTIONSIGNALSTREAM"]._serialized_end
= 4416
+
_globals["_PIPELINECOMMAND_DEFINEFLOWQUERYFUNCTIONRESULT"]._serialized_start =
4419
+ _globals["_PIPELINECOMMAND_DEFINEFLOWQUERYFUNCTIONRESULT"]._serialized_end
= 4640
+ _globals["_PIPELINECOMMANDRESULT"]._serialized_start = 4659
+ _globals["_PIPELINECOMMANDRESULT"]._serialized_end = 5411
+
_globals["_PIPELINECOMMANDRESULT_CREATEDATAFLOWGRAPHRESULT"]._serialized_start
= 5028
+
_globals["_PIPELINECOMMANDRESULT_CREATEDATAFLOWGRAPHRESULT"]._serialized_end =
5126
+ _globals["_PIPELINECOMMANDRESULT_DEFINEOUTPUTRESULT"]._serialized_start =
5129
+ _globals["_PIPELINECOMMANDRESULT_DEFINEOUTPUTRESULT"]._serialized_end =
5262
+ _globals["_PIPELINECOMMANDRESULT_DEFINEFLOWRESULT"]._serialized_start =
5265
+ _globals["_PIPELINECOMMANDRESULT_DEFINEFLOWRESULT"]._serialized_end = 5396
+ _globals["_PIPELINEEVENTRESULT"]._serialized_start = 5413
+ _globals["_PIPELINEEVENTRESULT"]._serialized_end = 5486
+ _globals["_PIPELINEEVENT"]._serialized_start = 5488
+ _globals["_PIPELINEEVENT"]._serialized_end = 5604
+ _globals["_SOURCECODELOCATION"]._serialized_start = 5607
+ _globals["_SOURCECODELOCATION"]._serialized_end = 5848
+ _globals["_PIPELINEQUERYFUNCTIONEXECUTIONSIGNAL"]._serialized_start = 5850
+ _globals["_PIPELINEQUERYFUNCTIONEXECUTIONSIGNAL"]._serialized_end = 5919
+ _globals["_PIPELINEANALYSISCONTEXT"]._serialized_start = 5922
+ _globals["_PIPELINEANALYSISCONTEXT"]._serialized_end = 6137
# @@protoc_insertion_point(module_scope)
diff --git a/python/pyspark/sql/connect/proto/pipelines_pb2.pyi
b/python/pyspark/sql/connect/proto/pipelines_pb2.pyi
index 60d131037c99..e0768a1f6bae 100644
--- a/python/pyspark/sql/connect/proto/pipelines_pb2.pyi
+++ b/python/pyspark/sql/connect/proto/pipelines_pb2.pyi
@@ -588,6 +588,7 @@ class PipelineCommand(google.protobuf.message.Message):
SOURCE_CODE_LOCATION_FIELD_NUMBER: builtins.int
RELATION_FLOW_DETAILS_FIELD_NUMBER: builtins.int
EXTENSION_FIELD_NUMBER: builtins.int
+ ONCE_FIELD_NUMBER: builtins.int
dataflow_graph_id: builtins.str
"""The graph to attach this flow to."""
flow_name: builtins.str
@@ -612,6 +613,13 @@ class PipelineCommand(google.protobuf.message.Message):
) -> global___PipelineCommand.DefineFlow.WriteRelationFlowDetails: ...
@property
def extension(self) -> google.protobuf.any_pb2.Any: ...
+ once: builtins.bool
+ """If true, define the flow as a one-time flow, such as for backfill.
    Setting it to true changes the flow in two ways:
    - The flow is run one time by default. If the pipeline is run with a
full refresh,
+ the flow will run again.
+ - The flow function must be a batch DataFrame, not a streaming
DataFrame.
+ """
def __init__(
self,
*,
@@ -624,6 +632,7 @@ class PipelineCommand(google.protobuf.message.Message):
relation_flow_details:
global___PipelineCommand.DefineFlow.WriteRelationFlowDetails
| None = ...,
extension: google.protobuf.any_pb2.Any | None = ...,
+ once: builtins.bool | None = ...,
) -> None: ...
def HasField(
self,
@@ -634,6 +643,8 @@ class PipelineCommand(google.protobuf.message.Message):
b"_dataflow_graph_id",
"_flow_name",
b"_flow_name",
+ "_once",
+ b"_once",
"_source_code_location",
b"_source_code_location",
"_target_dataset_name",
@@ -648,6 +659,8 @@ class PipelineCommand(google.protobuf.message.Message):
b"extension",
"flow_name",
b"flow_name",
+ "once",
+ b"once",
"relation_flow_details",
b"relation_flow_details",
"source_code_location",
@@ -665,6 +678,8 @@ class PipelineCommand(google.protobuf.message.Message):
b"_dataflow_graph_id",
"_flow_name",
b"_flow_name",
+ "_once",
+ b"_once",
"_source_code_location",
b"_source_code_location",
"_target_dataset_name",
@@ -679,6 +694,8 @@ class PipelineCommand(google.protobuf.message.Message):
b"extension",
"flow_name",
b"flow_name",
+ "once",
+ b"once",
"relation_flow_details",
b"relation_flow_details",
"source_code_location",
@@ -703,6 +720,10 @@ class PipelineCommand(google.protobuf.message.Message):
self, oneof_group: typing_extensions.Literal["_flow_name",
b"_flow_name"]
) -> typing_extensions.Literal["flow_name"] | None: ...
@typing.overload
+ def WhichOneof(
+ self, oneof_group: typing_extensions.Literal["_once", b"_once"]
+ ) -> typing_extensions.Literal["once"] | None: ...
+ @typing.overload
def WhichOneof(
self,
oneof_group: typing_extensions.Literal[
diff --git a/sql/connect/common/src/main/protobuf/spark/connect/pipelines.proto
b/sql/connect/common/src/main/protobuf/spark/connect/pipelines.proto
index 0fa36f8a1514..a92e24fda915 100644
--- a/sql/connect/common/src/main/protobuf/spark/connect/pipelines.proto
+++ b/sql/connect/common/src/main/protobuf/spark/connect/pipelines.proto
@@ -152,6 +152,13 @@ message PipelineCommand {
optional spark.connect.Relation relation = 1;
}
+ // If true, define the flow as a one-time flow, such as for backfill.
    // Setting it to true changes the flow in two ways:
    // - The flow is run one time by default. If the pipeline is run with a
full refresh,
+ // the flow will run again.
+ // - The flow function must be a batch DataFrame, not a streaming
DataFrame.
+ optional bool once = 8;
+
message Response {
// Fully qualified flow name that uniquely identify a flow in the
Dataflow graph.
optional string flow_name = 1;
diff --git
a/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/pipelines/PipelinesHandler.scala
b/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/pipelines/PipelinesHandler.scala
index 0929b07be523..1a3b0d2231c6 100644
---
a/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/pipelines/PipelinesHandler.scala
+++
b/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/pipelines/PipelinesHandler.scala
@@ -267,6 +267,11 @@ private[connect] object PipelinesHandler extends Logging {
flow: proto.PipelineCommand.DefineFlow,
transformRelationFunc: Relation => LogicalPlan,
sessionHolder: SessionHolder): TableIdentifier = {
+ if (flow.hasOnce) {
+ throw new AnalysisException(
+ "DEFINE_FLOW_ONCE_OPTION_NOT_SUPPORTED",
+ Map("flowName" -> flow.getFlowName))
+ }
val dataflowGraphId = flow.getDataflowGraphId
val graphElementRegistry =
sessionHolder.dataflowGraphRegistry.getDataflowGraphOrThrow(dataflowGraphId)
diff --git
a/sql/connect/server/src/test/scala/org/apache/spark/sql/connect/pipelines/SparkDeclarativePipelinesServerSuite.scala
b/sql/connect/server/src/test/scala/org/apache/spark/sql/connect/pipelines/SparkDeclarativePipelinesServerSuite.scala
index 9dba27c4525c..ab60462e8735 100644
---
a/sql/connect/server/src/test/scala/org/apache/spark/sql/connect/pipelines/SparkDeclarativePipelinesServerSuite.scala
+++
b/sql/connect/server/src/test/scala/org/apache/spark/sql/connect/pipelines/SparkDeclarativePipelinesServerSuite.scala
@@ -71,6 +71,24 @@ class SparkDeclarativePipelinesServerSuite
}
+ gridTest("Define flow 'once' argument not supported")(Seq(true, false)) {
onceValue =>
+ val ex = intercept[Exception] {
+ withRawBlockingStub { implicit stub =>
+ val graphId = createDataflowGraph
+ sendPlan(
+ buildPlanFromPipelineCommand(
+ PipelineCommand
+ .newBuilder()
+ .setDefineFlow(DefineFlow
+ .newBuilder()
+ .setDataflowGraphId(graphId)
+ .setOnce(onceValue))
+ .build()))
+ }
+ }
+ assert(ex.getMessage.contains("DEFINE_FLOW_ONCE_OPTION_NOT_SUPPORTED"))
+ }
+
test(
"Cross dependency between SQL dataset and non-SQL dataset is valid and can
be registered") {
withRawBlockingStub { implicit stub =>
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]