This is an automated email from the ASF dual-hosted git repository.

sandy pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 1a724ba6feab [SPARK-54191][SDP] Add once to Defineflow Proto
1a724ba6feab is described below

commit 1a724ba6feab1142e66ae5fa12a15f2115eb8e5e
Author: Jacky Wang <[email protected]>
AuthorDate: Fri Nov 7 16:00:03 2025 -0800

    [SPARK-54191][SDP] Add once to Defineflow Proto
    
    ### What changes were proposed in this pull request?
    
    Add `once` to Defineflow Proto, to allow creating one-time back-fill flow
    
    ### Why are the changes needed?
    
    Allow new API argument for SDP flow.
    
    ### Does this PR introduce _any_ user-facing change?
    
    No, no API change yet
    
    ### How was this patch tested?
    
    Proto changes
    
    Closes #52890 from JiaqiWang18/SPARK-54191-StandaloneFlowDetails-proto.
    
    Authored-by: Jacky Wang <[email protected]>
    Signed-off-by: Sandy Ryza <[email protected]>
---
 .../src/main/resources/error/error-conditions.json |  6 ++
 python/pyspark/sql/connect/proto/pipelines_pb2.py  | 70 +++++++++++-----------
 python/pyspark/sql/connect/proto/pipelines_pb2.pyi | 21 +++++++
 .../main/protobuf/spark/connect/pipelines.proto    |  7 +++
 .../sql/connect/pipelines/PipelinesHandler.scala   |  5 ++
 .../SparkDeclarativePipelinesServerSuite.scala     | 18 ++++++
 6 files changed, 92 insertions(+), 35 deletions(-)

diff --git a/common/utils/src/main/resources/error/error-conditions.json 
b/common/utils/src/main/resources/error/error-conditions.json
index 4b6bf754c3ec..a13a4694c668 100644
--- a/common/utils/src/main/resources/error/error-conditions.json
+++ b/common/utils/src/main/resources/error/error-conditions.json
@@ -1412,6 +1412,12 @@
     ],
     "sqlState" : "42623"
   },
