This is an automated email from the ASF dual-hosted git repository.
sandy pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new 418cf56ed4dc [SPARK-53796][SDP] Add `extension` field to a few pipeline protos to support forward compatibility
418cf56ed4dc is described below
commit 418cf56ed4dc440b36ebab37d2aac898278215a8
Author: Yuheng Chang <[email protected]>
AuthorDate: Sat Oct 11 08:05:30 2025 -0700
[SPARK-53796][SDP] Add `extension` field to a few pipeline protos to support forward compatibility
### What changes were proposed in this pull request?
Adds a `google.protobuf.Any extension = 999;` field to the `PipelineCommand` and `SourceCodeLocation` protos to support forward compatibility by carrying additional pipeline command types and source code location fields that are not yet defined in this version of the proto.
During the planning stage, the Spark server resolves the command or message and dispatches it to the correct handler.
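As a minimal client-side sketch (illustrative only, not code from this patch), a platform could pack a custom command into the new field; `StringValue` stands in for a hypothetical platform-defined command message:

```python
# Illustrative only: StringValue stands in for a hypothetical
# platform-defined command message not known to this proto version.
from google.protobuf import wrappers_pb2

from pyspark.sql.connect.proto import pipelines_pb2

custom_command = wrappers_pb2.StringValue(value="platform-specific payload")

cmd = pipelines_pb2.PipelineCommand()
# Pack() records the message's type URL plus its serialized bytes, so the
# server can identify and decode the concrete command later.
cmd.extension.Pack(custom_command)

assert cmd.HasField("extension")
print(cmd.extension.TypeName())  # google.protobuf.StringValue
```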
### Why are the changes needed?
To support forward compatibility by carrying additional pipeline command types and dataset/flow fields that are not yet defined in this version of the proto. This is useful for platforms that want to extend pipeline commands with platform-specific capabilities.
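For illustration, the server-side dispatch could take roughly this shape (an assumed sketch, not code from this patch; `StringValue` again stands in for a platform-defined command):

```python
# Illustrative server-side dispatch (assumed shape, not from this patch).
from google.protobuf import wrappers_pb2

from pyspark.sql.connect.proto import pipelines_pb2


def handle_pipeline_command(cmd: pipelines_pb2.PipelineCommand) -> None:
    if cmd.HasField("extension"):
        # Is() compares the Any's type URL against a known descriptor.
        if cmd.extension.Is(wrappers_pb2.StringValue.DESCRIPTOR):
            payload = wrappers_pb2.StringValue()
            cmd.extension.Unpack(payload)  # fills payload from the Any bytes
            print(f"dispatching platform command: {payload.value}")
        else:
            raise ValueError(
                f"unsupported extension type: {cmd.extension.TypeName()}"
            )
    # ... otherwise handle the command types built into this proto version ...
```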
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
Tests will be added with the feature work that uses this new field.
### Was this patch authored or co-authored using generative AI tooling?
No
Closes #52514 from SCHJonathan/jonathan-chang_data/add-extension.
Authored-by: Yuheng Chang <[email protected]>
Signed-off-by: Sandy Ryza <[email protected]>
---
python/pyspark/sql/connect/proto/pipelines_pb2.py | 104 ++++++++++-----------
python/pyspark/sql/connect/proto/pipelines_pb2.pyi | 29 ++++++
.../main/protobuf/spark/connect/pipelines.proto | 11 +++
3 files changed, 92 insertions(+), 52 deletions(-)
diff --git a/python/pyspark/sql/connect/proto/pipelines_pb2.py b/python/pyspark/sql/connect/proto/pipelines_pb2.py
index b2240e398ced..44a6bdbb2280 100644
--- a/python/pyspark/sql/connect/proto/pipelines_pb2.py
+++ b/python/pyspark/sql/connect/proto/pipelines_pb2.py
@@ -42,7 +42,7 @@ from pyspark.sql.connect.proto import types_pb2 as spark_dot_connect_dot_types__
DESCRIPTOR = _descriptor_pool.Default().AddSerializedFile(
- b'\n\x1dspark/connect/pipelines.proto\x12\rspark.connect\x1a\x19google/protobuf/any.proto\x1a\x1fgoogle/protobuf/timestamp.proto\x1a\x1aspark/connect/common.proto\x1a\x1dspark/connect/relations.proto\x1a\x19spark/connect/types.proto"\xe5!\n\x0fPipelineCommand\x12h\n\x15\x63reate_dataflow_graph\x18\x01 \x01(\x0b\x32\x32.spark.connect.PipelineCommand.CreateDataflowGraphH\x00R\x13\x63reateDataflowGraph\x12R\n\rdefine_output\x18\x02 \x01(\x0b\x32+.spark.connect.PipelineCommand.DefineOutp [...]
+ b'\n\x1dspark/connect/pipelines.proto\x12\rspark.connect\x1a\x19google/protobuf/any.proto\x1a\x1fgoogle/protobuf/timestamp.proto\x1a\x1aspark/connect/common.proto\x1a\x1dspark/connect/relations.proto\x1a\x19spark/connect/types.proto"\x9c"\n\x0fPipelineCommand\x12h\n\x15\x63reate_dataflow_graph\x18\x01 \x01(\x0b\x32\x32.spark.connect.PipelineCommand.CreateDataflowGraphH\x00R\x13\x63reateDataflowGraph\x12R\n\rdefine_output\x18\x02 \x01(\x0b\x32+.spark.connect.PipelineCommand.DefineOutp [...]
)
_globals = globals()
@@ -69,60 +69,60 @@ if not _descriptor._USE_C_DESCRIPTORS:
]._serialized_options = b"8\001"
_globals["_PIPELINECOMMAND_DEFINEFLOW_SQLCONFENTRY"]._loaded_options = None
_globals["_PIPELINECOMMAND_DEFINEFLOW_SQLCONFENTRY"]._serialized_options =
b"8\001"
- _globals["_OUTPUTTYPE"]._serialized_start = 5665
- _globals["_OUTPUTTYPE"]._serialized_end = 5770
+ _globals["_OUTPUTTYPE"]._serialized_start = 5774
+ _globals["_OUTPUTTYPE"]._serialized_end = 5879
_globals["_PIPELINECOMMAND"]._serialized_start = 195
- _globals["_PIPELINECOMMAND"]._serialized_end = 4520
- _globals["_PIPELINECOMMAND_CREATEDATAFLOWGRAPH"]._serialized_start = 1074
- _globals["_PIPELINECOMMAND_CREATEDATAFLOWGRAPH"]._serialized_end = 1382
- _globals["_PIPELINECOMMAND_CREATEDATAFLOWGRAPH_SQLCONFENTRY"]._serialized_start = 1283
- _globals["_PIPELINECOMMAND_CREATEDATAFLOWGRAPH_SQLCONFENTRY"]._serialized_end = 1341
- _globals["_PIPELINECOMMAND_DROPDATAFLOWGRAPH"]._serialized_start = 1384
- _globals["_PIPELINECOMMAND_DROPDATAFLOWGRAPH"]._serialized_end = 1474
- _globals["_PIPELINECOMMAND_DEFINEOUTPUT"]._serialized_start = 1477
- _globals["_PIPELINECOMMAND_DEFINEOUTPUT"]._serialized_end = 2728
- _globals["_PIPELINECOMMAND_DEFINEOUTPUT_TABLEDETAILS"]._serialized_start =
2013
- _globals["_PIPELINECOMMAND_DEFINEOUTPUT_TABLEDETAILS"]._serialized_end =
2414
+ _globals["_PIPELINECOMMAND"]._serialized_end = 4575
+ _globals["_PIPELINECOMMAND_CREATEDATAFLOWGRAPH"]._serialized_start = 1129
+ _globals["_PIPELINECOMMAND_CREATEDATAFLOWGRAPH"]._serialized_end = 1437
+ _globals["_PIPELINECOMMAND_CREATEDATAFLOWGRAPH_SQLCONFENTRY"]._serialized_start = 1338
+ _globals["_PIPELINECOMMAND_CREATEDATAFLOWGRAPH_SQLCONFENTRY"]._serialized_end = 1396
+ _globals["_PIPELINECOMMAND_DROPDATAFLOWGRAPH"]._serialized_start = 1439
+ _globals["_PIPELINECOMMAND_DROPDATAFLOWGRAPH"]._serialized_end = 1529
+ _globals["_PIPELINECOMMAND_DEFINEOUTPUT"]._serialized_start = 1532
+ _globals["_PIPELINECOMMAND_DEFINEOUTPUT"]._serialized_end = 2783
+ _globals["_PIPELINECOMMAND_DEFINEOUTPUT_TABLEDETAILS"]._serialized_start =
2068
+ _globals["_PIPELINECOMMAND_DEFINEOUTPUT_TABLEDETAILS"]._serialized_end =
2469
_globals[
"_PIPELINECOMMAND_DEFINEOUTPUT_TABLEDETAILS_TABLEPROPERTIESENTRY"
- ]._serialized_start = 2327
+ ]._serialized_start = 2382
_globals[
"_PIPELINECOMMAND_DEFINEOUTPUT_TABLEDETAILS_TABLEPROPERTIESENTRY"
- ]._serialized_end = 2393
- _globals["_PIPELINECOMMAND_DEFINEOUTPUT_SINKDETAILS"]._serialized_start =
2417
- _globals["_PIPELINECOMMAND_DEFINEOUTPUT_SINKDETAILS"]._serialized_end =
2626
-
_globals["_PIPELINECOMMAND_DEFINEOUTPUT_SINKDETAILS_OPTIONSENTRY"]._serialized_start
= 2557
-
_globals["_PIPELINECOMMAND_DEFINEOUTPUT_SINKDETAILS_OPTIONSENTRY"]._serialized_end
= 2615
- _globals["_PIPELINECOMMAND_DEFINEFLOW"]._serialized_start = 2731
- _globals["_PIPELINECOMMAND_DEFINEFLOW"]._serialized_end = 3592
- _globals["_PIPELINECOMMAND_DEFINEFLOW_SQLCONFENTRY"]._serialized_start =
1283
- _globals["_PIPELINECOMMAND_DEFINEFLOW_SQLCONFENTRY"]._serialized_end = 1341
-
_globals["_PIPELINECOMMAND_DEFINEFLOW_WRITERELATIONFLOWDETAILS"]._serialized_start
= 3325
-
_globals["_PIPELINECOMMAND_DEFINEFLOW_WRITERELATIONFLOWDETAILS"]._serialized_end
= 3422
- _globals["_PIPELINECOMMAND_DEFINEFLOW_RESPONSE"]._serialized_start = 3424
- _globals["_PIPELINECOMMAND_DEFINEFLOW_RESPONSE"]._serialized_end = 3482
- _globals["_PIPELINECOMMAND_STARTRUN"]._serialized_start = 3595
- _globals["_PIPELINECOMMAND_STARTRUN"]._serialized_end = 3917
- _globals["_PIPELINECOMMAND_DEFINESQLGRAPHELEMENTS"]._serialized_start =
3920
- _globals["_PIPELINECOMMAND_DEFINESQLGRAPHELEMENTS"]._serialized_end = 4119
-
_globals["_PIPELINECOMMAND_GETQUERYFUNCTIONEXECUTIONSIGNALSTREAM"]._serialized_start
= 4122
-
_globals["_PIPELINECOMMAND_GETQUERYFUNCTIONEXECUTIONSIGNALSTREAM"]._serialized_end
= 4280
-
_globals["_PIPELINECOMMAND_DEFINEFLOWQUERYFUNCTIONRESULT"]._serialized_start =
4283
- _globals["_PIPELINECOMMAND_DEFINEFLOWQUERYFUNCTIONRESULT"]._serialized_end
= 4504
- _globals["_PIPELINECOMMANDRESULT"]._serialized_start = 4523
- _globals["_PIPELINECOMMANDRESULT"]._serialized_end = 5275
- _globals["_PIPELINECOMMANDRESULT_CREATEDATAFLOWGRAPHRESULT"]._serialized_start = 4892
- _globals["_PIPELINECOMMANDRESULT_CREATEDATAFLOWGRAPHRESULT"]._serialized_end = 4990
- _globals["_PIPELINECOMMANDRESULT_DEFINEOUTPUTRESULT"]._serialized_start = 4993
- _globals["_PIPELINECOMMANDRESULT_DEFINEOUTPUTRESULT"]._serialized_end = 5126
- _globals["_PIPELINECOMMANDRESULT_DEFINEFLOWRESULT"]._serialized_start = 5129
- _globals["_PIPELINECOMMANDRESULT_DEFINEFLOWRESULT"]._serialized_end = 5260
- _globals["_PIPELINEEVENTRESULT"]._serialized_start = 5277
- _globals["_PIPELINEEVENTRESULT"]._serialized_end = 5350
- _globals["_PIPELINEEVENT"]._serialized_start = 5352
- _globals["_PIPELINEEVENT"]._serialized_end = 5468
- _globals["_SOURCECODELOCATION"]._serialized_start = 5470
- _globals["_SOURCECODELOCATION"]._serialized_end = 5592
- _globals["_PIPELINEQUERYFUNCTIONEXECUTIONSIGNAL"]._serialized_start = 5594
- _globals["_PIPELINEQUERYFUNCTIONEXECUTIONSIGNAL"]._serialized_end = 5663
+ ]._serialized_end = 2448
+ _globals["_PIPELINECOMMAND_DEFINEOUTPUT_SINKDETAILS"]._serialized_start =
2472
+ _globals["_PIPELINECOMMAND_DEFINEOUTPUT_SINKDETAILS"]._serialized_end =
2681
+
_globals["_PIPELINECOMMAND_DEFINEOUTPUT_SINKDETAILS_OPTIONSENTRY"]._serialized_start
= 2612
+
_globals["_PIPELINECOMMAND_DEFINEOUTPUT_SINKDETAILS_OPTIONSENTRY"]._serialized_end
= 2670
+ _globals["_PIPELINECOMMAND_DEFINEFLOW"]._serialized_start = 2786
+ _globals["_PIPELINECOMMAND_DEFINEFLOW"]._serialized_end = 3647
+ _globals["_PIPELINECOMMAND_DEFINEFLOW_SQLCONFENTRY"]._serialized_start =
1338
+ _globals["_PIPELINECOMMAND_DEFINEFLOW_SQLCONFENTRY"]._serialized_end = 1396
+
_globals["_PIPELINECOMMAND_DEFINEFLOW_WRITERELATIONFLOWDETAILS"]._serialized_start
= 3380
+
_globals["_PIPELINECOMMAND_DEFINEFLOW_WRITERELATIONFLOWDETAILS"]._serialized_end
= 3477
+ _globals["_PIPELINECOMMAND_DEFINEFLOW_RESPONSE"]._serialized_start = 3479
+ _globals["_PIPELINECOMMAND_DEFINEFLOW_RESPONSE"]._serialized_end = 3537
+ _globals["_PIPELINECOMMAND_STARTRUN"]._serialized_start = 3650
+ _globals["_PIPELINECOMMAND_STARTRUN"]._serialized_end = 3972
+ _globals["_PIPELINECOMMAND_DEFINESQLGRAPHELEMENTS"]._serialized_start =
3975
+ _globals["_PIPELINECOMMAND_DEFINESQLGRAPHELEMENTS"]._serialized_end = 4174
+
_globals["_PIPELINECOMMAND_GETQUERYFUNCTIONEXECUTIONSIGNALSTREAM"]._serialized_start
= 4177
+
_globals["_PIPELINECOMMAND_GETQUERYFUNCTIONEXECUTIONSIGNALSTREAM"]._serialized_end
= 4335
+
_globals["_PIPELINECOMMAND_DEFINEFLOWQUERYFUNCTIONRESULT"]._serialized_start =
4338
+ _globals["_PIPELINECOMMAND_DEFINEFLOWQUERYFUNCTIONRESULT"]._serialized_end
= 4559
+ _globals["_PIPELINECOMMANDRESULT"]._serialized_start = 4578
+ _globals["_PIPELINECOMMANDRESULT"]._serialized_end = 5330
+ _globals["_PIPELINECOMMANDRESULT_CREATEDATAFLOWGRAPHRESULT"]._serialized_start = 4947
+ _globals["_PIPELINECOMMANDRESULT_CREATEDATAFLOWGRAPHRESULT"]._serialized_end = 5045
+ _globals["_PIPELINECOMMANDRESULT_DEFINEOUTPUTRESULT"]._serialized_start = 5048
+ _globals["_PIPELINECOMMANDRESULT_DEFINEOUTPUTRESULT"]._serialized_end = 5181
+ _globals["_PIPELINECOMMANDRESULT_DEFINEFLOWRESULT"]._serialized_start = 5184
+ _globals["_PIPELINECOMMANDRESULT_DEFINEFLOWRESULT"]._serialized_end = 5315
+ _globals["_PIPELINEEVENTRESULT"]._serialized_start = 5332
+ _globals["_PIPELINEEVENTRESULT"]._serialized_end = 5405
+ _globals["_PIPELINEEVENT"]._serialized_start = 5407
+ _globals["_PIPELINEEVENT"]._serialized_end = 5523
+ _globals["_SOURCECODELOCATION"]._serialized_start = 5526
+ _globals["_SOURCECODELOCATION"]._serialized_end = 5701
+ _globals["_PIPELINEQUERYFUNCTIONEXECUTIONSIGNAL"]._serialized_start = 5703
+ _globals["_PIPELINEQUERYFUNCTIONEXECUTIONSIGNAL"]._serialized_end = 5772
# @@protoc_insertion_point(module_scope)
diff --git a/python/pyspark/sql/connect/proto/pipelines_pb2.pyi b/python/pyspark/sql/connect/proto/pipelines_pb2.pyi
index 4da07a68d200..a2714b86af94 100644
--- a/python/pyspark/sql/connect/proto/pipelines_pb2.pyi
+++ b/python/pyspark/sql/connect/proto/pipelines_pb2.pyi
@@ -1021,6 +1021,7 @@ class PipelineCommand(google.protobuf.message.Message):
DEFINE_SQL_GRAPH_ELEMENTS_FIELD_NUMBER: builtins.int
GET_QUERY_FUNCTION_EXECUTION_SIGNAL_STREAM_FIELD_NUMBER: builtins.int
DEFINE_FLOW_QUERY_FUNCTION_RESULT_FIELD_NUMBER: builtins.int
+ EXTENSION_FIELD_NUMBER: builtins.int
@property
def create_dataflow_graph(self) -> global___PipelineCommand.CreateDataflowGraph: ...
@property
@@ -1041,6 +1042,13 @@ class PipelineCommand(google.protobuf.message.Message):
def define_flow_query_function_result(
self,
) -> global___PipelineCommand.DefineFlowQueryFunctionResult: ...
+ @property
+ def extension(self) -> google.protobuf.any_pb2.Any:
+ """Reserved field for protocol extensions.
+ Used to support forward-compatibility by carrying additional command types
+ that are not yet defined in this version of the proto. During planning, the
+ engine will resolve and dispatch the concrete command contained in this field.
+ """
def __init__(
self,
*,
@@ -1054,6 +1062,7 @@ class PipelineCommand(google.protobuf.message.Message):
| None = ...,
define_flow_query_function_result: global___PipelineCommand.DefineFlowQueryFunctionResult
| None = ...,
+ extension: google.protobuf.any_pb2.Any | None = ...,
) -> None: ...
def HasField(
self,
@@ -1072,6 +1081,8 @@ class PipelineCommand(google.protobuf.message.Message):
b"define_sql_graph_elements",
"drop_dataflow_graph",
b"drop_dataflow_graph",
+ "extension",
+ b"extension",
"get_query_function_execution_signal_stream",
b"get_query_function_execution_signal_stream",
"start_run",
@@ -1095,6 +1106,8 @@ class PipelineCommand(google.protobuf.message.Message):
b"define_sql_graph_elements",
"drop_dataflow_graph",
b"drop_dataflow_graph",
+ "extension",
+ b"extension",
"get_query_function_execution_signal_stream",
b"get_query_function_execution_signal_stream",
"start_run",
@@ -1113,6 +1126,7 @@ class PipelineCommand(google.protobuf.message.Message):
"define_sql_graph_elements",
"get_query_function_execution_signal_stream",
"define_flow_query_function_result",
+ "extension",
]
| None
): ...
@@ -1347,15 +1361,28 @@ class SourceCodeLocation(google.protobuf.message.Message):
FILE_NAME_FIELD_NUMBER: builtins.int
LINE_NUMBER_FIELD_NUMBER: builtins.int
+ EXTENSION_FIELD_NUMBER: builtins.int
file_name: builtins.str
"""The file that this pipeline source code was defined in."""
line_number: builtins.int
"""The specific line number that this pipeline source code is located at,
if applicable."""
+ @property
+ def extension(
+ self,
+ ) -> google.protobuf.internal.containers.RepeatedCompositeFieldContainer[
+ google.protobuf.any_pb2.Any
+ ]:
+ """Reserved field for protocol extensions.
+ Used to support forward-compatibility by carrying additional fields
+ that are not yet defined in this version of the proto. During
planning, the
+ engine will resolve and dispatch the concrete command contained in
this field.
+ """
def __init__(
self,
*,
file_name: builtins.str | None = ...,
line_number: builtins.int | None = ...,
+ extension: collections.abc.Iterable[google.protobuf.any_pb2.Any] | None = ...,
) -> None: ...
def HasField(
self,
@@ -1377,6 +1404,8 @@ class SourceCodeLocation(google.protobuf.message.Message):
b"_file_name",
"_line_number",
b"_line_number",
+ "extension",
+ b"extension",
"file_name",
b"file_name",
"line_number",
diff --git a/sql/connect/common/src/main/protobuf/spark/connect/pipelines.proto b/sql/connect/common/src/main/protobuf/spark/connect/pipelines.proto
index 3cb555db497f..41939ec2548f 100644
--- a/sql/connect/common/src/main/protobuf/spark/connect/pipelines.proto
+++ b/sql/connect/common/src/main/protobuf/spark/connect/pipelines.proto
@@ -40,6 +40,11 @@ message PipelineCommand {
DefineSqlGraphElements define_sql_graph_elements = 6;
GetQueryFunctionExecutionSignalStream get_query_function_execution_signal_stream = 7;
DefineFlowQueryFunctionResult define_flow_query_function_result = 8;
+ // Reserved field for protocol extensions.
+ // Used to support forward-compatibility by carrying additional command types
+ // that are not yet defined in this version of the proto. During planning, the
+ // engine will resolve and dispatch the concrete command contained in this field.
+ google.protobuf.Any extension = 999;
}
// Request to create a new dataflow graph.
@@ -262,6 +267,12 @@ message SourceCodeLocation {
optional string file_name = 1;
// The specific line number that this pipeline source code is located at, if applicable.
optional int32 line_number = 2;
+
+ // Reserved field for protocol extensions.
+ // Used to support forward-compatibility by carrying additional fields
+ // that are not yet defined in this version of the proto. During planning, the
+ // engine will resolve and dispatch the concrete command contained in this field.
+ repeated google.protobuf.Any extension = 999;
}
// A signal from the server to the client to execute the query function for one or more flows, and
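Note that `SourceCodeLocation.extension` is `repeated`, unlike the singular field on `PipelineCommand`, so multiple extensions can be attached to one location. A minimal sketch (illustrative only; `StringValue` stands in for a platform-defined location-metadata message):

```python
# Illustrative only: the repeated extension field holds multiple Any entries.
from google.protobuf import wrappers_pb2

from pyspark.sql.connect.proto import pipelines_pb2

loc = pipelines_pb2.SourceCodeLocation(file_name="pipeline.py", line_number=42)
# add() appends a new Any entry, which Pack() then fills in.
loc.extension.add().Pack(wrappers_pb2.StringValue(value="cell-3"))
```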