This is an automated email from the ASF dual-hosted git repository.

gengliangwang pushed a commit to branch branch-4.2
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-4.2 by this push:
     new 9da419233dfa [SPARK-57080][SDP] Register AutoCDC Flows from 
`PipelinesHandler`
9da419233dfa is described below

commit 9da419233dfa9f104e3276cb0e6a4232b66aab4c
Author: AnishMahto <[email protected]>
AuthorDate: Wed May 27 11:03:55 2026 -0700

    [SPARK-57080][SDP] Register AutoCDC Flows from `PipelinesHandler`
    
    ### What changes were proposed in this pull request?
    In the `PipelinesHandler`, register an `AutoCdcFlow` when a `DefineFlow` 
proto is received with `AUTO_CDC_FLOW_DETAILS`.
    
    This is the final step in connecting a Spark Connect client to the Spark 
Connect server's Spark Declarative Pipelines (SDP) engine for AutoCDC flows. 
With these changes, a user should be able to run their SDP with AutoCDC flows 
using the pipelines CLI runner.
    
    ### Why are the changes needed?
    Allows Spark Connect clients to actually register and execute AutoCDC flows 
within their pipelines.
    
    ### Does this PR introduce _any_ user-facing change?
    No.
    
    ### How was this patch tested?
    Tested graph registration and construction through the Python client in 
`PythonPipelineSuite`.
    
    ### Was this patch authored or co-authored using generative AI tooling?
    Co-authored.
    
    Generated-by: Claude-Opus-4.7-thinking-xhigh
    
    Closes #56124 from 
AnishMahto/SPARK-56957-register-autocdc-flow-from-pipelineshandler.
    
    Lead-authored-by: AnishMahto <[email protected]>
    Co-authored-by: Wenchen Fan <[email protected]>
    Signed-off-by: Gengliang Wang <[email protected]>
    (cherry picked from commit b848baaee6c7b66ed60cd0747539ac8fa8cb56ed)
    Signed-off-by: Gengliang Wang <[email protected]>
---
 .../src/main/resources/error/error-conditions.json |  26 +-
 .../sql/connect/pipelines/PipelinesHandler.scala   | 143 +++++++-
 .../sql/connect/planner/SparkConnectPlanner.scala  |   3 +-
 .../connect/pipelines/PythonPipelineSuite.scala    | 400 ++++++++++++++++++++-
 .../spark/sql/pipelines/autocdc/ChangeArgs.scala   |   8 +-
 .../sql/pipelines/autocdc/ChangeArgsSuite.scala    |   6 +-
 6 files changed, 554 insertions(+), 32 deletions(-)

diff --git a/common/utils/src/main/resources/error/error-conditions.json 
b/common/utils/src/main/resources/error/error-conditions.json
index defa7424d268..d66692a3b22e 100644
--- a/common/utils/src/main/resources/error/error-conditions.json
+++ b/common/utils/src/main/resources/error/error-conditions.json
@@ -191,6 +191,12 @@
     ],
     "sqlState" : "0A000"
   },
+  "AUTOCDC_BOTH_COLUMN_LIST_AND_EXCEPT_COLUMN_LIST" : {
+    "message" : [
+      "AutoCDC flow specifies both `column_list` and `except_column_list`; at 
most one may be provided."
+    ],
+    "sqlState" : "42613"
+  },
   "AUTOCDC_COLUMNS_NOT_FOUND_IN_SCHEMA" : {
     "message" : [
       "Using <caseSensitivity> column name comparison, the following columns 
are not present in the <schemaName> schema: <missingColumns>. Available 
columns: <availableColumns>."
@@ -232,11 +238,23 @@
     },
     "sqlState" : "22000"
   },
+  "AUTOCDC_MISSING_SEQUENCE_BY" : {
+    "message" : [
+      "AutoCDC flow is missing a required `sequence_by` expression. Specify a 
`sequence_by` column or expression that orders incoming change events."
+    ],
+    "sqlState" : "22023"
+  },
+  "AUTOCDC_MISSING_SOURCE" : {
+    "message" : [
+      "AutoCDC flow is missing a required `source` table name. Specify the 
name of the streaming source table the flow should read from."
+    ],
+    "sqlState" : "22023"
+  },
   "AUTOCDC_MULTIPART_COLUMN_IDENTIFIER" : {
     "message" : [
       "Expected a single column identifier; got the multi-part identifier 
<columnName> (parts: <nameParts>)."
     ],
-    "sqlState" : "42703"
+    "sqlState" : "22023"
   },
   "AUTOCDC_MULTIPLE_FLOWS_TO_TARGET" : {
     "message" : [
@@ -244,6 +262,12 @@
     ],
     "sqlState" : "42000"
   },
