This is an automated email from the ASF dual-hosted git repository.
sandy pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new bff9dcfbfb81 [SPARK-55945][SDP] Support structured identifiers for flows in SDP eager analysis protos
bff9dcfbfb81 is described below
commit bff9dcfbfb812211377fd30d50a3a84a85b85f9f
Author: Yuheng Chang <[email protected]>
AuthorDate: Fri Mar 13 14:15:54 2026 -0700
[SPARK-55945][SDP] Support structured identifiers for flows in SDP eager analysis protos
### What changes were proposed in this pull request?
For the eager analysis RPCs introduced in the previous PR
(https://github.com/apache/spark/pull/52154), this PR switches from string flow
names to structured `ResolvedIdentifier`s: `PipelineQueryFunctionExecutionSignal`
and `DefineFlowQueryFunctionResult` now identify flows via `ResolvedIdentifier`
(catalog + namespace + table name) rather than opaque string names. This
provides consistent, unambiguous identification and simplifies parsing on both
the client and the server.
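For illustration, a minimal sketch (not part of this patch) of how a client might populate the new field on `DefineFlowQueryFunctionResult`. The `ResolvedIdentifier` field names used below (`catalog_name`, `namespace`, `table_name`) are assumptions inferred from the "catalog + namespace + table name" description above, and the graph ID is hypothetical:

```python
# Sketch only: ResolvedIdentifier field names are assumed, not taken from this patch.
from pyspark.sql.connect.proto import common_pb2, pipelines_pb2

result = pipelines_pb2.PipelineCommand.DefineFlowQueryFunctionResult(
    dataflow_graph_id="graph-uuid",  # hypothetical graph ID
    # New style: catalog, namespace, and table name travel as separate fields.
    flow_identifier=common_pb2.ResolvedIdentifier(
        catalog_name="my_catalog",
        namespace=["my_schema"],
        table_name="my_flow",
    ),
)
# Old style, now deprecated: an opaque dotted string the server had to parse.
# result.flow_name = "my_catalog.my_schema.my_flow"
```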
### Why are the changes needed?
A proto improvement that simplifies the codebase.
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
Proto-only change.
### Was this patch authored or co-authored using generative AI tooling?
No
Closes #54740 from SCHJonathan/jonathan-chang_data/eager-analysis-proto.
Authored-by: Yuheng Chang <[email protected]>
Signed-off-by: Sandy Ryza <[email protected]>
---
python/pyspark/sql/connect/proto/pipelines_pb2.py | 62 ++++++++++++--------
python/pyspark/sql/connect/proto/pipelines_pb2.pyi | 67 ++++++++++++++++++++--
.../main/protobuf/spark/connect/pipelines.proto | 25 ++++++--
3 files changed, 122 insertions(+), 32 deletions(-)
diff --git a/python/pyspark/sql/connect/proto/pipelines_pb2.py b/python/pyspark/sql/connect/proto/pipelines_pb2.py
index ebdefc10ec05..31d3996891b9 100644
--- a/python/pyspark/sql/connect/proto/pipelines_pb2.py
+++ b/python/pyspark/sql/connect/proto/pipelines_pb2.py
@@ -42,7 +42,7 @@ from pyspark.sql.connect.proto import types_pb2 as spark_dot_connect_dot_types__
DESCRIPTOR = _descriptor_pool.Default().AddSerializedFile(
- b'\n\x1dspark/connect/pipelines.proto\x12\rspark.connect\x1a\x19google/protobuf/any.proto\x1a\x1fgoogle/protobuf/timestamp.proto\x1a\x1aspark/connect/common.proto\x1a\x1dspark/connect/relations.proto\x1a\x19spark/connect/types.proto"\xbb&\n\x0fPipelineCommand\x12h\n\x15\x63reate_dataflow_graph\x18\x01 \x01(\x0b\x32\x32.spark.connect.PipelineCommand.CreateDataflowGraphH\x00R\x13\x63reateDataflowGraph\x12R\n\rdefine_output\x18\x02 \x01(\x0b\x32+.spark.connect.PipelineCommand.DefineOutp [...]
+ b'\n\x1dspark/connect/pipelines.proto\x12\rspark.connect\x1a\x19google/protobuf/any.proto\x1a\x1fgoogle/protobuf/timestamp.proto\x1a\x1aspark/connect/common.proto\x1a\x1dspark/connect/relations.proto\x1a\x19spark/connect/types.proto"\xa4\'\n\x0fPipelineCommand\x12h\n\x15\x63reate_dataflow_graph\x18\x01 \x01(\x0b\x32\x32.spark.connect.PipelineCommand.CreateDataflowGraphH\x00R\x13\x63reateDataflowGraph\x12R\n\rdefine_output\x18\x02 \x01(\x0b\x32+.spark.connect.PipelineCommand.DefineOut [...]
)
_globals = globals()
@@ -69,10 +69,26 @@ if not _descriptor._USE_C_DESCRIPTORS:
]._serialized_options = b"8\001"
_globals["_PIPELINECOMMAND_DEFINEFLOW_SQLCONFENTRY"]._loaded_options = None
_globals["_PIPELINECOMMAND_DEFINEFLOW_SQLCONFENTRY"]._serialized_options =
b"8\001"
- _globals["_OUTPUTTYPE"]._serialized_start = 6649
- _globals["_OUTPUTTYPE"]._serialized_end = 6754
+ _globals["_PIPELINECOMMAND_DEFINEFLOWQUERYFUNCTIONRESULT"].fields_by_name[
+ "flow_name"
+ ]._loaded_options = None
+ _globals["_PIPELINECOMMAND_DEFINEFLOWQUERYFUNCTIONRESULT"].fields_by_name[
+ "flow_name"
+ ]._serialized_options = b"\030\001"
+ _globals["_PIPELINEQUERYFUNCTIONEXECUTIONSIGNAL"].fields_by_name[
+ "flow_names"
+ ]._loaded_options = None
+ _globals["_PIPELINEQUERYFUNCTIONEXECUTIONSIGNAL"].fields_by_name[
+ "flow_names"
+ ]._serialized_options = b"\030\001"
+
_globals["_PIPELINEANALYSISCONTEXT"].fields_by_name["flow_name"]._loaded_options
= None
+ _globals["_PIPELINEANALYSISCONTEXT"].fields_by_name[
+ "flow_name"
+ ]._serialized_options = b"\030\001"
+ _globals["_OUTPUTTYPE"]._serialized_start = 6942
+ _globals["_OUTPUTTYPE"]._serialized_end = 7047
_globals["_PIPELINECOMMAND"]._serialized_start = 195
- _globals["_PIPELINECOMMAND"]._serialized_end = 5118
+ _globals["_PIPELINECOMMAND"]._serialized_end = 5223
_globals["_PIPELINECOMMAND_CREATEDATAFLOWGRAPH"]._serialized_start = 1232
_globals["_PIPELINECOMMAND_CREATEDATAFLOWGRAPH"]._serialized_end = 1540
_globals["_PIPELINECOMMAND_CREATEDATAFLOWGRAPH_SQLCONFENTRY"]._serialized_start
= 1441
@@ -110,23 +126,23 @@ if not _descriptor._USE_C_DESCRIPTORS:
_globals["_PIPELINECOMMAND_GETQUERYFUNCTIONEXECUTIONSIGNALSTREAM"]._serialized_start
= 4720
_globals["_PIPELINECOMMAND_GETQUERYFUNCTIONEXECUTIONSIGNALSTREAM"]._serialized_end
= 4878
_globals["_PIPELINECOMMAND_DEFINEFLOWQUERYFUNCTIONRESULT"]._serialized_start =
4881
- _globals["_PIPELINECOMMAND_DEFINEFLOWQUERYFUNCTIONRESULT"]._serialized_end
= 5102
- _globals["_PIPELINECOMMANDRESULT"]._serialized_start = 5121
- _globals["_PIPELINECOMMANDRESULT"]._serialized_end = 5873
- _globals["_PIPELINECOMMANDRESULT_CREATEDATAFLOWGRAPHRESULT"]._serialized_start = 5490
- _globals["_PIPELINECOMMANDRESULT_CREATEDATAFLOWGRAPHRESULT"]._serialized_end = 5588
- _globals["_PIPELINECOMMANDRESULT_DEFINEOUTPUTRESULT"]._serialized_start = 5591
- _globals["_PIPELINECOMMANDRESULT_DEFINEOUTPUTRESULT"]._serialized_end = 5724
- _globals["_PIPELINECOMMANDRESULT_DEFINEFLOWRESULT"]._serialized_start = 5727
- _globals["_PIPELINECOMMANDRESULT_DEFINEFLOWRESULT"]._serialized_end = 5858
- _globals["_PIPELINEEVENTRESULT"]._serialized_start = 5875
- _globals["_PIPELINEEVENTRESULT"]._serialized_end = 5948
- _globals["_PIPELINEEVENT"]._serialized_start = 5950
- _globals["_PIPELINEEVENT"]._serialized_end = 6066
- _globals["_SOURCECODELOCATION"]._serialized_start = 6069
- _globals["_SOURCECODELOCATION"]._serialized_end = 6310
- _globals["_PIPELINEQUERYFUNCTIONEXECUTIONSIGNAL"]._serialized_start = 6312
- _globals["_PIPELINEQUERYFUNCTIONEXECUTIONSIGNAL"]._serialized_end = 6381
- _globals["_PIPELINEANALYSISCONTEXT"]._serialized_start = 6384
- _globals["_PIPELINEANALYSISCONTEXT"]._serialized_end = 6647
+ _globals["_PIPELINECOMMAND_DEFINEFLOWQUERYFUNCTIONRESULT"]._serialized_end
= 5207
+ _globals["_PIPELINECOMMANDRESULT"]._serialized_start = 5226
+ _globals["_PIPELINECOMMANDRESULT"]._serialized_end = 5978
+ _globals["_PIPELINECOMMANDRESULT_CREATEDATAFLOWGRAPHRESULT"]._serialized_start = 5595
+ _globals["_PIPELINECOMMANDRESULT_CREATEDATAFLOWGRAPHRESULT"]._serialized_end = 5693
+ _globals["_PIPELINECOMMANDRESULT_DEFINEOUTPUTRESULT"]._serialized_start = 5696
+ _globals["_PIPELINECOMMANDRESULT_DEFINEOUTPUTRESULT"]._serialized_end = 5829
+ _globals["_PIPELINECOMMANDRESULT_DEFINEFLOWRESULT"]._serialized_start = 5832
+ _globals["_PIPELINECOMMANDRESULT_DEFINEFLOWRESULT"]._serialized_end = 5963
+ _globals["_PIPELINEEVENTRESULT"]._serialized_start = 5980
+ _globals["_PIPELINEEVENTRESULT"]._serialized_end = 6053
+ _globals["_PIPELINEEVENT"]._serialized_start = 6055
+ _globals["_PIPELINEEVENT"]._serialized_end = 6171
+ _globals["_SOURCECODELOCATION"]._serialized_start = 6174
+ _globals["_SOURCECODELOCATION"]._serialized_end = 6415
+ _globals["_PIPELINEQUERYFUNCTIONEXECUTIONSIGNAL"]._serialized_start = 6418
+ _globals["_PIPELINEQUERYFUNCTIONEXECUTIONSIGNAL"]._serialized_end = 6569
+ _globals["_PIPELINEANALYSISCONTEXT"]._serialized_start = 6572
+ _globals["_PIPELINEANALYSISCONTEXT"]._serialized_end = 6940
# @@protoc_insertion_point(module_scope)
diff --git a/python/pyspark/sql/connect/proto/pipelines_pb2.pyi b/python/pyspark/sql/connect/proto/pipelines_pb2.pyi
index 391746c04993..19ca1914e11e 100644
--- a/python/pyspark/sql/connect/proto/pipelines_pb2.pyi
+++ b/python/pyspark/sql/connect/proto/pipelines_pb2.pyi
@@ -1071,10 +1071,17 @@ class PipelineCommand(google.protobuf.message.Message):
DESCRIPTOR: google.protobuf.descriptor.Descriptor
FLOW_NAME_FIELD_NUMBER: builtins.int
+ FLOW_IDENTIFIER_FIELD_NUMBER: builtins.int
DATAFLOW_GRAPH_ID_FIELD_NUMBER: builtins.int
RELATION_FIELD_NUMBER: builtins.int
flow_name: builtins.str
- """The fully qualified name of the flow being updated."""
+ """(Deprecated) The fully qualified name of the flow being updated.
+
+ This field is deprecated since Spark 4.2+. Use flow_identifier field instead.
+ """
+ @property
+ def flow_identifier(self) -> pyspark.sql.connect.proto.common_pb2.ResolvedIdentifier:
+ """The fully qualified identifier of the flow being updated."""
dataflow_graph_id: builtins.str
"""The ID of the graph this flow belongs to."""
@property
@@ -1084,6 +1091,7 @@ class PipelineCommand(google.protobuf.message.Message):
self,
*,
flow_name: builtins.str | None = ...,
+ flow_identifier: pyspark.sql.connect.proto.common_pb2.ResolvedIdentifier | None = ...,
dataflow_graph_id: builtins.str | None = ...,
relation: pyspark.sql.connect.proto.relations_pb2.Relation | None = ...,
) -> None: ...
@@ -1092,12 +1100,16 @@ class PipelineCommand(google.protobuf.message.Message):
field_name: typing_extensions.Literal[
"_dataflow_graph_id",
b"_dataflow_graph_id",
+ "_flow_identifier",
+ b"_flow_identifier",
"_flow_name",
b"_flow_name",
"_relation",
b"_relation",
"dataflow_graph_id",
b"dataflow_graph_id",
+ "flow_identifier",
+ b"flow_identifier",
"flow_name",
b"flow_name",
"relation",
@@ -1109,12 +1121,16 @@ class PipelineCommand(google.protobuf.message.Message):
field_name: typing_extensions.Literal[
"_dataflow_graph_id",
b"_dataflow_graph_id",
+ "_flow_identifier",
+ b"_flow_identifier",
"_flow_name",
b"_flow_name",
"_relation",
b"_relation",
"dataflow_graph_id",
b"dataflow_graph_id",
+ "flow_identifier",
+ b"flow_identifier",
"flow_name",
b"flow_name",
"relation",
@@ -1127,6 +1143,10 @@ class PipelineCommand(google.protobuf.message.Message):
oneof_group: typing_extensions.Literal["_dataflow_graph_id", b"_dataflow_graph_id"],
) -> typing_extensions.Literal["dataflow_graph_id"] | None: ...
@typing.overload
+ def WhichOneof(
self, oneof_group: typing_extensions.Literal["_flow_identifier", b"_flow_identifier"]
+ ) -> typing_extensions.Literal["flow_identifier"] | None: ...
+ @typing.overload
def WhichOneof(
self, oneof_group: typing_extensions.Literal["_flow_name", b"_flow_name"]
) -> typing_extensions.Literal["flow_name"] | None: ...
@@ -1578,17 +1598,36 @@ class PipelineQueryFunctionExecutionSignal(google.protobuf.message.Message):
DESCRIPTOR: google.protobuf.descriptor.Descriptor
FLOW_NAMES_FIELD_NUMBER: builtins.int
+ FLOW_IDENTIFIERS_FIELD_NUMBER: builtins.int
@property
def flow_names(
self,
- ) ->
google.protobuf.internal.containers.RepeatedScalarFieldContainer[builtins.str]:
...
+ ) ->
google.protobuf.internal.containers.RepeatedScalarFieldContainer[builtins.str]:
+ """(Deprecated) The name of flows that are ready to be re-evaluated.
+
+ This field is deprecated since Spark 4.2+. Use flow_identifiers field instead.
+ """
+ @property
+ def flow_identifiers(
+ self,
+ ) -> google.protobuf.internal.containers.RepeatedCompositeFieldContainer[
+ pyspark.sql.connect.proto.common_pb2.ResolvedIdentifier
+ ]:
+ """The identifier of flows that are ready to be re-evaluated"""
def __init__(
self,
*,
flow_names: collections.abc.Iterable[builtins.str] | None = ...,
+ flow_identifiers: collections.abc.Iterable[
+ pyspark.sql.connect.proto.common_pb2.ResolvedIdentifier
+ ]
+ | None = ...,
) -> None: ...
def ClearField(
- self, field_name: typing_extensions.Literal["flow_names", b"flow_names"]
+ self,
+ field_name: typing_extensions.Literal[
+ "flow_identifiers", b"flow_identifiers", "flow_names",
b"flow_names"
+ ],
) -> None: ...
global___PipelineQueryFunctionExecutionSignal = PipelineQueryFunctionExecutionSignal
@@ -1601,13 +1640,20 @@ class PipelineAnalysisContext(google.protobuf.message.Message):
DATAFLOW_GRAPH_ID_FIELD_NUMBER: builtins.int
DEFINITION_PATH_FIELD_NUMBER: builtins.int
FLOW_NAME_FIELD_NUMBER: builtins.int
+ FLOW_IDENTIFIER_FIELD_NUMBER: builtins.int
EXTENSION_FIELD_NUMBER: builtins.int
dataflow_graph_id: builtins.str
"""Unique identifier of the dataflow graph associated with this
pipeline."""
definition_path: builtins.str
"""The path of the top-level pipeline file determined at runtime during
pipeline initialization."""
flow_name: builtins.str
- """The name of the Flow involved in this analysis"""
+ """(Deprecated) The name of the Flow involved in this analysis
+
+ This field is deprecated since Spark 4.2+. Use flow_identifier field instead.
+ """
+ @property
+ def flow_identifier(self) -> pyspark.sql.connect.proto.common_pb2.ResolvedIdentifier:
+ """The identifier of the Flow involved in this analysis"""
@property
def extension(
self,
@@ -1621,6 +1667,7 @@ class PipelineAnalysisContext(google.protobuf.message.Message):
dataflow_graph_id: builtins.str | None = ...,
definition_path: builtins.str | None = ...,
flow_name: builtins.str | None = ...,
+ flow_identifier: pyspark.sql.connect.proto.common_pb2.ResolvedIdentifier | None = ...,
extension: collections.abc.Iterable[google.protobuf.any_pb2.Any] | None = ...,
) -> None: ...
def HasField(
@@ -1630,12 +1677,16 @@ class PipelineAnalysisContext(google.protobuf.message.Message):
b"_dataflow_graph_id",
"_definition_path",
b"_definition_path",
+ "_flow_identifier",
+ b"_flow_identifier",
"_flow_name",
b"_flow_name",
"dataflow_graph_id",
b"dataflow_graph_id",
"definition_path",
b"definition_path",
+ "flow_identifier",
+ b"flow_identifier",
"flow_name",
b"flow_name",
],
@@ -1647,6 +1698,8 @@ class PipelineAnalysisContext(google.protobuf.message.Message):
b"_dataflow_graph_id",
"_definition_path",
b"_definition_path",
+ "_flow_identifier",
+ b"_flow_identifier",
"_flow_name",
b"_flow_name",
"dataflow_graph_id",
@@ -1655,6 +1708,8 @@ class PipelineAnalysisContext(google.protobuf.message.Message):
b"definition_path",
"extension",
b"extension",
+ "flow_identifier",
+ b"flow_identifier",
"flow_name",
b"flow_name",
],
@@ -1668,6 +1723,10 @@ class PipelineAnalysisContext(google.protobuf.message.Message):
self, oneof_group: typing_extensions.Literal["_definition_path", b"_definition_path"]
) -> typing_extensions.Literal["definition_path"] | None: ...
@typing.overload
+ def WhichOneof(
+ self, oneof_group: typing_extensions.Literal["_flow_identifier", b"_flow_identifier"]
+ ) -> typing_extensions.Literal["flow_identifier"] | None: ...
+ @typing.overload
def WhichOneof(
self, oneof_group: typing_extensions.Literal["_flow_name", b"_flow_name"]
) -> typing_extensions.Literal["flow_name"] | None: ...
diff --git a/sql/connect/common/src/main/protobuf/spark/connect/pipelines.proto b/sql/connect/common/src/main/protobuf/spark/connect/pipelines.proto
index a46c7a75bc9b..6438583c9d47 100644
--- a/sql/connect/common/src/main/protobuf/spark/connect/pipelines.proto
+++ b/sql/connect/common/src/main/protobuf/spark/connect/pipelines.proto
@@ -234,8 +234,13 @@ message PipelineCommand {
// Request from the client to update the flow function evaluation result
// for a previously un-analyzed flow.
message DefineFlowQueryFunctionResult {
- // The fully qualified name of the flow being updated.
- optional string flow_name = 1;
+ // (Deprecated) The fully qualified name of the flow being updated.
+ //
+ // This field is deprecated since Spark 4.2+. Use flow_identifier field instead.
+ optional string flow_name = 1 [deprecated = true];
+
+ // The fully qualified identifier of the flow being updated.
+ optional ResolvedIdentifier flow_identifier = 4;
// The ID of the graph this flow belongs to.
optional string dataflow_graph_id = 2;
@@ -311,7 +316,13 @@ message SourceCodeLocation {
// A signal from the server to the client to execute the query function for one or more flows, and
// to register their results with the server.
message PipelineQueryFunctionExecutionSignal {
- repeated string flow_names = 1;
+ // (Deprecated) The name of flows that are ready to be re-evaluated.
+ //
+ // This field is deprecated since Spark 4.2+. Use flow_identifiers field instead.
+ repeated string flow_names = 1 [deprecated = true];
+
+ // The identifier of flows that are ready to be re-evaluated
+ repeated ResolvedIdentifier flow_identifiers = 2;
}
// Metadata providing context about the pipeline during Spark Connect query analysis.
@@ -320,8 +331,12 @@ message PipelineAnalysisContext {
optional string dataflow_graph_id = 1;
// The path of the top-level pipeline file determined at runtime during pipeline initialization.
optional string definition_path = 2;
- // The name of the Flow involved in this analysis
- optional string flow_name = 3;
+ // (Deprecated) The name of the Flow involved in this analysis
+ //
+ // This field is deprecated since Spark 4.2+. Use flow_identifier field instead.
+ optional string flow_name = 3 [deprecated = true];
+ // The identifier of the Flow involved in this analysis
+ optional ResolvedIdentifier flow_identifier = 4;
// Reserved field for protocol extensions.
repeated google.protobuf.Any extension = 999;
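For illustration, a hedged sketch of how a client might consume `PipelineQueryFunctionExecutionSignal` under the new scheme, preferring the structured identifiers and falling back to the deprecated string names when talking to an older server (again assuming the `ResolvedIdentifier` field names `catalog_name`, `namespace`, and `table_name`):

```python
# Sketch only: ResolvedIdentifier field names are assumed, not taken from this patch.
from pyspark.sql.connect.proto import pipelines_pb2


def flows_to_reevaluate(
    signal: pipelines_pb2.PipelineQueryFunctionExecutionSignal,
) -> list[tuple[str, ...]]:
    if signal.flow_identifiers:
        # New path: each flow arrives as catalog + namespace + table name,
        # so no string parsing is required.
        return [
            (ident.catalog_name, *ident.namespace, ident.table_name)
            for ident in signal.flow_identifiers
        ]
    # Fallback: split the opaque dotted name, which is ambiguous when a
    # name part itself contains a dot.
    return [tuple(name.split(".")) for name in signal.flow_names]
```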