This is an automated email from the ASF dual-hosted git repository.
hvanhovell pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new 7da1356837e8 [SPARK-55264] Add ExecuteOutput command to Spark Connect
pipelines proto
7da1356837e8 is described below
commit 7da1356837e85c548d8da76c0db8143b3e6f87ab
Author: Aditya Nambiar <[email protected]>
AuthorDate: Tue Feb 24 11:50:34 2026 -0400
[SPARK-55264] Add ExecuteOutput command to Spark Connect pipelines proto
### What changes were proposed in this pull request?
This pull request adds a new ExecuteOutput command to the Spark Connect
pipelines protobuf definition. The new command enables clients to directly
execute multiple flows writing to an output.
### Why are the changes needed?
Required to enable standalone Python MV/ST (materialized view / streaming table)
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
### Was this patch authored or co-authored using generative AI tooling?
Closes #54104 from aditya-nambiar/SPARK-55264.
Authored-by: Aditya Nambiar <[email protected]>
Signed-off-by: Herman van Hövell <[email protected]>
---
python/pyspark/sql/connect/proto/pipelines_pb2.py | 110 +++++++++++----------
python/pyspark/sql/connect/proto/pipelines_pb2.pyi | 101 +++++++++++++++++++
.../main/protobuf/spark/connect/pipelines.proto | 21 ++++
3 files changed, 178 insertions(+), 54 deletions(-)
diff --git a/python/pyspark/sql/connect/proto/pipelines_pb2.py
b/python/pyspark/sql/connect/proto/pipelines_pb2.py
index ddd60cf1ac7c..ebdefc10ec05 100644
--- a/python/pyspark/sql/connect/proto/pipelines_pb2.py
+++ b/python/pyspark/sql/connect/proto/pipelines_pb2.py
@@ -42,7 +42,7 @@ from pyspark.sql.connect.proto import types_pb2 as
spark_dot_connect_dot_types__
DESCRIPTOR = _descriptor_pool.Default().AddSerializedFile(
-
b'\n\x1dspark/connect/pipelines.proto\x12\rspark.connect\x1a\x19google/protobuf/any.proto\x1a\x1fgoogle/protobuf/timestamp.proto\x1a\x1aspark/connect/common.proto\x1a\x1dspark/connect/relations.proto\x1a\x19spark/connect/types.proto"\xed"\n\x0fPipelineCommand\x12h\n\x15\x63reate_dataflow_graph\x18\x01
\x01(\x0b\x32\x32.spark.connect.PipelineCommand.CreateDataflowGraphH\x00R\x13\x63reateDataflowGraph\x12R\n\rdefine_output\x18\x02
\x01(\x0b\x32+.spark.connect.PipelineCommand.DefineOutp [...]
+
b'\n\x1dspark/connect/pipelines.proto\x12\rspark.connect\x1a\x19google/protobuf/any.proto\x1a\x1fgoogle/protobuf/timestamp.proto\x1a\x1aspark/connect/common.proto\x1a\x1dspark/connect/relations.proto\x1a\x19spark/connect/types.proto"\xbb&\n\x0fPipelineCommand\x12h\n\x15\x63reate_dataflow_graph\x18\x01
\x01(\x0b\x32\x32.spark.connect.PipelineCommand.CreateDataflowGraphH\x00R\x13\x63reateDataflowGraph\x12R\n\rdefine_output\x18\x02
\x01(\x0b\x32+.spark.connect.PipelineCommand.DefineOutp [...]
)
_globals = globals()
@@ -69,62 +69,64 @@ if not _descriptor._USE_C_DESCRIPTORS:
]._serialized_options = b"8\001"
_globals["_PIPELINECOMMAND_DEFINEFLOW_SQLCONFENTRY"]._loaded_options = None
_globals["_PIPELINECOMMAND_DEFINEFLOW_SQLCONFENTRY"]._serialized_options =
b"8\001"
- _globals["_OUTPUTTYPE"]._serialized_start = 6187
- _globals["_OUTPUTTYPE"]._serialized_end = 6292
+ _globals["_OUTPUTTYPE"]._serialized_start = 6649
+ _globals["_OUTPUTTYPE"]._serialized_end = 6754
_globals["_PIPELINECOMMAND"]._serialized_start = 195
- _globals["_PIPELINECOMMAND"]._serialized_end = 4656
- _globals["_PIPELINECOMMAND_CREATEDATAFLOWGRAPH"]._serialized_start = 1129
- _globals["_PIPELINECOMMAND_CREATEDATAFLOWGRAPH"]._serialized_end = 1437
-
_globals["_PIPELINECOMMAND_CREATEDATAFLOWGRAPH_SQLCONFENTRY"]._serialized_start
= 1338
-
_globals["_PIPELINECOMMAND_CREATEDATAFLOWGRAPH_SQLCONFENTRY"]._serialized_end =
1396
- _globals["_PIPELINECOMMAND_DROPDATAFLOWGRAPH"]._serialized_start = 1439
- _globals["_PIPELINECOMMAND_DROPDATAFLOWGRAPH"]._serialized_end = 1529
- _globals["_PIPELINECOMMAND_DEFINEOUTPUT"]._serialized_start = 1532
- _globals["_PIPELINECOMMAND_DEFINEOUTPUT"]._serialized_end = 2830
- _globals["_PIPELINECOMMAND_DEFINEOUTPUT_TABLEDETAILS"]._serialized_start =
2068
- _globals["_PIPELINECOMMAND_DEFINEOUTPUT_TABLEDETAILS"]._serialized_end =
2516
+ _globals["_PIPELINECOMMAND"]._serialized_end = 5118
+ _globals["_PIPELINECOMMAND_CREATEDATAFLOWGRAPH"]._serialized_start = 1232
+ _globals["_PIPELINECOMMAND_CREATEDATAFLOWGRAPH"]._serialized_end = 1540
+
_globals["_PIPELINECOMMAND_CREATEDATAFLOWGRAPH_SQLCONFENTRY"]._serialized_start
= 1441
+
_globals["_PIPELINECOMMAND_CREATEDATAFLOWGRAPH_SQLCONFENTRY"]._serialized_end =
1499
+ _globals["_PIPELINECOMMAND_DROPDATAFLOWGRAPH"]._serialized_start = 1542
+ _globals["_PIPELINECOMMAND_DROPDATAFLOWGRAPH"]._serialized_end = 1632
+ _globals["_PIPELINECOMMAND_DEFINEOUTPUT"]._serialized_start = 1635
+ _globals["_PIPELINECOMMAND_DEFINEOUTPUT"]._serialized_end = 2933
+ _globals["_PIPELINECOMMAND_DEFINEOUTPUT_TABLEDETAILS"]._serialized_start =
2171
+ _globals["_PIPELINECOMMAND_DEFINEOUTPUT_TABLEDETAILS"]._serialized_end =
2619
_globals[
"_PIPELINECOMMAND_DEFINEOUTPUT_TABLEDETAILS_TABLEPROPERTIESENTRY"
- ]._serialized_start = 2429
+ ]._serialized_start = 2532
_globals[
"_PIPELINECOMMAND_DEFINEOUTPUT_TABLEDETAILS_TABLEPROPERTIESENTRY"
- ]._serialized_end = 2495
- _globals["_PIPELINECOMMAND_DEFINEOUTPUT_SINKDETAILS"]._serialized_start =
2519
- _globals["_PIPELINECOMMAND_DEFINEOUTPUT_SINKDETAILS"]._serialized_end =
2728
-
_globals["_PIPELINECOMMAND_DEFINEOUTPUT_SINKDETAILS_OPTIONSENTRY"]._serialized_start
= 2659
-
_globals["_PIPELINECOMMAND_DEFINEOUTPUT_SINKDETAILS_OPTIONSENTRY"]._serialized_end
= 2717
- _globals["_PIPELINECOMMAND_DEFINEFLOW"]._serialized_start = 2833
- _globals["_PIPELINECOMMAND_DEFINEFLOW"]._serialized_end = 3728
- _globals["_PIPELINECOMMAND_DEFINEFLOW_SQLCONFENTRY"]._serialized_start =
1338
- _globals["_PIPELINECOMMAND_DEFINEFLOW_SQLCONFENTRY"]._serialized_end = 1396
-
_globals["_PIPELINECOMMAND_DEFINEFLOW_WRITERELATIONFLOWDETAILS"]._serialized_start
= 3452
-
_globals["_PIPELINECOMMAND_DEFINEFLOW_WRITERELATIONFLOWDETAILS"]._serialized_end
= 3549
- _globals["_PIPELINECOMMAND_DEFINEFLOW_RESPONSE"]._serialized_start = 3551
- _globals["_PIPELINECOMMAND_DEFINEFLOW_RESPONSE"]._serialized_end = 3609
- _globals["_PIPELINECOMMAND_STARTRUN"]._serialized_start = 3731
- _globals["_PIPELINECOMMAND_STARTRUN"]._serialized_end = 4053
- _globals["_PIPELINECOMMAND_DEFINESQLGRAPHELEMENTS"]._serialized_start =
4056
- _globals["_PIPELINECOMMAND_DEFINESQLGRAPHELEMENTS"]._serialized_end = 4255
-
_globals["_PIPELINECOMMAND_GETQUERYFUNCTIONEXECUTIONSIGNALSTREAM"]._serialized_start
= 4258
-
_globals["_PIPELINECOMMAND_GETQUERYFUNCTIONEXECUTIONSIGNALSTREAM"]._serialized_end
= 4416
-
_globals["_PIPELINECOMMAND_DEFINEFLOWQUERYFUNCTIONRESULT"]._serialized_start =
4419
- _globals["_PIPELINECOMMAND_DEFINEFLOWQUERYFUNCTIONRESULT"]._serialized_end
= 4640
- _globals["_PIPELINECOMMANDRESULT"]._serialized_start = 4659
- _globals["_PIPELINECOMMANDRESULT"]._serialized_end = 5411
-
_globals["_PIPELINECOMMANDRESULT_CREATEDATAFLOWGRAPHRESULT"]._serialized_start
= 5028
-
_globals["_PIPELINECOMMANDRESULT_CREATEDATAFLOWGRAPHRESULT"]._serialized_end =
5126
- _globals["_PIPELINECOMMANDRESULT_DEFINEOUTPUTRESULT"]._serialized_start =
5129
- _globals["_PIPELINECOMMANDRESULT_DEFINEOUTPUTRESULT"]._serialized_end =
5262
- _globals["_PIPELINECOMMANDRESULT_DEFINEFLOWRESULT"]._serialized_start =
5265
- _globals["_PIPELINECOMMANDRESULT_DEFINEFLOWRESULT"]._serialized_end = 5396
- _globals["_PIPELINEEVENTRESULT"]._serialized_start = 5413
- _globals["_PIPELINEEVENTRESULT"]._serialized_end = 5486
- _globals["_PIPELINEEVENT"]._serialized_start = 5488
- _globals["_PIPELINEEVENT"]._serialized_end = 5604
- _globals["_SOURCECODELOCATION"]._serialized_start = 5607
- _globals["_SOURCECODELOCATION"]._serialized_end = 5848
- _globals["_PIPELINEQUERYFUNCTIONEXECUTIONSIGNAL"]._serialized_start = 5850
- _globals["_PIPELINEQUERYFUNCTIONEXECUTIONSIGNAL"]._serialized_end = 5919
- _globals["_PIPELINEANALYSISCONTEXT"]._serialized_start = 5922
- _globals["_PIPELINEANALYSISCONTEXT"]._serialized_end = 6185
+ ]._serialized_end = 2598
+ _globals["_PIPELINECOMMAND_DEFINEOUTPUT_SINKDETAILS"]._serialized_start =
2622
+ _globals["_PIPELINECOMMAND_DEFINEOUTPUT_SINKDETAILS"]._serialized_end =
2831
+
_globals["_PIPELINECOMMAND_DEFINEOUTPUT_SINKDETAILS_OPTIONSENTRY"]._serialized_start
= 2762
+
_globals["_PIPELINECOMMAND_DEFINEOUTPUT_SINKDETAILS_OPTIONSENTRY"]._serialized_end
= 2820
+ _globals["_PIPELINECOMMAND_DEFINEFLOW"]._serialized_start = 2936
+ _globals["_PIPELINECOMMAND_DEFINEFLOW"]._serialized_end = 3831
+ _globals["_PIPELINECOMMAND_DEFINEFLOW_SQLCONFENTRY"]._serialized_start =
1441
+ _globals["_PIPELINECOMMAND_DEFINEFLOW_SQLCONFENTRY"]._serialized_end = 1499
+
_globals["_PIPELINECOMMAND_DEFINEFLOW_WRITERELATIONFLOWDETAILS"]._serialized_start
= 3555
+
_globals["_PIPELINECOMMAND_DEFINEFLOW_WRITERELATIONFLOWDETAILS"]._serialized_end
= 3652
+ _globals["_PIPELINECOMMAND_DEFINEFLOW_RESPONSE"]._serialized_start = 3654
+ _globals["_PIPELINECOMMAND_DEFINEFLOW_RESPONSE"]._serialized_end = 3712
+ _globals["_PIPELINECOMMAND_EXECUTEOUTPUTFLOWS"]._serialized_start = 3834
+ _globals["_PIPELINECOMMAND_EXECUTEOUTPUTFLOWS"]._serialized_end = 4190
+ _globals["_PIPELINECOMMAND_STARTRUN"]._serialized_start = 4193
+ _globals["_PIPELINECOMMAND_STARTRUN"]._serialized_end = 4515
+ _globals["_PIPELINECOMMAND_DEFINESQLGRAPHELEMENTS"]._serialized_start =
4518
+ _globals["_PIPELINECOMMAND_DEFINESQLGRAPHELEMENTS"]._serialized_end = 4717
+
_globals["_PIPELINECOMMAND_GETQUERYFUNCTIONEXECUTIONSIGNALSTREAM"]._serialized_start
= 4720
+
_globals["_PIPELINECOMMAND_GETQUERYFUNCTIONEXECUTIONSIGNALSTREAM"]._serialized_end
= 4878
+
_globals["_PIPELINECOMMAND_DEFINEFLOWQUERYFUNCTIONRESULT"]._serialized_start =
4881
+ _globals["_PIPELINECOMMAND_DEFINEFLOWQUERYFUNCTIONRESULT"]._serialized_end
= 5102
+ _globals["_PIPELINECOMMANDRESULT"]._serialized_start = 5121
+ _globals["_PIPELINECOMMANDRESULT"]._serialized_end = 5873
+
_globals["_PIPELINECOMMANDRESULT_CREATEDATAFLOWGRAPHRESULT"]._serialized_start
= 5490
+
_globals["_PIPELINECOMMANDRESULT_CREATEDATAFLOWGRAPHRESULT"]._serialized_end =
5588
+ _globals["_PIPELINECOMMANDRESULT_DEFINEOUTPUTRESULT"]._serialized_start =
5591
+ _globals["_PIPELINECOMMANDRESULT_DEFINEOUTPUTRESULT"]._serialized_end =
5724
+ _globals["_PIPELINECOMMANDRESULT_DEFINEFLOWRESULT"]._serialized_start =
5727
+ _globals["_PIPELINECOMMANDRESULT_DEFINEFLOWRESULT"]._serialized_end = 5858
+ _globals["_PIPELINEEVENTRESULT"]._serialized_start = 5875
+ _globals["_PIPELINEEVENTRESULT"]._serialized_end = 5948
+ _globals["_PIPELINEEVENT"]._serialized_start = 5950
+ _globals["_PIPELINEEVENT"]._serialized_end = 6066
+ _globals["_SOURCECODELOCATION"]._serialized_start = 6069
+ _globals["_SOURCECODELOCATION"]._serialized_end = 6310
+ _globals["_PIPELINEQUERYFUNCTIONEXECUTIONSIGNAL"]._serialized_start = 6312
+ _globals["_PIPELINEQUERYFUNCTIONEXECUTIONSIGNAL"]._serialized_end = 6381
+ _globals["_PIPELINEANALYSISCONTEXT"]._serialized_start = 6384
+ _globals["_PIPELINEANALYSISCONTEXT"]._serialized_end = 6647
# @@protoc_insertion_point(module_scope)
diff --git a/python/pyspark/sql/connect/proto/pipelines_pb2.pyi
b/python/pyspark/sql/connect/proto/pipelines_pb2.pyi
index 39a1e29ae7dd..391746c04993 100644
--- a/python/pyspark/sql/connect/proto/pipelines_pb2.pyi
+++ b/python/pyspark/sql/connect/proto/pipelines_pb2.pyi
@@ -740,6 +740,98 @@ class PipelineCommand(google.protobuf.message.Message):
self, oneof_group: typing_extensions.Literal["details", b"details"]
) -> typing_extensions.Literal["relation_flow_details", "extension"] |
None: ...
+ class ExecuteOutputFlows(google.protobuf.message.Message):
+ """Request to execute all flows for a single output (dataset or sink)
remotely."""
+
+ DESCRIPTOR: google.protobuf.descriptor.Descriptor
+
+ DEFINE_OUTPUT_FIELD_NUMBER: builtins.int
+ DEFINE_FLOWS_FIELD_NUMBER: builtins.int
+ FULL_REFRESH_FIELD_NUMBER: builtins.int
+ STORAGE_FIELD_NUMBER: builtins.int
+ EXTENSION_FIELD_NUMBER: builtins.int
+ @property
+ def define_output(self) -> global___PipelineCommand.DefineOutput:
+ """The output (table or materialized view or sink) definition."""
+ @property
+ def define_flows(
+ self,
+ ) ->
google.protobuf.internal.containers.RepeatedCompositeFieldContainer[
+ global___PipelineCommand.DefineFlow
+ ]:
+ """The flows to execute for this table."""
+ full_refresh: builtins.bool
+ """Whether to perform a full refresh instead of an incremental
update."""
+ storage: builtins.str
+ """Storage location for pipeline checkpoints and metadata."""
+ @property
+ def extension(
+ self,
+ ) ->
google.protobuf.internal.containers.RepeatedCompositeFieldContainer[
+ google.protobuf.any_pb2.Any
+ ]:
+ """Reserved field for protocol extensions."""
+ def __init__(
+ self,
+ *,
+ define_output: global___PipelineCommand.DefineOutput | None = ...,
+ define_flows:
collections.abc.Iterable[global___PipelineCommand.DefineFlow]
+ | None = ...,
+ full_refresh: builtins.bool | None = ...,
+ storage: builtins.str | None = ...,
+ extension: collections.abc.Iterable[google.protobuf.any_pb2.Any] |
None = ...,
+ ) -> None: ...
+ def HasField(
+ self,
+ field_name: typing_extensions.Literal[
+ "_define_output",
+ b"_define_output",
+ "_full_refresh",
+ b"_full_refresh",
+ "_storage",
+ b"_storage",
+ "define_output",
+ b"define_output",
+ "full_refresh",
+ b"full_refresh",
+ "storage",
+ b"storage",
+ ],
+ ) -> builtins.bool: ...
+ def ClearField(
+ self,
+ field_name: typing_extensions.Literal[
+ "_define_output",
+ b"_define_output",
+ "_full_refresh",
+ b"_full_refresh",
+ "_storage",
+ b"_storage",
+ "define_flows",
+ b"define_flows",
+ "define_output",
+ b"define_output",
+ "extension",
+ b"extension",
+ "full_refresh",
+ b"full_refresh",
+ "storage",
+ b"storage",
+ ],
+ ) -> None: ...
+ @typing.overload
+ def WhichOneof(
+ self, oneof_group: typing_extensions.Literal["_define_output",
b"_define_output"]
+ ) -> typing_extensions.Literal["define_output"] | None: ...
+ @typing.overload
+ def WhichOneof(
+ self, oneof_group: typing_extensions.Literal["_full_refresh",
b"_full_refresh"]
+ ) -> typing_extensions.Literal["full_refresh"] | None: ...
+ @typing.overload
+ def WhichOneof(
+ self, oneof_group: typing_extensions.Literal["_storage",
b"_storage"]
+ ) -> typing_extensions.Literal["storage"] | None: ...
+
class StartRun(google.protobuf.message.Message):
"""Resolves all datasets and flows and start a pipeline update. Should
be called after all
graph elements are registered.
@@ -1051,6 +1143,7 @@ class PipelineCommand(google.protobuf.message.Message):
DEFINE_SQL_GRAPH_ELEMENTS_FIELD_NUMBER: builtins.int
GET_QUERY_FUNCTION_EXECUTION_SIGNAL_STREAM_FIELD_NUMBER: builtins.int
DEFINE_FLOW_QUERY_FUNCTION_RESULT_FIELD_NUMBER: builtins.int
+ EXECUTE_OUTPUT_FLOWS_FIELD_NUMBER: builtins.int
EXTENSION_FIELD_NUMBER: builtins.int
@property
def create_dataflow_graph(self) ->
global___PipelineCommand.CreateDataflowGraph: ...
@@ -1073,6 +1166,8 @@ class PipelineCommand(google.protobuf.message.Message):
self,
) -> global___PipelineCommand.DefineFlowQueryFunctionResult: ...
@property
+ def execute_output_flows(self) ->
global___PipelineCommand.ExecuteOutputFlows: ...
+ @property
def extension(self) -> google.protobuf.any_pb2.Any:
"""Reserved field for protocol extensions.
Used to support forward-compatibility by carrying additional command
types
@@ -1092,6 +1187,7 @@ class PipelineCommand(google.protobuf.message.Message):
| None = ...,
define_flow_query_function_result:
global___PipelineCommand.DefineFlowQueryFunctionResult
| None = ...,
+ execute_output_flows: global___PipelineCommand.ExecuteOutputFlows |
None = ...,
extension: google.protobuf.any_pb2.Any | None = ...,
) -> None: ...
def HasField(
@@ -1111,6 +1207,8 @@ class PipelineCommand(google.protobuf.message.Message):
b"define_sql_graph_elements",
"drop_dataflow_graph",
b"drop_dataflow_graph",
+ "execute_output_flows",
+ b"execute_output_flows",
"extension",
b"extension",
"get_query_function_execution_signal_stream",
@@ -1136,6 +1234,8 @@ class PipelineCommand(google.protobuf.message.Message):
b"define_sql_graph_elements",
"drop_dataflow_graph",
b"drop_dataflow_graph",
+ "execute_output_flows",
+ b"execute_output_flows",
"extension",
b"extension",
"get_query_function_execution_signal_stream",
@@ -1156,6 +1256,7 @@ class PipelineCommand(google.protobuf.message.Message):
"define_sql_graph_elements",
"get_query_function_execution_signal_stream",
"define_flow_query_function_result",
+ "execute_output_flows",
"extension",
]
| None
diff --git a/sql/connect/common/src/main/protobuf/spark/connect/pipelines.proto
b/sql/connect/common/src/main/protobuf/spark/connect/pipelines.proto
index 0874c2d10ec5..a46c7a75bc9b 100644
--- a/sql/connect/common/src/main/protobuf/spark/connect/pipelines.proto
+++ b/sql/connect/common/src/main/protobuf/spark/connect/pipelines.proto
@@ -40,6 +40,8 @@ message PipelineCommand {
DefineSqlGraphElements define_sql_graph_elements = 6;
GetQueryFunctionExecutionSignalStream
get_query_function_execution_signal_stream = 7;
DefineFlowQueryFunctionResult define_flow_query_function_result = 8;
+ ExecuteOutputFlows execute_output_flows = 9;
+
// Reserved field for protocol extensions.
// Used to support forward-compatibility by carrying additional command
types
// that are not yet defined in this version of the proto. During planning,
the
@@ -165,6 +167,25 @@ message PipelineCommand {
}
}
+ // Request to execute all flows for a single output (dataset or sink)
remotely.
+ message ExecuteOutputFlows {
+
+ // The output (table or materialized view or sink) definition.
+ optional DefineOutput define_output = 1;
+
+ // The flows to execute for this table.
+ repeated DefineFlow define_flows = 2;
+
+ // Whether to perform a full refresh instead of an incremental update.
+ optional bool full_refresh = 3;
+
+ // Storage location for pipeline checkpoints and metadata.
+ optional string storage = 4;
+
+ // Reserved field for protocol extensions.
+ repeated google.protobuf.Any extension = 999;
+ }
+
// Resolves all datasets and flows and start a pipeline update. Should be
called after all
// graph elements are registered.
message StartRun {
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]