+  "AUTOCDC_NON_COLUMN_IDENTIFIER" : {
+    "message" : [
+      "Expected a column identifier; got the non-attribute expression 
`<expression>`. AutoCDC keys, sequence_by, column_list, and except_column_list 
must reference unqualified column names."
+    ],
+    "sqlState" : "22023"
+  },
   "AUTOCDC_RESERVED_COLUMN_NAME_PREFIX_CONFLICT" : {
     "message" : [
       "The column `<columnName>` in the <schemaName> schema collides with the 
reserved AutoCDC column name prefix `<reservedColumnNamePrefix>` (using 
<caseSensitivity> column name comparison). Rename or remove the column."
diff --git 
a/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/pipelines/PipelinesHandler.scala
 
b/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/pipelines/PipelinesHandler.scala
index 04dbc1a45506..f8edbc992800 100644
--- 
a/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/pipelines/PipelinesHandler.scala
+++ 
b/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/pipelines/PipelinesHandler.scala
@@ -17,7 +17,6 @@
 
 package org.apache.spark.sql.connect.pipelines
 
-import scala.collection.Seq
 import scala.jdk.CollectionConverters._
 import scala.util.Using
 
@@ -25,16 +24,21 @@ import io.grpc.stub.StreamObserver
 
 import org.apache.spark.connect.proto
 import org.apache.spark.connect.proto.{ExecutePlanResponse, 
PipelineCommandResult, Relation, ResolvedIdentifier}
+import 
org.apache.spark.connect.proto.PipelineCommand.DefineFlow.AutoCdcFlowDetails
 import org.apache.spark.internal.Logging
-import org.apache.spark.sql.AnalysisException
+import org.apache.spark.sql.{AnalysisException, Column}
 import org.apache.spark.sql.catalyst.TableIdentifier
+import org.apache.spark.sql.catalyst.analysis.{UnresolvedAttribute, 
UnresolvedRelation}
+import org.apache.spark.sql.catalyst.expressions.Expression
 import org.apache.spark.sql.catalyst.plans.logical.{Command, CreateNamespace, 
CreateTable, CreateTableAsSelect, CreateView, DescribeRelation, 
DescribeTablePartition, DropView, InsertIntoStatement, LogicalPlan, 
RenameTable, ShowColumns, ShowCreateTable, ShowFunctions, ShowTableProperties, 
ShowTables, ShowViews}
+import org.apache.spark.sql.classic.ClassicConversions._
 import org.apache.spark.sql.connect.common.DataTypeProtoConverter
 import org.apache.spark.sql.connect.service.SessionHolder
 import org.apache.spark.sql.execution.command.{ShowCatalogsCommand, 
ShowNamespacesCommand}
 import org.apache.spark.sql.pipelines.Language.Python
+import org.apache.spark.sql.pipelines.autocdc.{ChangeArgs, ColumnSelection, 
ScdType, UnqualifiedColumnName}
 import org.apache.spark.sql.pipelines.common.RunState.{CANCELED, FAILED}
-import org.apache.spark.sql.pipelines.graph.{AllTables, FlowAnalysis, 
GraphIdentifierManager, GraphRegistrationContext, IdentifierHelper, NoTables, 
PipelineUpdateContextImpl, QueryContext, QueryOrigin, QueryOriginType, Sink, 
SinkImpl, SomeTables, SqlGraphRegistrationContext, Table, TableFilter, 
TemporaryView, UntypedFlow}
+import org.apache.spark.sql.pipelines.graph.{AllTables, AutoCdcFlow, 
FlowAnalysis, GraphIdentifierManager, GraphRegistrationContext, 
IdentifierHelper, NoTables, PipelineUpdateContextImpl, QueryContext, 
QueryOrigin, QueryOriginType, Sink, SinkImpl, SomeTables, 
SqlGraphRegistrationContext, Table, TableFilter, TemporaryView, UntypedFlow}
 import org.apache.spark.sql.pipelines.logging.{PipelineEvent, RunProgress}
 import org.apache.spark.sql.types.StructType
 
@@ -52,6 +56,8 @@ private[connect] object PipelinesHandler extends Logging {
    * @param transformRelationFunc
    *   Function used to convert a relation to a LogicalPlan. This is used when 
determining the
    *   LogicalPlan that a flow returns.
+   * @param transformExpressionFunc
+   *   Function used to convert a proto expression to a Catalyst expression.
    * @return
    *   The response after handling the command
    */
@@ -59,7 +65,8 @@ private[connect] object PipelinesHandler extends Logging {
       sessionHolder: SessionHolder,
       cmd: proto.PipelineCommand,
       responseObserver: StreamObserver[ExecutePlanResponse],
-      transformRelationFunc: Relation => LogicalPlan): PipelineCommandResult = 
{
+      transformRelationFunc: Relation => LogicalPlan,
+      transformExpressionFunc: proto.Expression => Expression): 
PipelineCommandResult = {
     // Currently most commands do not include any information in the response. 
We just send back
     // an empty response to the client to indicate that the command was 
handled successfully
     val defaultResponse = PipelineCommandResult.getDefaultInstance
@@ -99,7 +106,11 @@ private[connect] object PipelinesHandler extends Logging {
       case proto.PipelineCommand.CommandTypeCase.DEFINE_FLOW =>
         logInfo(s"Define pipelines flow cmd received: $cmd")
         val resolvedFlow =
-          defineFlow(cmd.getDefineFlow, transformRelationFunc, sessionHolder)
+          defineFlow(
+            cmd.getDefineFlow,
+            transformRelationFunc,
+            transformExpressionFunc,
+            sessionHolder)
         val identifierBuilder = ResolvedIdentifier.newBuilder()
         resolvedFlow.catalog.foreach(identifierBuilder.setCatalogName)
         resolvedFlow.database.foreach { ns =>
@@ -315,6 +326,7 @@ private[connect] object PipelinesHandler extends Logging {
   private def defineFlow(
       flow: proto.PipelineCommand.DefineFlow,
       transformRelationFunc: Relation => LogicalPlan,
+      transformExpressionFunc: proto.Expression => Expression,
       sessionHolder: SessionHolder): TableIdentifier = {
     if (flow.hasOnce) {
       throw new AnalysisException(
@@ -379,22 +391,125 @@ private[connect] object PipelinesHandler extends Logging 
{
             sqlConf = flow.getSqlConfMap.asScala.toMap,
             once = false,
             queryContext = QueryContext(Option(defaultCatalog), 
Option(defaultDatabase)),
-            origin = QueryOrigin(
-              filePath = Option.when(flow.getSourceCodeLocation.hasFileName)(
-                flow.getSourceCodeLocation.getFileName),
-              line = Option.when(flow.getSourceCodeLocation.hasLineNumber)(
-                flow.getSourceCodeLocation.getLineNumber),
-              objectType = Some(QueryOriginType.Flow.toString),
-              objectName = Option(flowIdentifier.unquotedString),
-              language = Some(Python()))))
+            origin = flowOrigin(flow, flowIdentifier)))
       case proto.PipelineCommand.DefineFlow.DetailsCase.AUTO_CDC_FLOW_DETAILS 
=>
-        throw new UnsupportedOperationException("AutoCdcFlowDetails is not yet 
implemented.")
+        graphElementRegistry.registerFlow(
+          buildAutoCdcFlow(
+            autoCdcDetails = flow.getAutoCdcFlowDetails,
+            flow = flow,
+            flowIdentifier = flowIdentifier,
+            destinationIdentifier = destinationIdentifier,
+            defaultCatalog = defaultCatalog,
+            defaultDatabase = defaultDatabase,
+            sessionHolder = sessionHolder,
+            transformExpressionFunc = transformExpressionFunc))
       case other =>
         throw new UnsupportedOperationException(s"Unsupported DefineFlow 
details case: $other")
     }
     flowIdentifier
   }
 
+  /**
+   * Build an [[AutoCdcFlow]] from the proto-supplied AutoCDC flow details.
+   *
+   * The flow's source expression is encoded by the Python client as a 
streaming-table name; we
+   * model that on the server side as a streaming [[UnresolvedRelation]] so 
that pipelines flow
+   * analysis (which already handles `STREAM(t)` references) can resolve it 
against the rest of
+   * the dataflow graph.
+   */
+  private def buildAutoCdcFlow(
+      autoCdcDetails: AutoCdcFlowDetails,
+      flow: proto.PipelineCommand.DefineFlow,
+      flowIdentifier: TableIdentifier,
+      destinationIdentifier: TableIdentifier,
+      defaultCatalog: String,
+      defaultDatabase: String,
+      sessionHolder: SessionHolder,
+      transformExpressionFunc: proto.Expression => Expression): AutoCdcFlow = {
+    // TODO(SPARK-57092): apply_as_truncates is declared on AutoCdcFlowDetails 
but is not yet
+    //   honored by the engine; wire it through once SCD1 truncate support 
lands.
+    // TODO(SPARK-57093): ignore_null_updates_column_list and 
ignore_null_updates_except_column_list
+    //   are declared on AutoCdcFlowDetails but are not yet honored by the 
engine; wire them
+    //   through once SCD1 ignore-null support lands.
+
+    if (!autoCdcDetails.hasSource) {
+      throw new AnalysisException("AUTOCDC_MISSING_SOURCE", Map.empty)
+    }
+    if (!autoCdcDetails.hasSequenceBy) {
+      throw new AnalysisException("AUTOCDC_MISSING_SEQUENCE_BY", Map.empty)
+    }
+
+    val sourcePlan: LogicalPlan = UnresolvedRelation(
+      multipartIdentifier = GraphIdentifierManager
+        .parseTableIdentifier(name = autoCdcDetails.getSource, spark = 
sessionHolder.session)
+        .nameParts,
+      isStreaming = true)
+
+    val toColumn: proto.Expression => Column = expr => 
Column(transformExpressionFunc(expr))
+
+    val asUnqualifiedColumnName: proto.Expression => UnqualifiedColumnName = 
expr =>
+      transformExpressionFunc(expr) match {
+        case a: UnresolvedAttribute => UnqualifiedColumnName(a.nameParts)
+        case other =>
+          throw new AnalysisException(
+            "AUTOCDC_NON_COLUMN_IDENTIFIER",
+            Map("expression" -> other.sql))
+      }
+
+    val keys = 
autoCdcDetails.getKeysList.asScala.toSeq.map(asUnqualifiedColumnName)
+
+    val columnSelection: Option[ColumnSelection] = {
+      val included = autoCdcDetails.getColumnListList.asScala.toSeq
+      val excluded = autoCdcDetails.getExceptColumnListList.asScala.toSeq
+      if (included.nonEmpty && excluded.nonEmpty) {
+        throw new 
AnalysisException("AUTOCDC_BOTH_COLUMN_LIST_AND_EXCEPT_COLUMN_LIST", Map.empty)
+      } else if (included.nonEmpty) {
+        
Some(ColumnSelection.IncludeColumns(included.map(asUnqualifiedColumnName)))
+      } else if (excluded.nonEmpty) {
+        
Some(ColumnSelection.ExcludeColumns(excluded.map(asUnqualifiedColumnName)))
+      } else {
+        None
+      }
+    }
+
+    // Get user specified SCD type, or default to SCD1 if unspecified.
+    val scdType: ScdType = autoCdcDetails.getStoredAsScdType match {
+      case proto.PipelineCommand.DefineFlow.SCDType.SCD_TYPE_1 |
+          proto.PipelineCommand.DefineFlow.SCDType.SCD_TYPE_UNSPECIFIED =>
+        ScdType.Type1
+      case other =>
+        throw new UnsupportedOperationException(s"Unsupported AutoCDC SCD 
type: $other")
+    }
+
+    val changeArgs = ChangeArgs(
+      keys = keys,
+      sequencing = toColumn(autoCdcDetails.getSequenceBy),
+      storedAsScdType = scdType,
+      deleteCondition =
+        
Option.when(autoCdcDetails.hasApplyAsDeletes)(toColumn(autoCdcDetails.getApplyAsDeletes)),
+      columnSelection = columnSelection)
+
+    AutoCdcFlow(
+      identifier = flowIdentifier,
+      destinationIdentifier = destinationIdentifier,
+      func = FlowAnalysis.createFlowFunctionFromLogicalPlan(sourcePlan),
+      sqlConf = flow.getSqlConfMap.asScala.toMap,
+      queryContext = QueryContext(Option(defaultCatalog), 
Option(defaultDatabase)),
+      origin = flowOrigin(flow, flowIdentifier),
+      changeArgs = changeArgs)
+  }
+
+  private def flowOrigin(
+      flow: proto.PipelineCommand.DefineFlow,
+      flowIdentifier: TableIdentifier): QueryOrigin = QueryOrigin(
+    filePath =
+      
Option.when(flow.getSourceCodeLocation.hasFileName)(flow.getSourceCodeLocation.getFileName),
+    line = Option.when(flow.getSourceCodeLocation.hasLineNumber)(
+      flow.getSourceCodeLocation.getLineNumber),
+    objectType = Some(QueryOriginType.Flow.toString),
+    objectName = Option(flowIdentifier.unquotedString),
+    language = Some(Python()))
+
   private def startRun(
       cmd: proto.PipelineCommand.StartRun,
       responseObserver: StreamObserver[ExecutePlanResponse],
diff --git 
a/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala
 
b/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala
index db78dc1744ec..c84eaadaa453 100644
--- 
a/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala
+++ 
b/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala
@@ -2955,7 +2955,8 @@ class SparkConnectPlanner(
       sessionHolder,
       command,
       responseObserver,
-      transformRelation)
+      transformRelation,
+      transformExpression)
     executeHolder.eventsManager.postFinished()
     responseObserver.onNext(
       proto.ExecutePlanResponse
diff --git 
a/sql/connect/server/src/test/scala/org/apache/spark/sql/connect/pipelines/PythonPipelineSuite.scala
 
b/sql/connect/server/src/test/scala/org/apache/spark/sql/connect/pipelines/PythonPipelineSuite.scala
index fd05b0cc357e..834e2d8144e1 100644
--- 
a/sql/connect/server/src/test/scala/org/apache/spark/sql/connect/pipelines/PythonPipelineSuite.scala
+++ 
b/sql/connect/server/src/test/scala/org/apache/spark/sql/connect/pipelines/PythonPipelineSuite.scala
@@ -29,15 +29,18 @@ import scala.util.Try
 import org.scalactic.source.Position
 import org.scalatest.Tag
 
+import org.apache.spark.SparkConf
 import org.apache.spark.api.python.PythonUtils
 import org.apache.spark.sql.AnalysisException
 import org.apache.spark.sql.catalyst.TableIdentifier
+import org.apache.spark.sql.classic.ColumnConversions._
 import org.apache.spark.sql.connect.PythonTestDepsChecker
 import org.apache.spark.sql.connect.service.SparkConnectService
-import org.apache.spark.sql.connector.catalog.{Identifier, TableCatalog}
+import org.apache.spark.sql.connector.catalog.{Identifier, 
InMemoryTableCatalog, TableCatalog}
 import org.apache.spark.sql.pipelines.Language.Python
+import org.apache.spark.sql.pipelines.autocdc.{ColumnSelection, ScdType, 
UnqualifiedColumnName}
 import org.apache.spark.sql.pipelines.common.FlowStatus
-import org.apache.spark.sql.pipelines.graph.{DataflowGraph, 
PipelineUpdateContextImpl, QueryOrigin, QueryOriginType}
+import org.apache.spark.sql.pipelines.graph.{AutoCdcFlow, AutoCdcMergeFlow, 
DataflowGraph, PipelineUpdateContextImpl, QueryOrigin, QueryOriginType}
 import org.apache.spark.sql.pipelines.logging.EventLevel
 import org.apache.spark.sql.pipelines.utils.{EventVerificationTestHelpers, 
TestPipelineUpdateContextMixin}
 import org.apache.spark.sql.types.StructType
@@ -51,10 +54,24 @@ class PythonPipelineSuite
     with TestPipelineUpdateContextMixin
     with EventVerificationTestHelpers {
 
-  def buildGraph(pythonText: String): DataflowGraph = {
+  // Register a V2 in-memory catalog so AutoCDC tests can exercise 
pipeline-default-catalog
+  // inheritance against a name that is never the session default 
`spark_catalog`. The V2 in-memory
+  // catalog doesn't support streaming reads, but the AutoCDC tests that touch 
it only run graph
+  // resolution -- not pipeline execution -- so this is sufficient.
+  override def sparkConf: SparkConf = super.sparkConf
+    .set("spark.sql.catalog.my_catalog", classOf[InMemoryTableCatalog].getName)
+
+  def buildGraph(
+      pythonText: String,
+      defaultCatalog: Option[String] = None,
+      defaultDatabase: Option[String] = None,
+      setupSql: Option[String] = None): DataflowGraph = {
     val indentedPythonText = pythonText.linesIterator.map("        " + 
_).mkString("\n")
     // create a unique identifier to allow identifying the session and 
dataflow graph
     val customSessionIdentifier = UUID.randomUUID().toString
+    val defaultCatalogPyExpr = defaultCatalog.map(c => 
s""""$c"""").getOrElse("None")
+    val defaultDatabasePyExpr = defaultDatabase.map(d => 
s""""$d"""").getOrElse("None")
+    val setupSqlLine = setupSql.map(stmt => 
s"""spark.sql(\"\"\"$stmt\"\"\")""").getOrElse("")
     val pythonCode =
       s"""
          |from pyspark.sql import SparkSession
@@ -76,10 +93,12 @@ class PythonPipelineSuite
          |    .config("spark.custom.identifier", "$customSessionIdentifier") \\
          |    .create()
          |
+         |$setupSqlLine
+         |
          |dataflow_graph_id = create_dataflow_graph(
          |    spark,
-         |    default_catalog=None,
-         |    default_database=None,
+         |    default_catalog=$defaultCatalogPyExpr,
+         |    default_database=$defaultDatabasePyExpr,
          |    sql_conf={},
          |)
          |
@@ -151,7 +170,7 @@ class PythonPipelineSuite
           QueryOrigin(
             language = Option(Python()),
             filePath = Option("<string>"),
-            line = Option(34),
+            line = Option(36),
             objectName = Option("spark_catalog.default.table1"),
             objectType = Option(QueryOriginType.Flow.toString))),
       errorChecker = ex =>
@@ -203,7 +222,7 @@ class PythonPipelineSuite
             QueryOrigin(
               language = Option(Python()),
               filePath = Option("<string>"),
-              line = Option(40),
+              line = Option(42),
               objectName = Option("spark_catalog.default.mv2"),
               objectType = Option(QueryOriginType.Flow.toString))),
         expectedEventLevel = EventLevel.INFO)
@@ -217,7 +236,7 @@ class PythonPipelineSuite
             QueryOrigin(
               language = Option(Python()),
               filePath = Option("<string>"),
-              line = Option(44),
+              line = Option(46),
               objectName = Option("spark_catalog.default.mv"),
               objectType = Option(QueryOriginType.Flow.toString))),
         expectedEventLevel = EventLevel.INFO)
@@ -235,7 +254,7 @@ class PythonPipelineSuite
               QueryOrigin(
                 language = Option(Python()),
                 filePath = Option("<string>"),
-                line = Option(34),
+                line = Option(36),
                 objectName = Option("spark_catalog.default.table1"),
                 objectType = Option(QueryOriginType.Flow.toString))),
           expectedEventLevel = EventLevel.INFO)
@@ -249,7 +268,7 @@ class PythonPipelineSuite
               QueryOrigin(
                 language = Option(Python()),
                 filePath = Option("<string>"),
-                line = Option(49),
+                line = Option(51),
                 objectName = Option("spark_catalog.default.standalone_flow1"),
                 objectType = Option(QueryOriginType.Flow.toString))),
           expectedEventLevel = EventLevel.INFO)
@@ -935,6 +954,367 @@ class PythonPipelineSuite
     assert(ex.getMessage.contains("table_with_wrong_struct_schema"))
   }
 
+  private def buildAutoCdcFlow(pipelineSource: String): AutoCdcFlow = {
+    val graph = buildGraph(pipelineSource)
+    graph.flows
+      .collectFirst { case f: AutoCdcFlow => f }
+      .getOrElse(fail(s"Expected an AutoCdcFlow in the graph, got: 
${graph.flows}"))
+  }
+
+  test("AutoCDC API: minimal flow registers an AutoCdcFlow with default name 
and SCD1 default") {
+    val flow = buildAutoCdcFlow("""
+        |@dp.table
+        |def src():
+        |  return spark.readStream.format("rate").load()
+        |
+        |dp.create_streaming_table("target")
+        |
+        |dp.create_auto_cdc_flow(
+        |    target = "target",
+        |    source = "src",
+        |    keys = ["value"],
+        |    sequence_by = "timestamp",
+        |)
+        |""".stripMargin)
+
+    assert(flow.identifier == graphIdentifier("target"))
+    assert(flow.destinationIdentifier == graphIdentifier("target"))
+    assert(flow.changeArgs.keys == Seq(UnqualifiedColumnName("value")))
+    assert(flow.changeArgs.sequencing.expr.sql == "timestamp")
+    assert(flow.changeArgs.deleteCondition.isEmpty)
+    assert(flow.changeArgs.columnSelection.isEmpty)
+    assert(flow.changeArgs.storedAsScdType == ScdType.Type1)
+  }
+
+  test("AutoCDC API: composite keys are forwarded to ChangeArgs in order") {
+    val flow = buildAutoCdcFlow("""
+        |@dp.table
+        |def src():
+        |  return spark.readStream.format("rate").load()
+        |
+        |dp.create_streaming_table("target")
+        |
+        |dp.create_auto_cdc_flow(
+        |    target = "target",
+        |    source = "src",
+        |    keys = ["value", "timestamp"],
+        |    sequence_by = "timestamp",
+        |)
+        |""".stripMargin)
+
+    assert(
+      flow.changeArgs.keys ==
+        Seq(UnqualifiedColumnName("value"), 
UnqualifiedColumnName("timestamp")))
+  }
+
+  test("AutoCDC API: apply_as_deletes is forwarded as a delete condition 
column") {
+    val flow = buildAutoCdcFlow("""
+        |@dp.table
+        |def src():
+        |  return spark.readStream.format("rate").load()
+        |
+        |dp.create_streaming_table("target")
+        |
+        |dp.create_auto_cdc_flow(
+        |    target = "target",
+        |    source = "src",
+        |    keys = ["value"],
+        |    sequence_by = "timestamp",
+        |    apply_as_deletes = "value % 2 = 0",
+        |)
+        |""".stripMargin)
+
+    val deleteCondition = flow.changeArgs.deleteCondition.getOrElse(
+      fail("expected apply_as_deletes to populate deleteCondition"))
+    assert(deleteCondition.expr.sql.contains("value"))
+    assert(deleteCondition.expr.sql.contains("0"))
+  }
+
+  test("AutoCDC API: column_list is forwarded as IncludeColumns") {
+    val flow = buildAutoCdcFlow("""
+        |@dp.table
+        |def src():
+        |  return spark.readStream.format("rate").load()
+        |
+        |dp.create_streaming_table("target")
+        |
+        |dp.create_auto_cdc_flow(
+        |    target = "target",
+        |    source = "src",
+        |    keys = ["value"],
+        |    sequence_by = "timestamp",
+        |    column_list = ["value", "timestamp"],
+        |)
+        |""".stripMargin)
+
+    assert(
+      flow.changeArgs.columnSelection.contains(ColumnSelection.IncludeColumns(
+        Seq(UnqualifiedColumnName("value"), 
UnqualifiedColumnName("timestamp")))))
+  }
+
+  test("AutoCDC API: except_column_list is forwarded as ExcludeColumns") {
+    val flow = buildAutoCdcFlow("""
+        |@dp.table
+        |def src():
+        |  return spark.readStream.format("rate").load()
+        |
+        |dp.create_streaming_table("target")
+        |
+        |dp.create_auto_cdc_flow(
+        |    target = "target",
+        |    source = "src",
+        |    keys = ["value"],
+        |    sequence_by = "timestamp",
+        |    except_column_list = ["timestamp"],
+        |)
+        |""".stripMargin)
+
+    assert(
+      flow.changeArgs.columnSelection.contains(
+        
ColumnSelection.ExcludeColumns(Seq(UnqualifiedColumnName("timestamp")))))
+  }
+
+  test("AutoCDC API: explicit `name` is honored as the flow identifier") {
+    val flow = buildAutoCdcFlow("""
+        |@dp.table
+        |def src():
+        |  return spark.readStream.format("rate").load()
+        |
+        |dp.create_streaming_table("target")
+        |
+        |dp.create_auto_cdc_flow(
+        |    target = "target",
+        |    source = "src",
+        |    keys = ["value"],
+        |    sequence_by = "timestamp",
+        |    name = "my_flow",
+        |)
+        |""".stripMargin)
+
+    assert(flow.identifier == graphIdentifier("my_flow"))
+    assert(flow.destinationIdentifier == graphIdentifier("target"))
+  }
+
+  test("AutoCDC API: multi-part `keys` column is rejected at flow 
registration") {
+    val ex = intercept[RuntimeException] {
+      buildAutoCdcFlow("""
+          |@dp.table
+          |def src():
+          |  return spark.readStream.format("rate").load()
+          |
+          |dp.create_streaming_table("target")
+          |
+          |dp.create_auto_cdc_flow(
+          |    target = "target",
+          |    source = "src",
+          |    keys = ["a.b"],
+          |    sequence_by = "timestamp",
+          |)
+          |""".stripMargin)
+    }
+    assert(ex.getMessage.contains("AUTOCDC_MULTIPART_COLUMN_IDENTIFIER"))
+  }
+
+  test("AutoCDC API: multi-part `column_list` entry is rejected at flow 
registration") {
+    val ex = intercept[RuntimeException] {
+      buildAutoCdcFlow("""
+          |@dp.table
+          |def src():
+          |  return spark.readStream.format("rate").load()
+          |
+          |dp.create_streaming_table("target")
+          |
+          |dp.create_auto_cdc_flow(
+          |    target = "target",
+          |    source = "src",
+          |    keys = ["value"],
+          |    sequence_by = "timestamp",
+          |    column_list = ["nested.field"],
+          |)
+          |""".stripMargin)
+    }
+    assert(ex.getMessage.contains("AUTOCDC_MULTIPART_COLUMN_IDENTIFIER"))
+  }
+
+  test("AutoCDC API: Column-object form of keys/sequence_by/apply_as_deletes 
is honored") {
+    val flow = buildAutoCdcFlow("""
+        |from pyspark.sql.functions import col, expr
+        |
+        |@dp.table
+        |def src():
+        |  return spark.readStream.format("rate").load()
+        |
+        |dp.create_streaming_table("target")
+        |
+        |dp.create_auto_cdc_flow(
+        |    target = "target",
+        |    source = "src",
+        |    keys = [col("value")],
+        |    sequence_by = col("timestamp"),
+        |    apply_as_deletes = expr("value % 2 = 0"),
+        |)
+        |""".stripMargin)
+
+    assert(flow.changeArgs.keys == Seq(UnqualifiedColumnName("value")))
+    assert(flow.changeArgs.sequencing.expr.sql == "timestamp")
+    val deleteCondition = flow.changeArgs.deleteCondition.getOrElse(
+      fail("expected apply_as_deletes to populate deleteCondition"))
+    assert(deleteCondition.expr.sql.contains("value"))
+    assert(deleteCondition.expr.sql.contains("0"))
+  }
+
+  test("AutoCDC API: graph resolves with the source streaming table as the 
flow's input") {
+    val graph = buildGraph("""
+        |@dp.table
+        |def src():
+        |  return spark.readStream.format("rate").load()
+        |
+        |dp.create_streaming_table("target")
+        |
+        |dp.create_auto_cdc_flow(
+        |    target = "target",
+        |    source = "src",
+        |    keys = ["value"],
+        |    sequence_by = "timestamp",
+        |)
+        |""".stripMargin).resolve()
+
+    val resolvedFlow = graph.resolvedFlow(graphIdentifier("target"))
+    assert(resolvedFlow.inputs == Set(graphIdentifier("src")))
+  }
+
+  test("AutoCDC API: single-part `source` inherits the pipeline's default 
catalog and database") {
+    // Use `my_catalog` (registered in `sparkConf`) so the pipeline-default 
catalog differs from
+    // the session default (`spark_catalog`), and a non-default namespace 
`my_db` so the
+    // pipeline-default database differs from the session default (`default`). 
The CREATE NAMESPACE
+    // runs on the same Connect session that subsequently creates the dataflow 
graph, so the
+    // namespace is visible to that session's per-session V2 catalog instance.
+    val graph = buildGraph(
+      """
+        |@dp.table
+        |def src():
+        |  return spark.readStream.format("rate").load()
+        |
+        |dp.create_streaming_table("target")
+        |
+        |dp.create_auto_cdc_flow(
+        |    target = "target",
+        |    source = "src",
+        |    keys = ["value"],
+        |    sequence_by = "timestamp",
+        |)
+        |""".stripMargin,
+      defaultCatalog = Some("my_catalog"),
+      defaultDatabase = Some("my_db"),
+      setupSql = Some("CREATE NAMESPACE IF NOT EXISTS 
my_catalog.my_db")).resolve()
+
+    val resolvedFlow =
+      graph.resolvedFlow(TableIdentifier("target", Some("my_db"), 
Some("my_catalog")))
+    assert(
+      resolvedFlow.inputs ==
+        Set(TableIdentifier("src", Some("my_db"), Some("my_catalog"))))
+  }
+
+  test("AutoCDC API: multi-part `source` resolves to the corresponding 
qualified dataset") {
+    val graph = buildGraph("""
+        |@dp.table(name = "some_catalog.some_schema.src")
+        |def irrelevant():
+        |  return spark.readStream.format("rate").load()
+        |
+        |dp.create_streaming_table(name = "some_catalog.some_schema.target")
+        |
+        |dp.create_auto_cdc_flow(
+        |    target = "some_catalog.some_schema.target",
+        |    source = "some_catalog.some_schema.src",
+        |    keys = ["value"],
+        |    sequence_by = "timestamp",
+        |)
+        |""".stripMargin).resolve()
+
+    val targetIdent = TableIdentifier("target", Some("some_schema"), 
Some("some_catalog"))
+    val srcIdent = TableIdentifier("src", Some("some_schema"), 
Some("some_catalog"))
+    val resolvedFlow = graph.resolvedFlow(targetIdent)
+    assert(resolvedFlow.inputs == Set(srcIdent))
+  }
+
+  test("AutoCDC API: non-attribute expression in keys is rejected") {
+    val ex = intercept[RuntimeException] {
+      buildGraph("""
+          |from pyspark.sql.functions import expr
+          |
+          |@dp.table
+          |def src():
+          |  return spark.readStream.format("rate").load()
+          |
+          |dp.create_streaming_table("target")
+          |
+          |dp.create_auto_cdc_flow(
+          |    target = "target",
+          |    source = "src",
+          |    keys = [expr("value + 1")],
+          |    sequence_by = "timestamp",
+          |)
+          |""".stripMargin)
+    }
+    assert(ex.getMessage.contains("AUTOCDC_NON_COLUMN_IDENTIFIER"))
+  }
+
+  test("AutoCDC API: specifying both column_list and except_column_list is 
rejected") {
+    // The Python create_auto_cdc_flow API does not currently enforce the "at most one" contract
+    // client-side, so the proto carries both lists to the server, where the structured error is
+    // raised. If/when a Python-side check is added, this test guards against the server-side
+    // defense being silently bypassed.
+    val ex = intercept[RuntimeException] {
+      buildGraph("""
+          |@dp.table
+          |def src():
+          |  return spark.readStream.format("rate").load()
+          |
+          |dp.create_streaming_table("target")
+          |
+          |dp.create_auto_cdc_flow(
+          |    target = "target",
+          |    source = "src",
+          |    keys = ["value"],
+          |    sequence_by = "timestamp",
+          |    column_list = ["value"],
+          |    except_column_list = ["timestamp"],
+          |)
+          |""".stripMargin)
+    }
+    
assert(ex.getMessage.contains("AUTOCDC_BOTH_COLUMN_LIST_AND_EXCEPT_COLUMN_LIST"))
+  }
+
+  test("AutoCDC API: registered flow survives graph resolution and validation 
end-to-end") {
+    val graph = buildGraph("""
+        |@dp.table
+        |def src():
+        |  return spark.readStream.format("rate").load()
+        |
+        |dp.create_streaming_table("target")
+        |
+        |dp.create_auto_cdc_flow(
+        |    target = "target",
+        |    source = "src",
+        |    keys = ["value"],
+        |    sequence_by = "timestamp",
+        |    apply_as_deletes = "value % 2 = 0",
+        |    column_list = ["value", "timestamp"],
+        |)
+        |""".stripMargin).resolve().validate()
+
+    val resolvedFlow = graph.resolvedFlow(graphIdentifier("target"))
+    assert(resolvedFlow.isInstanceOf[AutoCdcMergeFlow])
+    val mergeFlow = resolvedFlow.asInstanceOf[AutoCdcMergeFlow]
+    assert(mergeFlow.changeArgs.keys == Seq(UnqualifiedColumnName("value")))
+    assert(mergeFlow.changeArgs.sequencing.expr.sql == "timestamp")
+    assert(mergeFlow.changeArgs.deleteCondition.isDefined)
+    assert(
+      
mergeFlow.changeArgs.columnSelection.contains(ColumnSelection.IncludeColumns(
+        Seq(UnqualifiedColumnName("value"), 
UnqualifiedColumnName("timestamp")))))
+    assert(mergeFlow.changeArgs.storedAsScdType == ScdType.Type1)
+  }
+
   /**
    * Executes Python code in a separate process and returns the exit code.
    *
diff --git a/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/autocdc/ChangeArgs.scala b/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/autocdc/ChangeArgs.scala
index c475377ba506..49636acc1f8f 100644
--- a/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/autocdc/ChangeArgs.scala
+++ b/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/autocdc/ChangeArgs.scala
@@ -32,14 +32,16 @@ case class UnqualifiedColumnName private (name: String) {
 }
 
 object UnqualifiedColumnName {
-  def apply(input: String): UnqualifiedColumnName = {
-    val nameParts = CatalystSqlParser.parseMultipartIdentifier(input)
+  def apply(nameParts: Seq[String]): UnqualifiedColumnName = {
     if (nameParts.length != 1) {
-      throw multipartColumnIdentifierError(input, nameParts)
+      throw multipartColumnIdentifierError(nameParts.mkString("."), nameParts)
     }
     new UnqualifiedColumnName(nameParts.head)
   }
 
+  def apply(input: String): UnqualifiedColumnName =
+    apply(CatalystSqlParser.parseMultipartIdentifier(input))
+
   private def multipartColumnIdentifierError(
       columnName: String,
       nameParts: Seq[String]
diff --git a/sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/autocdc/ChangeArgsSuite.scala b/sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/autocdc/ChangeArgsSuite.scala
index 1de2120a8f91..7be111003762 100644
--- a/sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/autocdc/ChangeArgsSuite.scala
+++ b/sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/autocdc/ChangeArgsSuite.scala
@@ -326,7 +326,7 @@ class ChangeArgsSuite extends SparkFunSuite with SharedSparkSession {
         UnqualifiedColumnName("a.b")
       },
       condition = "AUTOCDC_MULTIPART_COLUMN_IDENTIFIER",
-      sqlState = "42703",
+      sqlState = "22023",
       parameters = Map(
         "columnName" -> "a.b",
         "nameParts" -> "a, b"
@@ -340,7 +340,7 @@ class ChangeArgsSuite extends SparkFunSuite with SharedSparkSession {
         UnqualifiedColumnName("src.x")
       },
       condition = "AUTOCDC_MULTIPART_COLUMN_IDENTIFIER",
-      sqlState = "42703",
+      sqlState = "22023",
       parameters = Map(
         "columnName" -> "src.x",
         "nameParts" -> "src, x"
@@ -354,7 +354,7 @@ class ChangeArgsSuite extends SparkFunSuite with SharedSparkSession {
         UnqualifiedColumnName("a.b.c")
       },
       condition = "AUTOCDC_MULTIPART_COLUMN_IDENTIFIER",
-      sqlState = "42703",
+      sqlState = "22023",
       parameters = Map(
         "columnName" -> "a.b.c",
         "nameParts" -> "a, b, c"


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]


Reply via email to