+  "DEFINE_FLOW_ONCE_OPTION_NOT_SUPPORTED" : {
+    "message" : [
+      "Defining a one-time flow <flowName> with the 'once' option is not 
supported."
+    ],
+    "sqlState" : "0A000"
+  },
   "DESCRIBE_JSON_NOT_EXTENDED" : {
     "message" : [
       "DESCRIBE TABLE ... AS JSON only supported when [EXTENDED|FORMATTED] is 
specified.",
diff --git a/python/pyspark/sql/connect/proto/pipelines_pb2.py 
b/python/pyspark/sql/connect/proto/pipelines_pb2.py
index 139de83dc1aa..0eb77c84b5b5 100644
--- a/python/pyspark/sql/connect/proto/pipelines_pb2.py
+++ b/python/pyspark/sql/connect/proto/pipelines_pb2.py
@@ -42,7 +42,7 @@ from pyspark.sql.connect.proto import types_pb2 as 
spark_dot_connect_dot_types__
 
 
 DESCRIPTOR = _descriptor_pool.Default().AddSerializedFile(
-    
b'\n\x1dspark/connect/pipelines.proto\x12\rspark.connect\x1a\x19google/protobuf/any.proto\x1a\x1fgoogle/protobuf/timestamp.proto\x1a\x1aspark/connect/common.proto\x1a\x1dspark/connect/relations.proto\x1a\x19spark/connect/types.proto"\xcb"\n\x0fPipelineCommand\x12h\n\x15\x63reate_dataflow_graph\x18\x01
 
\x01(\x0b\x32\x32.spark.connect.PipelineCommand.CreateDataflowGraphH\x00R\x13\x63reateDataflowGraph\x12R\n\rdefine_output\x18\x02
 \x01(\x0b\x32+.spark.connect.PipelineCommand.DefineOutp [...]
+    
b'\n\x1dspark/connect/pipelines.proto\x12\rspark.connect\x1a\x19google/protobuf/any.proto\x1a\x1fgoogle/protobuf/timestamp.proto\x1a\x1aspark/connect/common.proto\x1a\x1dspark/connect/relations.proto\x1a\x19spark/connect/types.proto"\xed"\n\x0fPipelineCommand\x12h\n\x15\x63reate_dataflow_graph\x18\x01
 
\x01(\x0b\x32\x32.spark.connect.PipelineCommand.CreateDataflowGraphH\x00R\x13\x63reateDataflowGraph\x12R\n\rdefine_output\x18\x02
 \x01(\x0b\x32+.spark.connect.PipelineCommand.DefineOutp [...]
 )
 
 _globals = globals()
@@ -69,10 +69,10 @@ if not _descriptor._USE_C_DESCRIPTORS:
     ]._serialized_options = b"8\001"
     _globals["_PIPELINECOMMAND_DEFINEFLOW_SQLCONFENTRY"]._loaded_options = None
     _globals["_PIPELINECOMMAND_DEFINEFLOW_SQLCONFENTRY"]._serialized_options = 
b"8\001"
-    _globals["_OUTPUTTYPE"]._serialized_start = 6105
-    _globals["_OUTPUTTYPE"]._serialized_end = 6210
+    _globals["_OUTPUTTYPE"]._serialized_start = 6139
+    _globals["_OUTPUTTYPE"]._serialized_end = 6244
     _globals["_PIPELINECOMMAND"]._serialized_start = 195
-    _globals["_PIPELINECOMMAND"]._serialized_end = 4622
+    _globals["_PIPELINECOMMAND"]._serialized_end = 4656
     _globals["_PIPELINECOMMAND_CREATEDATAFLOWGRAPH"]._serialized_start = 1129
     _globals["_PIPELINECOMMAND_CREATEDATAFLOWGRAPH"]._serialized_end = 1437
     
_globals["_PIPELINECOMMAND_CREATEDATAFLOWGRAPH_SQLCONFENTRY"]._serialized_start 
= 1338
@@ -94,37 +94,37 @@ if not _descriptor._USE_C_DESCRIPTORS:
     
_globals["_PIPELINECOMMAND_DEFINEOUTPUT_SINKDETAILS_OPTIONSENTRY"]._serialized_start
 = 2659
     
_globals["_PIPELINECOMMAND_DEFINEOUTPUT_SINKDETAILS_OPTIONSENTRY"]._serialized_end
 = 2717
     _globals["_PIPELINECOMMAND_DEFINEFLOW"]._serialized_start = 2833
-    _globals["_PIPELINECOMMAND_DEFINEFLOW"]._serialized_end = 3694
+    _globals["_PIPELINECOMMAND_DEFINEFLOW"]._serialized_end = 3728
     _globals["_PIPELINECOMMAND_DEFINEFLOW_SQLCONFENTRY"]._serialized_start = 
1338
     _globals["_PIPELINECOMMAND_DEFINEFLOW_SQLCONFENTRY"]._serialized_end = 1396
-    
_globals["_PIPELINECOMMAND_DEFINEFLOW_WRITERELATIONFLOWDETAILS"]._serialized_start
 = 3427
-    
_globals["_PIPELINECOMMAND_DEFINEFLOW_WRITERELATIONFLOWDETAILS"]._serialized_end
 = 3524
-    _globals["_PIPELINECOMMAND_DEFINEFLOW_RESPONSE"]._serialized_start = 3526
-    _globals["_PIPELINECOMMAND_DEFINEFLOW_RESPONSE"]._serialized_end = 3584
-    _globals["_PIPELINECOMMAND_STARTRUN"]._serialized_start = 3697
-    _globals["_PIPELINECOMMAND_STARTRUN"]._serialized_end = 4019
-    _globals["_PIPELINECOMMAND_DEFINESQLGRAPHELEMENTS"]._serialized_start = 
4022
-    _globals["_PIPELINECOMMAND_DEFINESQLGRAPHELEMENTS"]._serialized_end = 4221
-    
_globals["_PIPELINECOMMAND_GETQUERYFUNCTIONEXECUTIONSIGNALSTREAM"]._serialized_start
 = 4224
-    
_globals["_PIPELINECOMMAND_GETQUERYFUNCTIONEXECUTIONSIGNALSTREAM"]._serialized_end
 = 4382
-    
_globals["_PIPELINECOMMAND_DEFINEFLOWQUERYFUNCTIONRESULT"]._serialized_start = 
4385
-    _globals["_PIPELINECOMMAND_DEFINEFLOWQUERYFUNCTIONRESULT"]._serialized_end 
= 4606
-    _globals["_PIPELINECOMMANDRESULT"]._serialized_start = 4625
-    _globals["_PIPELINECOMMANDRESULT"]._serialized_end = 5377
-    
_globals["_PIPELINECOMMANDRESULT_CREATEDATAFLOWGRAPHRESULT"]._serialized_start 
= 4994
-    
_globals["_PIPELINECOMMANDRESULT_CREATEDATAFLOWGRAPHRESULT"]._serialized_end = 
5092
-    _globals["_PIPELINECOMMANDRESULT_DEFINEOUTPUTRESULT"]._serialized_start = 
5095
-    _globals["_PIPELINECOMMANDRESULT_DEFINEOUTPUTRESULT"]._serialized_end = 
5228
-    _globals["_PIPELINECOMMANDRESULT_DEFINEFLOWRESULT"]._serialized_start = 
5231
-    _globals["_PIPELINECOMMANDRESULT_DEFINEFLOWRESULT"]._serialized_end = 5362
-    _globals["_PIPELINEEVENTRESULT"]._serialized_start = 5379
-    _globals["_PIPELINEEVENTRESULT"]._serialized_end = 5452
-    _globals["_PIPELINEEVENT"]._serialized_start = 5454
-    _globals["_PIPELINEEVENT"]._serialized_end = 5570
-    _globals["_SOURCECODELOCATION"]._serialized_start = 5573
-    _globals["_SOURCECODELOCATION"]._serialized_end = 5814
-    _globals["_PIPELINEQUERYFUNCTIONEXECUTIONSIGNAL"]._serialized_start = 5816
-    _globals["_PIPELINEQUERYFUNCTIONEXECUTIONSIGNAL"]._serialized_end = 5885
-    _globals["_PIPELINEANALYSISCONTEXT"]._serialized_start = 5888
-    _globals["_PIPELINEANALYSISCONTEXT"]._serialized_end = 6103
+    
_globals["_PIPELINECOMMAND_DEFINEFLOW_WRITERELATIONFLOWDETAILS"]._serialized_start
 = 3452
+    
_globals["_PIPELINECOMMAND_DEFINEFLOW_WRITERELATIONFLOWDETAILS"]._serialized_end
 = 3549
+    _globals["_PIPELINECOMMAND_DEFINEFLOW_RESPONSE"]._serialized_start = 3551
+    _globals["_PIPELINECOMMAND_DEFINEFLOW_RESPONSE"]._serialized_end = 3609
+    _globals["_PIPELINECOMMAND_STARTRUN"]._serialized_start = 3731
+    _globals["_PIPELINECOMMAND_STARTRUN"]._serialized_end = 4053
+    _globals["_PIPELINECOMMAND_DEFINESQLGRAPHELEMENTS"]._serialized_start = 
4056
+    _globals["_PIPELINECOMMAND_DEFINESQLGRAPHELEMENTS"]._serialized_end = 4255
+    
_globals["_PIPELINECOMMAND_GETQUERYFUNCTIONEXECUTIONSIGNALSTREAM"]._serialized_start
 = 4258
+    
_globals["_PIPELINECOMMAND_GETQUERYFUNCTIONEXECUTIONSIGNALSTREAM"]._serialized_end
 = 4416
+    
_globals["_PIPELINECOMMAND_DEFINEFLOWQUERYFUNCTIONRESULT"]._serialized_start = 
4419
+    _globals["_PIPELINECOMMAND_DEFINEFLOWQUERYFUNCTIONRESULT"]._serialized_end 
= 4640
+    _globals["_PIPELINECOMMANDRESULT"]._serialized_start = 4659
+    _globals["_PIPELINECOMMANDRESULT"]._serialized_end = 5411
+    
_globals["_PIPELINECOMMANDRESULT_CREATEDATAFLOWGRAPHRESULT"]._serialized_start 
= 5028
+    
_globals["_PIPELINECOMMANDRESULT_CREATEDATAFLOWGRAPHRESULT"]._serialized_end = 
5126
+    _globals["_PIPELINECOMMANDRESULT_DEFINEOUTPUTRESULT"]._serialized_start = 
5129
+    _globals["_PIPELINECOMMANDRESULT_DEFINEOUTPUTRESULT"]._serialized_end = 
5262
+    _globals["_PIPELINECOMMANDRESULT_DEFINEFLOWRESULT"]._serialized_start = 
5265
+    _globals["_PIPELINECOMMANDRESULT_DEFINEFLOWRESULT"]._serialized_end = 5396
+    _globals["_PIPELINEEVENTRESULT"]._serialized_start = 5413
+    _globals["_PIPELINEEVENTRESULT"]._serialized_end = 5486
+    _globals["_PIPELINEEVENT"]._serialized_start = 5488
+    _globals["_PIPELINEEVENT"]._serialized_end = 5604
+    _globals["_SOURCECODELOCATION"]._serialized_start = 5607
+    _globals["_SOURCECODELOCATION"]._serialized_end = 5848
+    _globals["_PIPELINEQUERYFUNCTIONEXECUTIONSIGNAL"]._serialized_start = 5850
+    _globals["_PIPELINEQUERYFUNCTIONEXECUTIONSIGNAL"]._serialized_end = 5919
+    _globals["_PIPELINEANALYSISCONTEXT"]._serialized_start = 5922
+    _globals["_PIPELINEANALYSISCONTEXT"]._serialized_end = 6137
 # @@protoc_insertion_point(module_scope)
diff --git a/python/pyspark/sql/connect/proto/pipelines_pb2.pyi 
b/python/pyspark/sql/connect/proto/pipelines_pb2.pyi
index 60d131037c99..e0768a1f6bae 100644
--- a/python/pyspark/sql/connect/proto/pipelines_pb2.pyi
+++ b/python/pyspark/sql/connect/proto/pipelines_pb2.pyi
@@ -588,6 +588,7 @@ class PipelineCommand(google.protobuf.message.Message):
         SOURCE_CODE_LOCATION_FIELD_NUMBER: builtins.int
         RELATION_FLOW_DETAILS_FIELD_NUMBER: builtins.int
         EXTENSION_FIELD_NUMBER: builtins.int
+        ONCE_FIELD_NUMBER: builtins.int
         dataflow_graph_id: builtins.str
         """The graph to attach this flow to."""
         flow_name: builtins.str
@@ -612,6 +613,13 @@ class PipelineCommand(google.protobuf.message.Message):
         ) -> global___PipelineCommand.DefineFlow.WriteRelationFlowDetails: ...
         @property
         def extension(self) -> google.protobuf.any_pb2.Any: ...
+        once: builtins.bool
+        """If true, define the flow as a one-time flow, such as for backfill.
+        Set to true changes the flow in two ways:
+          - The flow is run one time by default. If the pipeline is ran with a 
full refresh,
+            the flow will run again.
+          - The flow function must be a batch DataFrame, not a streaming 
DataFrame.
+        """
         def __init__(
             self,
             *,
@@ -624,6 +632,7 @@ class PipelineCommand(google.protobuf.message.Message):
             relation_flow_details: 
global___PipelineCommand.DefineFlow.WriteRelationFlowDetails
             | None = ...,
             extension: google.protobuf.any_pb2.Any | None = ...,
+            once: builtins.bool | None = ...,
         ) -> None: ...
         def HasField(
             self,
@@ -634,6 +643,8 @@ class PipelineCommand(google.protobuf.message.Message):
                 b"_dataflow_graph_id",
                 "_flow_name",
                 b"_flow_name",
+                "_once",
+                b"_once",
                 "_source_code_location",
                 b"_source_code_location",
                 "_target_dataset_name",
@@ -648,6 +659,8 @@ class PipelineCommand(google.protobuf.message.Message):
                 b"extension",
                 "flow_name",
                 b"flow_name",
+                "once",
+                b"once",
                 "relation_flow_details",
                 b"relation_flow_details",
                 "source_code_location",
@@ -665,6 +678,8 @@ class PipelineCommand(google.protobuf.message.Message):
                 b"_dataflow_graph_id",
                 "_flow_name",
                 b"_flow_name",
+                "_once",
+                b"_once",
                 "_source_code_location",
                 b"_source_code_location",
                 "_target_dataset_name",
@@ -679,6 +694,8 @@ class PipelineCommand(google.protobuf.message.Message):
                 b"extension",
                 "flow_name",
                 b"flow_name",
+                "once",
+                b"once",
                 "relation_flow_details",
                 b"relation_flow_details",
                 "source_code_location",
@@ -703,6 +720,10 @@ class PipelineCommand(google.protobuf.message.Message):
             self, oneof_group: typing_extensions.Literal["_flow_name", 
b"_flow_name"]
         ) -> typing_extensions.Literal["flow_name"] | None: ...
         @typing.overload
+        def WhichOneof(
+            self, oneof_group: typing_extensions.Literal["_once", b"_once"]
+        ) -> typing_extensions.Literal["once"] | None: ...
+        @typing.overload
         def WhichOneof(
             self,
             oneof_group: typing_extensions.Literal[
diff --git a/sql/connect/common/src/main/protobuf/spark/connect/pipelines.proto 
b/sql/connect/common/src/main/protobuf/spark/connect/pipelines.proto
index 0fa36f8a1514..a92e24fda915 100644
--- a/sql/connect/common/src/main/protobuf/spark/connect/pipelines.proto
+++ b/sql/connect/common/src/main/protobuf/spark/connect/pipelines.proto
@@ -152,6 +152,13 @@ message PipelineCommand {
       optional spark.connect.Relation relation = 1;
     }
 
+    // If true, define the flow as a one-time flow, such as for backfill.
+    // Set to true changes the flow in two ways:
+    //   - The flow is run one time by default. If the pipeline is ran with a 
full refresh,
+    //     the flow will run again.
+    //   - The flow function must be a batch DataFrame, not a streaming 
DataFrame.
+    optional bool once = 8;
+
     message Response {
       // Fully qualified flow name that uniquely identify a flow in the 
Dataflow graph.
       optional string flow_name = 1;
diff --git 
a/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/pipelines/PipelinesHandler.scala
 
b/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/pipelines/PipelinesHandler.scala
index 0929b07be523..1a3b0d2231c6 100644
--- 
a/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/pipelines/PipelinesHandler.scala
+++ 
b/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/pipelines/PipelinesHandler.scala
@@ -267,6 +267,11 @@ private[connect] object PipelinesHandler extends Logging {
       flow: proto.PipelineCommand.DefineFlow,
       transformRelationFunc: Relation => LogicalPlan,
       sessionHolder: SessionHolder): TableIdentifier = {
+    if (flow.hasOnce) {
+      throw new AnalysisException(
+        "DEFINE_FLOW_ONCE_OPTION_NOT_SUPPORTED",
+        Map("flowName" -> flow.getFlowName))
+    }
     val dataflowGraphId = flow.getDataflowGraphId
     val graphElementRegistry =
       
sessionHolder.dataflowGraphRegistry.getDataflowGraphOrThrow(dataflowGraphId)
diff --git 
a/sql/connect/server/src/test/scala/org/apache/spark/sql/connect/pipelines/SparkDeclarativePipelinesServerSuite.scala
 
b/sql/connect/server/src/test/scala/org/apache/spark/sql/connect/pipelines/SparkDeclarativePipelinesServerSuite.scala
index 9dba27c4525c..ab60462e8735 100644
--- 
a/sql/connect/server/src/test/scala/org/apache/spark/sql/connect/pipelines/SparkDeclarativePipelinesServerSuite.scala
+++ 
b/sql/connect/server/src/test/scala/org/apache/spark/sql/connect/pipelines/SparkDeclarativePipelinesServerSuite.scala
@@ -71,6 +71,24 @@ class SparkDeclarativePipelinesServerSuite
 
   }
 
+  gridTest("Define flow 'once' argument not supported")(Seq(true, false)) { 
onceValue =>
+    val ex = intercept[Exception] {
+      withRawBlockingStub { implicit stub =>
+        val graphId = createDataflowGraph
+        sendPlan(
+          buildPlanFromPipelineCommand(
+            PipelineCommand
+              .newBuilder()
+              .setDefineFlow(DefineFlow
+                .newBuilder()
+                .setDataflowGraphId(graphId)
+                .setOnce(onceValue))
+              .build()))
+      }
+    }
+    assert(ex.getMessage.contains("DEFINE_FLOW_ONCE_OPTION_NOT_SUPPORTED"))
+  }
+
   test(
     "Cross dependency between SQL dataset and non-SQL dataset is valid and can 
be registered") {
     withRawBlockingStub { implicit stub =>


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to