This is an automated email from the ASF dual-hosted git repository.

sandy pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new bff9dcfbfb81 [SPARK-55945][SDP] Support structured identifiers for 
flows in SDP eager analysis protos
bff9dcfbfb81 is described below

commit bff9dcfbfb812211377fd30d50a3a84a85b85f9f
Author: Yuheng Chang <[email protected]>
AuthorDate: Fri Mar 13 14:15:54 2026 -0700

    [SPARK-55945][SDP] Support structured identifiers for flows in SDP eager 
analysis protos
    
    ### What changes were proposed in this pull request?
    
    For eager analysis RPCs introduced in the previous PR 
(https://github.com/apache/spark/pull/52154), this PR make an improvement to 
use structured `ResolvedIdentifier` instead of string flow names -- 
`PipelineQueryFunctionExecutionSignal` and `DefineFlowQueryFunctionResult` now 
identify flows via `ResolvedIdentifier` (catalog + namespace + table name) 
rather than opaque string names. This provides consistent, unambiguous 
identification and simplifies parsing on both client and server.
    
    ### Why are the changes needed?
    
    Proto improvement that simplify the codebase.
    
    ### Does this PR introduce _any_ user-facing change?
    
    No
    
    ### How was this patch tested?
    
    Proto only change
    
    ### Was this patch authored or co-authored using generative AI tooling?
    
    No
    
    Closes #54740 from SCHJonathan/jonathan-chang_data/eager-analysis-proto.
    
    Authored-by: Yuheng Chang <[email protected]>
    Signed-off-by: Sandy Ryza <[email protected]>
---
 python/pyspark/sql/connect/proto/pipelines_pb2.py  | 62 ++++++++++++--------
 python/pyspark/sql/connect/proto/pipelines_pb2.pyi | 67 ++++++++++++++++++++--
 .../main/protobuf/spark/connect/pipelines.proto    | 25 ++++++--
 3 files changed, 122 insertions(+), 32 deletions(-)

diff --git a/python/pyspark/sql/connect/proto/pipelines_pb2.py 
b/python/pyspark/sql/connect/proto/pipelines_pb2.py
index ebdefc10ec05..31d3996891b9 100644
--- a/python/pyspark/sql/connect/proto/pipelines_pb2.py
+++ b/python/pyspark/sql/connect/proto/pipelines_pb2.py
@@ -42,7 +42,7 @@ from pyspark.sql.connect.proto import types_pb2 as 
spark_dot_connect_dot_types__
 
 
 DESCRIPTOR = _descriptor_pool.Default().AddSerializedFile(
-    
b'\n\x1dspark/connect/pipelines.proto\x12\rspark.connect\x1a\x19google/protobuf/any.proto\x1a\x1fgoogle/protobuf/timestamp.proto\x1a\x1aspark/connect/common.proto\x1a\x1dspark/connect/relations.proto\x1a\x19spark/connect/types.proto"\xbb&\n\x0fPipelineCommand\x12h\n\x15\x63reate_dataflow_graph\x18\x01
 
\x01(\x0b\x32\x32.spark.connect.PipelineCommand.CreateDataflowGraphH\x00R\x13\x63reateDataflowGraph\x12R\n\rdefine_output\x18\x02
 \x01(\x0b\x32+.spark.connect.PipelineCommand.DefineOutp [...]
+    
b'\n\x1dspark/connect/pipelines.proto\x12\rspark.connect\x1a\x19google/protobuf/any.proto\x1a\x1fgoogle/protobuf/timestamp.proto\x1a\x1aspark/connect/common.proto\x1a\x1dspark/connect/relations.proto\x1a\x19spark/connect/types.proto"\xa4\'\n\x0fPipelineCommand\x12h\n\x15\x63reate_dataflow_graph\x18\x01
 
\x01(\x0b\x32\x32.spark.connect.PipelineCommand.CreateDataflowGraphH\x00R\x13\x63reateDataflowGraph\x12R\n\rdefine_output\x18\x02
 \x01(\x0b\x32+.spark.connect.PipelineCommand.DefineOut [...]
 )
 
 _globals = globals()
@@ -69,10 +69,26 @@ if not _descriptor._USE_C_DESCRIPTORS:
     ]._serialized_options = b"8\001"
     _globals["_PIPELINECOMMAND_DEFINEFLOW_SQLCONFENTRY"]._loaded_options = None
     _globals["_PIPELINECOMMAND_DEFINEFLOW_SQLCONFENTRY"]._serialized_options = 
b"8\001"
-    _globals["_OUTPUTTYPE"]._serialized_start = 6649
-    _globals["_OUTPUTTYPE"]._serialized_end = 6754
+    _globals["_PIPELINECOMMAND_DEFINEFLOWQUERYFUNCTIONRESULT"].fields_by_name[
+        "flow_name"
+    ]._loaded_options = None
+    _globals["_PIPELINECOMMAND_DEFINEFLOWQUERYFUNCTIONRESULT"].fields_by_name[
+        "flow_name"
+    ]._serialized_options = b"\030\001"
+    _globals["_PIPELINEQUERYFUNCTIONEXECUTIONSIGNAL"].fields_by_name[
+        "flow_names"
+    ]._loaded_options = None
+    _globals["_PIPELINEQUERYFUNCTIONEXECUTIONSIGNAL"].fields_by_name[
+        "flow_names"
+    ]._serialized_options = b"\030\001"
+    
_globals["_PIPELINEANALYSISCONTEXT"].fields_by_name["flow_name"]._loaded_options
 = None
+    _globals["_PIPELINEANALYSISCONTEXT"].fields_by_name[
+        "flow_name"
+    ]._serialized_options = b"\030\001"
+    _globals["_OUTPUTTYPE"]._serialized_start = 6942
+    _globals["_OUTPUTTYPE"]._serialized_end = 7047
     _globals["_PIPELINECOMMAND"]._serialized_start = 195
-    _globals["_PIPELINECOMMAND"]._serialized_end = 5118
+    _globals["_PIPELINECOMMAND"]._serialized_end = 5223
     _globals["_PIPELINECOMMAND_CREATEDATAFLOWGRAPH"]._serialized_start = 1232
     _globals["_PIPELINECOMMAND_CREATEDATAFLOWGRAPH"]._serialized_end = 1540
     
_globals["_PIPELINECOMMAND_CREATEDATAFLOWGRAPH_SQLCONFENTRY"]._serialized_start 
= 1441
@@ -110,23 +126,23 @@ if not _descriptor._USE_C_DESCRIPTORS:
     
_globals["_PIPELINECOMMAND_GETQUERYFUNCTIONEXECUTIONSIGNALSTREAM"]._serialized_start
 = 4720
     
_globals["_PIPELINECOMMAND_GETQUERYFUNCTIONEXECUTIONSIGNALSTREAM"]._serialized_end
 = 4878
     
_globals["_PIPELINECOMMAND_DEFINEFLOWQUERYFUNCTIONRESULT"]._serialized_start = 
4881
-    _globals["_PIPELINECOMMAND_DEFINEFLOWQUERYFUNCTIONRESULT"]._serialized_end 
= 5102
-    _globals["_PIPELINECOMMANDRESULT"]._serialized_start = 5121
-    _globals["_PIPELINECOMMANDRESULT"]._serialized_end = 5873
-    
_globals["_PIPELINECOMMANDRESULT_CREATEDATAFLOWGRAPHRESULT"]._serialized_start 
= 5490
-    
_globals["_PIPELINECOMMANDRESULT_CREATEDATAFLOWGRAPHRESULT"]._serialized_end = 
5588
-    _globals["_PIPELINECOMMANDRESULT_DEFINEOUTPUTRESULT"]._serialized_start = 
5591
-    _globals["_PIPELINECOMMANDRESULT_DEFINEOUTPUTRESULT"]._serialized_end = 
5724
-    _globals["_PIPELINECOMMANDRESULT_DEFINEFLOWRESULT"]._serialized_start = 
5727
-    _globals["_PIPELINECOMMANDRESULT_DEFINEFLOWRESULT"]._serialized_end = 5858
-    _globals["_PIPELINEEVENTRESULT"]._serialized_start = 5875
-    _globals["_PIPELINEEVENTRESULT"]._serialized_end = 5948
-    _globals["_PIPELINEEVENT"]._serialized_start = 5950
-    _globals["_PIPELINEEVENT"]._serialized_end = 6066
-    _globals["_SOURCECODELOCATION"]._serialized_start = 6069
-    _globals["_SOURCECODELOCATION"]._serialized_end = 6310
-    _globals["_PIPELINEQUERYFUNCTIONEXECUTIONSIGNAL"]._serialized_start = 6312
-    _globals["_PIPELINEQUERYFUNCTIONEXECUTIONSIGNAL"]._serialized_end = 6381
-    _globals["_PIPELINEANALYSISCONTEXT"]._serialized_start = 6384
-    _globals["_PIPELINEANALYSISCONTEXT"]._serialized_end = 6647
+    _globals["_PIPELINECOMMAND_DEFINEFLOWQUERYFUNCTIONRESULT"]._serialized_end 
= 5207
+    _globals["_PIPELINECOMMANDRESULT"]._serialized_start = 5226
+    _globals["_PIPELINECOMMANDRESULT"]._serialized_end = 5978
+    
_globals["_PIPELINECOMMANDRESULT_CREATEDATAFLOWGRAPHRESULT"]._serialized_start 
= 5595
+    
_globals["_PIPELINECOMMANDRESULT_CREATEDATAFLOWGRAPHRESULT"]._serialized_end = 
5693
+    _globals["_PIPELINECOMMANDRESULT_DEFINEOUTPUTRESULT"]._serialized_start = 
5696
+    _globals["_PIPELINECOMMANDRESULT_DEFINEOUTPUTRESULT"]._serialized_end = 
5829
+    _globals["_PIPELINECOMMANDRESULT_DEFINEFLOWRESULT"]._serialized_start = 
5832
+    _globals["_PIPELINECOMMANDRESULT_DEFINEFLOWRESULT"]._serialized_end = 5963
+    _globals["_PIPELINEEVENTRESULT"]._serialized_start = 5980
+    _globals["_PIPELINEEVENTRESULT"]._serialized_end = 6053
+    _globals["_PIPELINEEVENT"]._serialized_start = 6055
+    _globals["_PIPELINEEVENT"]._serialized_end = 6171
+    _globals["_SOURCECODELOCATION"]._serialized_start = 6174
+    _globals["_SOURCECODELOCATION"]._serialized_end = 6415
+    _globals["_PIPELINEQUERYFUNCTIONEXECUTIONSIGNAL"]._serialized_start = 6418
+    _globals["_PIPELINEQUERYFUNCTIONEXECUTIONSIGNAL"]._serialized_end = 6569
+    _globals["_PIPELINEANALYSISCONTEXT"]._serialized_start = 6572
+    _globals["_PIPELINEANALYSISCONTEXT"]._serialized_end = 6940
 # @@protoc_insertion_point(module_scope)
diff --git a/python/pyspark/sql/connect/proto/pipelines_pb2.pyi 
b/python/pyspark/sql/connect/proto/pipelines_pb2.pyi
index 391746c04993..19ca1914e11e 100644
--- a/python/pyspark/sql/connect/proto/pipelines_pb2.pyi
+++ b/python/pyspark/sql/connect/proto/pipelines_pb2.pyi
@@ -1071,10 +1071,17 @@ class PipelineCommand(google.protobuf.message.Message):
         DESCRIPTOR: google.protobuf.descriptor.Descriptor
 
         FLOW_NAME_FIELD_NUMBER: builtins.int
+        FLOW_IDENTIFIER_FIELD_NUMBER: builtins.int
         DATAFLOW_GRAPH_ID_FIELD_NUMBER: builtins.int
         RELATION_FIELD_NUMBER: builtins.int
         flow_name: builtins.str
-        """The fully qualified name of the flow being updated."""
+        """(Deprecated) The fully qualified name of the flow being updated.
+
+        This field is deprecated since Spark 4.2+. Use flow_identifier field 
instead.
+        """
+        @property
+        def flow_identifier(self) -> 
pyspark.sql.connect.proto.common_pb2.ResolvedIdentifier:
+            """The fully qualified identifier of the flow being updated."""
         dataflow_graph_id: builtins.str
         """The ID of the graph this flow belongs to."""
         @property
@@ -1084,6 +1091,7 @@ class PipelineCommand(google.protobuf.message.Message):
             self,
             *,
             flow_name: builtins.str | None = ...,
+            flow_identifier: 
pyspark.sql.connect.proto.common_pb2.ResolvedIdentifier | None = ...,
             dataflow_graph_id: builtins.str | None = ...,
             relation: pyspark.sql.connect.proto.relations_pb2.Relation | None 
= ...,
         ) -> None: ...
@@ -1092,12 +1100,16 @@ class PipelineCommand(google.protobuf.message.Message):
             field_name: typing_extensions.Literal[
                 "_dataflow_graph_id",
                 b"_dataflow_graph_id",
+                "_flow_identifier",
+                b"_flow_identifier",
                 "_flow_name",
                 b"_flow_name",
                 "_relation",
                 b"_relation",
                 "dataflow_graph_id",
                 b"dataflow_graph_id",
+                "flow_identifier",
+                b"flow_identifier",
                 "flow_name",
                 b"flow_name",
                 "relation",
@@ -1109,12 +1121,16 @@ class PipelineCommand(google.protobuf.message.Message):
             field_name: typing_extensions.Literal[
                 "_dataflow_graph_id",
                 b"_dataflow_graph_id",
+                "_flow_identifier",
+                b"_flow_identifier",
                 "_flow_name",
                 b"_flow_name",
                 "_relation",
                 b"_relation",
                 "dataflow_graph_id",
                 b"dataflow_graph_id",
+                "flow_identifier",
+                b"flow_identifier",
                 "flow_name",
                 b"flow_name",
                 "relation",
@@ -1127,6 +1143,10 @@ class PipelineCommand(google.protobuf.message.Message):
             oneof_group: typing_extensions.Literal["_dataflow_graph_id", 
b"_dataflow_graph_id"],
         ) -> typing_extensions.Literal["dataflow_graph_id"] | None: ...
         @typing.overload
+        def WhichOneof(
+            self, oneof_group: typing_extensions.Literal["_flow_identifier", 
b"_flow_identifier"]
+        ) -> typing_extensions.Literal["flow_identifier"] | None: ...
+        @typing.overload
         def WhichOneof(
             self, oneof_group: typing_extensions.Literal["_flow_name", 
b"_flow_name"]
         ) -> typing_extensions.Literal["flow_name"] | None: ...
@@ -1578,17 +1598,36 @@ class 
PipelineQueryFunctionExecutionSignal(google.protobuf.message.Message):
     DESCRIPTOR: google.protobuf.descriptor.Descriptor
 
     FLOW_NAMES_FIELD_NUMBER: builtins.int
+    FLOW_IDENTIFIERS_FIELD_NUMBER: builtins.int
     @property
     def flow_names(
         self,
-    ) -> 
google.protobuf.internal.containers.RepeatedScalarFieldContainer[builtins.str]: 
...
+    ) -> 
google.protobuf.internal.containers.RepeatedScalarFieldContainer[builtins.str]:
+        """(Deprecated) The name of flows that are ready to be re-evaluated.
+
+        This field is deprecated since Spark 4.2+. Use flow_identifiers field 
instead.
+        """
+    @property
+    def flow_identifiers(
+        self,
+    ) -> google.protobuf.internal.containers.RepeatedCompositeFieldContainer[
+        pyspark.sql.connect.proto.common_pb2.ResolvedIdentifier
+    ]:
+        """The identifier of flows that are ready to be re-evaluated"""
     def __init__(
         self,
         *,
         flow_names: collections.abc.Iterable[builtins.str] | None = ...,
+        flow_identifiers: collections.abc.Iterable[
+            pyspark.sql.connect.proto.common_pb2.ResolvedIdentifier
+        ]
+        | None = ...,
     ) -> None: ...
     def ClearField(
-        self, field_name: typing_extensions.Literal["flow_names", 
b"flow_names"]
+        self,
+        field_name: typing_extensions.Literal[
+            "flow_identifiers", b"flow_identifiers", "flow_names", 
b"flow_names"
+        ],
     ) -> None: ...
 
 global___PipelineQueryFunctionExecutionSignal = 
PipelineQueryFunctionExecutionSignal
@@ -1601,13 +1640,20 @@ class 
PipelineAnalysisContext(google.protobuf.message.Message):
     DATAFLOW_GRAPH_ID_FIELD_NUMBER: builtins.int
     DEFINITION_PATH_FIELD_NUMBER: builtins.int
     FLOW_NAME_FIELD_NUMBER: builtins.int
+    FLOW_IDENTIFIER_FIELD_NUMBER: builtins.int
     EXTENSION_FIELD_NUMBER: builtins.int
     dataflow_graph_id: builtins.str
     """Unique identifier of the dataflow graph associated with this 
pipeline."""
     definition_path: builtins.str
     """The path of the top-level pipeline file determined at runtime during 
pipeline initialization."""
     flow_name: builtins.str
-    """The name of the Flow involved in this analysis"""
+    """(Deprecated) The name of the Flow involved in this analysis
+
+    This field is deprecated since Spark 4.2+. Use flow_identifier field 
instead.
+    """
+    @property
+    def flow_identifier(self) -> 
pyspark.sql.connect.proto.common_pb2.ResolvedIdentifier:
+        """The identifier of the Flow involved in this analysis"""
     @property
     def extension(
         self,
@@ -1621,6 +1667,7 @@ class 
PipelineAnalysisContext(google.protobuf.message.Message):
         dataflow_graph_id: builtins.str | None = ...,
         definition_path: builtins.str | None = ...,
         flow_name: builtins.str | None = ...,
+        flow_identifier: 
pyspark.sql.connect.proto.common_pb2.ResolvedIdentifier | None = ...,
         extension: collections.abc.Iterable[google.protobuf.any_pb2.Any] | 
None = ...,
     ) -> None: ...
     def HasField(
@@ -1630,12 +1677,16 @@ class 
PipelineAnalysisContext(google.protobuf.message.Message):
             b"_dataflow_graph_id",
             "_definition_path",
             b"_definition_path",
+            "_flow_identifier",
+            b"_flow_identifier",
             "_flow_name",
             b"_flow_name",
             "dataflow_graph_id",
             b"dataflow_graph_id",
             "definition_path",
             b"definition_path",
+            "flow_identifier",
+            b"flow_identifier",
             "flow_name",
             b"flow_name",
         ],
@@ -1647,6 +1698,8 @@ class 
PipelineAnalysisContext(google.protobuf.message.Message):
             b"_dataflow_graph_id",
             "_definition_path",
             b"_definition_path",
+            "_flow_identifier",
+            b"_flow_identifier",
             "_flow_name",
             b"_flow_name",
             "dataflow_graph_id",
@@ -1655,6 +1708,8 @@ class 
PipelineAnalysisContext(google.protobuf.message.Message):
             b"definition_path",
             "extension",
             b"extension",
+            "flow_identifier",
+            b"flow_identifier",
             "flow_name",
             b"flow_name",
         ],
@@ -1668,6 +1723,10 @@ class 
PipelineAnalysisContext(google.protobuf.message.Message):
         self, oneof_group: typing_extensions.Literal["_definition_path", 
b"_definition_path"]
     ) -> typing_extensions.Literal["definition_path"] | None: ...
     @typing.overload
+    def WhichOneof(
+        self, oneof_group: typing_extensions.Literal["_flow_identifier", 
b"_flow_identifier"]
+    ) -> typing_extensions.Literal["flow_identifier"] | None: ...
+    @typing.overload
     def WhichOneof(
         self, oneof_group: typing_extensions.Literal["_flow_name", 
b"_flow_name"]
     ) -> typing_extensions.Literal["flow_name"] | None: ...
diff --git a/sql/connect/common/src/main/protobuf/spark/connect/pipelines.proto 
b/sql/connect/common/src/main/protobuf/spark/connect/pipelines.proto
index a46c7a75bc9b..6438583c9d47 100644
--- a/sql/connect/common/src/main/protobuf/spark/connect/pipelines.proto
+++ b/sql/connect/common/src/main/protobuf/spark/connect/pipelines.proto
@@ -234,8 +234,13 @@ message PipelineCommand {
   // Request from the client to update the flow function evaluation result
   // for a previously un-analyzed flow.
   message DefineFlowQueryFunctionResult {
-    // The fully qualified name of the flow being updated.
-    optional string flow_name = 1;
+    // (Deprecated) The fully qualified name of the flow being updated.
+    //
+    // This field is deprecated since Spark 4.2+. Use flow_identifier field 
instead.
+    optional string flow_name = 1 [deprecated = true];
+
+    // The fully qualified identifier of the flow being updated.
+    optional ResolvedIdentifier flow_identifier = 4;
 
     // The ID of the graph this flow belongs to.
     optional string dataflow_graph_id = 2;
@@ -311,7 +316,13 @@ message SourceCodeLocation {
 // A signal from the server to the client to execute the query function for 
one or more flows, and
 // to register their results with the server.
 message PipelineQueryFunctionExecutionSignal {
-  repeated string flow_names = 1;
+  // (Deprecated) The name of flows that are ready to be re-evaluated.
+  //
+  // This field is deprecated since Spark 4.2+. Use flow_identifiers field 
instead.
+  repeated string flow_names = 1 [deprecated = true];
+
+  // The identifier of flows that are ready to be re-evaluated
+  repeated ResolvedIdentifier flow_identifiers = 2;
 }
 
 // Metadata providing context about the pipeline during Spark Connect query 
analysis.
@@ -320,8 +331,12 @@ message PipelineAnalysisContext {
   optional string dataflow_graph_id = 1;
   // The path of the top-level pipeline file determined at runtime during 
pipeline initialization.
   optional string definition_path = 2;
-  // The name of the Flow involved in this analysis
-  optional string flow_name = 3;
+  // (Deprecated) The name of the Flow involved in this analysis
+  //
+  // This field is deprecated since Spark 4.2+. Use flow_identifier field 
instead.
+  optional string flow_name = 3 [deprecated = true];
+  // The identifier of the Flow involved in this analysis
+  optional ResolvedIdentifier flow_identifier = 4;
 
   // Reserved field for protocol extensions.
   repeated google.protobuf.Any extension = 999;


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to