This is an automated email from the ASF dual-hosted git repository.

szehon-ho pushed a commit to branch branch-4.2
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-4.2 by this push:
     new f56d2bf6bb56 [SPARK-56956][SPARK-56651][CONNECT][SDP][FOLLOWUP] Address review comments for AutoCDC flow dataclasses and Python APIs
f56d2bf6bb56 is described below

commit f56d2bf6bb5602b5a4d97d5c7fa54fb26c6f5e91
Author: Wenchen Fan <[email protected]>
AuthorDate: Mon Jun 1 12:43:38 2026 -0700

    [SPARK-56956][SPARK-56651][CONNECT][SDP][FOLLOWUP] Address review comments for AutoCDC flow dataclasses and Python APIs
    
    This follow-up PR addresses review comments left after #56042 (SPARK-56956, AutoCDC flow dataclasses) and #56069 (SPARK-56651, AutoCDC Python APIs) were merged.
    
    ### What changes were proposed in this pull request?
    
    #### Scala — `Scd1BatchProcessor` / `Flow`
    
    - Remove the now-dead `Scd1BatchProcessor.validateCdcMetadataColumnNotPresent` validator and its call site. It referenced the error class `AUTOCDC_RESERVED_COLUMN_NAME_CONFLICT`, which the parent PR removed from `error-conditions.json`; the new construction-time check in `AutoCdcMergeFlow.requireReservedPrefixAbsentInSourceColumns` is the authoritative validator and supersedes it.
    - Reorder `AutoCdcFlow`'s constructor so the defaulted param (`sqlConf`) trails the non-defaulted ones (`origin`, `changeArgs`), allowing positional construction. The `comment` param is also dropped, as the `Flow.scala` hunk shows.
    - Fix Scaladoc/comment text: factual wording for the keys-presence check, the `[[ResolvedFlow.load]]` link, the `Scd1ForeachBatchHandler` reference (was `Scd1ForeachBatchExec`, which does not exist), and several minor grammar/typography nits.
    
    #### Python — `create_auto_cdc_flow` / `AutoCdcFlow`
    
    - Add a comment on the lazy `pyspark.sql.connect.functions.builtin` imports explaining the docs-build constraint (transitive grpc dependency missing from the docs environment), so a future refactor doesn't hoist the imports and silently break docs CI.
    - Fix the `INVALID_MULTIPLE_ARGUMENT_CONDITIONS` error template placeholder: `[{arg_names}]` → `[<arg_names>]`. `ErrorClassesReader.get_error_message` extracts required placeholders via `re.findall("<([a-zA-Z0-9_-]+)>", template)` and asserts the extracted set equals `messageParameters.keys()`, so the curly-brace form would trip an `AssertionError` instead of producing the intended `PySparkValueError`. The typo also affected existing callers in `sql/session.py:2267` and `sql/connect/s [...]
    - Minor docstring fixes: "merge" operation (was "merged"), consistency between `api.py` and `flow.py` on the `1`/`"1"` accepted set, missing commas, "excluded from" (was "excluded in"), `DataFrame` casing, and a backtick on `create_streaming_table`.
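    
    The lazy-import item above can be illustrated with a minimal sketch. It does not use PySpark: the module name `_hypothetical_grpc_backed_module` and the function `build_expr` are assumptions made up for the example, standing in for a dependency that is missing from some environment.

```python
def build_expr(sql_text):
    # Lazy import: the dependency is resolved only when the function runs,
    # so merely importing the enclosing module does not require it.
    # `_hypothetical_grpc_backed_module` is a made-up name for illustration.
    from _hypothetical_grpc_backed_module import expr as _expr

    return _expr(sql_text)


def call_without_dependency():
    # Defining build_expr succeeded above even though the dependency is absent;
    # the failure only surfaces at call time.
    try:
        build_expr("a + b")
    except ImportError as exc:
        return type(exc).__name__
    return "imported"


print(call_without_dependency())
```

    Hoisting the import to module level would move the failure to import time, which is the breakage the new comment is meant to guard against.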
    
    ### Why are the changes needed?
    
    Cleanup of follow-up items identified during review of the parent PRs. The dead Scala validator is the most material: if its code path were reached, it would throw an internal `SparkException("Cannot find main error class ...")` instead of a user-facing `AnalysisException`. The error-template typo would surface as an `AssertionError` rather than a user-actionable error for the two existing callers.
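    
    A minimal sketch of why the curly-brace placeholder trips the assertion. The regex is the one quoted above; the `render` helper, its substitution step, and the parameter values are simplified assumptions for illustration, not the actual `ErrorClassesReader` implementation.

```python
import re

# Pattern quoted in the description above.
PLACEHOLDER_PATTERN = "<([a-zA-Z0-9_-]+)>"


def render(template, message_parameters):
    # Hypothetical stand-in for the reader: collect <name> placeholders and
    # require that they match the supplied parameter names exactly.
    placeholders = set(re.findall(PLACEHOLDER_PATTERN, template))
    assert placeholders == set(message_parameters), (
        f"placeholders {sorted(placeholders)} != "
        f"parameters {sorted(message_parameters)}"
    )
    message = template
    for key, value in message_parameters.items():
        # Simplified substitution, assumed for this sketch only.
        message = message.replace(f"<{key}>", str(value))
    return message


# Illustrative parameter values, not taken from a real call site.
params = {
    "arg_names": "column_list, except_column_list",
    "condition": "set at the same time",
}

fixed = render("[<arg_names>] cannot be <condition>.", params)

try:
    render("[{arg_names}] cannot be <condition>.", params)
    old_template_failed = False
except AssertionError:
    # {arg_names} is invisible to the regex, so the sets differ.
    old_template_failed = True
```

    With the old template only `condition` is extracted, so the set comparison fails before any user-facing error can be built.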
    
    ### Does this PR introduce _any_ user-facing change?
    
    No.
    
    ### How was this patch tested?
    
    Existing tests in `AutoCdcFlowSuite`, `Scd1BatchProcessorSuite`, `ConnectInvalidPipelineSuite`, and `ConnectValidPipelineSuite` continue to cover the affected paths.
    
    ### Was this patch authored or co-authored using generative AI tooling?
    
    Co-authored by Claude.
    
    Closes #56113 from cloud-fan/autocdc-flow-dataclasses-followup.
    
    Authored-by: Wenchen Fan <[email protected]>
    Signed-off-by: Szehon Ho <[email protected]>
    (cherry picked from commit 3e1650b9be0322daa98e7552c488cbbc75914edb)
    Signed-off-by: Szehon Ho <[email protected]>
---
 python/pyspark/errors/error-conditions.json        |  2 +-
 python/pyspark/pipelines/api.py                    | 20 +++++++------
 python/pyspark/pipelines/flow.py                   |  5 ++--
 .../sql/pipelines/autocdc/Scd1BatchProcessor.scala | 24 +---------------
 .../apache/spark/sql/pipelines/graph/Flow.scala    | 33 +++++++++++-----------
 .../sql/pipelines/graph/GraphValidations.scala     |  3 +-
 .../sql/pipelines/autocdc/AutoCdcFlowSuite.scala   | 18 ++++--------
 .../graph/ConnectInvalidPipelineSuite.scala        |  2 +-
 8 files changed, 42 insertions(+), 65 deletions(-)

diff --git a/python/pyspark/errors/error-conditions.json b/python/pyspark/errors/error-conditions.json
index 7cc5a73e254b..38417cbf0188 100644
--- a/python/pyspark/errors/error-conditions.json
+++ b/python/pyspark/errors/error-conditions.json
@@ -417,7 +417,7 @@
   },
   "INVALID_MULTIPLE_ARGUMENT_CONDITIONS": {
     "message": [
-      "[{arg_names}] cannot be <condition>."
+      "[<arg_names>] cannot be <condition>."
     ]
   },
   "INVALID_NDARRAY_DIMENSION": {
diff --git a/python/pyspark/pipelines/api.py b/python/pyspark/pipelines/api.py
index 084547f4c2b1..5cbc003708f2 100644
--- a/python/pyspark/pipelines/api.py
+++ b/python/pyspark/pipelines/api.py
@@ -541,8 +541,8 @@ def create_auto_cdc_flow(
 ) -> None:
     """
     Create an Auto CDC flow into the target table from the Change Data Capture 
(CDC) source.
-    Target table must have already been created using create_streaming_table 
function. Only one
-    of column_list and except_column_list can be specified.
+    Target table must have already been created using the 
`create_streaming_table` function.
+    Only one of column_list and except_column_list can be specified.
 
     Example:
         create_auto_cdc_flow(
@@ -576,16 +576,19 @@ def create_auto_cdc_flow(
     :param column_list: Columns that will be included in the output table. 
This should be a list \
         of column identifiers without qualifiers, expressed as either Python 
strings or PySpark \
         Columns. Only one of column_list and except_column_list can be 
specified.
-    :param except_column_list: Columns that will be excluded in the output 
table. This should be a \
-        list of column identifiers without qualifiers, expressed as either 
Python strings or \
+    :param except_column_list: Columns that will be excluded from the output 
table. This should \
+        be a list of column identifiers without qualifiers, expressed as 
either Python strings or \
         PySpark Columns. Only one of column_list and except_column_list can be 
specified. When \
-        this is specified, all columns in the dataframe of the target table 
except those in this \
-        list will be in the output table.
+        this is specified, all columns in the `DataFrame` of the target table 
except those in \
+        this list will be in the output table.
     :param stored_as_scd_type: The SCD type for the target table. Only 1 (or 
"1") is supported. \
-        When not specified the server default applies.
-    :param name: The name of the flow for this create_auto_cdc_flow command. 
When unspecified \
+        When not specified, the server default applies.
+    :param name: The name of the flow for this create_auto_cdc_flow command. 
When unspecified, \
         this will build a "default flow" with name equal to the target name.
     """
+    # Lazy import: pyspark.sql.connect.functions.builtin transitively imports 
grpc, which is
+    # not available in the docs-build environment. pyspark.pipelines.api is 
loaded eagerly
+    # from pyspark.pipelines.__init__, so a top-level import here would break 
docs CI.
     from pyspark.sql.connect.functions.builtin import expr as _connect_expr
 
     if type(target) is not str:
@@ -690,6 +693,7 @@ def _normalize_column_list(
     arg_name: str,
     column_list: Union[List[str], List[Column]],
 ) -> List[Column]:
+    # Lazy import: see comment in create_auto_cdc_flow.
     from pyspark.sql.connect.functions.builtin import col as _connect_col
 
     if not isinstance(column_list, list):
diff --git a/python/pyspark/pipelines/flow.py b/python/pyspark/pipelines/flow.py
index 02e971aedd87..b1922454a551 100644
--- a/python/pyspark/pipelines/flow.py
+++ b/python/pyspark/pipelines/flow.py
@@ -56,10 +56,11 @@ class AutoCdcFlow:
     :param source: The name of the CDC source to stream from.
     :param keys: Column(s) that uniquely identify a row in source and target 
data.
     :param sequence_by: Expression used to order the source data.
-    :param apply_as_deletes: Optional delete condition for the merged 
operation.
+    :param apply_as_deletes: Optional delete condition for the merge operation.
     :param column_list: Optional columns to include in the output table.
     :param except_column_list: Optional columns to exclude from the output 
table.
-    :param stored_as_scd_type: Optional SCD type for the target table. Only 1 
is supported.
+    :param stored_as_scd_type: Optional SCD type for the target table. Only 1 
(or "1") is \
+        supported.
     :param source_code_location: The location of the source code that created 
this flow.
     """
 
diff --git a/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/autocdc/Scd1BatchProcessor.scala b/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/autocdc/Scd1BatchProcessor.scala
index 0035f442fb00..0656a7eb91b0 100644
--- a/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/autocdc/Scd1BatchProcessor.scala
+++ b/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/autocdc/Scd1BatchProcessor.scala
@@ -18,7 +18,7 @@
 package org.apache.spark.sql.pipelines.autocdc
 
 import org.apache.spark.SparkException
-import org.apache.spark.sql.{functions => F, AnalysisException}
+import org.apache.spark.sql.{functions => F}
 import org.apache.spark.sql.Column
 import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.util.QuotingUtils
@@ -130,9 +130,6 @@ case class Scd1BatchProcessor(
    */
   private[autocdc] def extendMicrobatchRowsWithCdcMetadata(
       validatedMicrobatch: DataFrame): DataFrame = {
-    // Proactively validate the reserved CDC metadata column does not exist in 
the microbatch.
-    validateCdcMetadataColumnNotPresent(validatedMicrobatch)
-
     val rowDeleteSequence: Column = changeArgs.deleteCondition match {
       case Some(deleteCondition) =>
         F.when(deleteCondition, changeArgs.sequencing).otherwise(F.lit(null))
@@ -409,25 +406,6 @@ case class Scd1BatchProcessor(
       .insert(columnsToInsertOnNewKey)
       .merge()
   }
-
-  private def validateCdcMetadataColumnNotPresent(microbatch: DataFrame): Unit 
= {
-    val microbatchSqlConf = microbatch.sparkSession.sessionState.conf
-    val resolver = microbatchSqlConf.resolver
-
-    microbatch.schema.fieldNames
-      .find(resolver(_, Scd1BatchProcessor.cdcMetadataColName))
-      .foreach { conflictingColumnName =>
-        throw new AnalysisException(
-          errorClass = "AUTOCDC_RESERVED_COLUMN_NAME_CONFLICT",
-          messageParameters = Map(
-            "caseSensitivity" -> 
CaseSensitivityLabels.of(microbatchSqlConf.caseSensitiveAnalysis),
-            "columnName" -> conflictingColumnName,
-            "schemaName" -> "microbatch",
-            "reservedColumnName" -> Scd1BatchProcessor.cdcMetadataColName
-          )
-        )
-      }
-  }
 }
 
 object Scd1BatchProcessor {
diff --git a/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/graph/Flow.scala b/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/graph/Flow.scala
index 740533d7504e..f88b0cd3a1cb 100644
--- a/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/graph/Flow.scala
+++ b/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/graph/Flow.scala
@@ -137,10 +137,10 @@ sealed trait UnresolvedFlow extends Flow {
  * An [[UnresolvedFlow]] whose execution-type has not yet been determined.
  *
  * In some cases, we know the execution-type for an [[UnresolvedFlow]] even 
before flow analysis
- * and resolution. For example an AutoCDCFlow is a special 
unresolved-but-typed flow; we know a
- * flow will be an AutoCDC flow immediately on construction, because it has 
its own special
- * registration API. Such flows are considered "typed flows", but there isn't 
any semantic reason
- * yet to explicitly introduce a `TypedFlow` trait/class.
+ * and resolution. For example, an [[AutoCdcFlow]] is a special 
unresolved-but-typed flow; we
+ * know a flow will be an AutoCDC flow immediately on construction, because it 
has its own
+ * special registration API. Such flows are considered "typed flows", but 
there isn't any
+ * semantic reason yet to explicitly introduce a `TypedFlow` trait/class.
  */
 case class UntypedFlow(
     identifier: TableIdentifier,
@@ -161,17 +161,16 @@ case class UntypedFlow(
  * [[AutoCdcFlow]] is a typed flow because it is only supported for streaming, 
and not as a once
  * flow. Therefore by definition it is a streaming-type flow.
  *
- * In the future once-support for [[AutoCdcFlow]] may be added.
+ * In the future, support for once-mode [[AutoCdcFlow]] may be added.
  */
 case class AutoCdcFlow(
     identifier: TableIdentifier,
     destinationIdentifier: TableIdentifier,
     func: FlowFunction,
     queryContext: QueryContext,
-    sqlConf: Map[String, String] = Map.empty,
-    comment: Option[String] = None,
     override val origin: QueryOrigin,
-    changeArgs: ChangeArgs
+    changeArgs: ChangeArgs,
+    sqlConf: Map[String, String] = Map.empty
 ) extends UnresolvedFlow {
   override val once: Boolean = false
 
@@ -245,8 +244,8 @@ class AppendOnceFlow(
 }
 
 /**
- * A resolved flow that applies a CDC event stream to a target table via 
MERGE, in accordance to
- * the configured [[flow.changeArgs]].
+ * A resolved flow that applies a CDC event stream to a target table via 
MERGE, in accordance
+ * with the configured [[flow.changeArgs]].
  */
 class AutoCdcMergeFlow(
     val flow: AutoCdcFlow,
@@ -264,8 +263,8 @@ class AutoCdcMergeFlow(
       columnSelection = changeArgs.columnSelection,
       caseSensitive = spark.sessionState.conf.caseSensitiveAnalysis
     )
-    // AutoCDC flows require all key columns to be present in the target 
table, to adhere to SCD
-    // semantics.
+    // AutoCDC flows require all key columns to be present in the 
user-selected source schema,
+    // so that they survive into the target table where SCD reconciliation 
needs them.
     requireKeysPresentInSelectedSchema(selectedSchema)
     selectedSchema
   }
@@ -305,11 +304,11 @@ class AutoCdcMergeFlow(
    * Returns an empty dataframe whose schema matches 
[[AutoCdcMergeFlow.schema]]. By construction,
    * the returned dataframe will be a streaming dataframe.
    *
-   * In practice, [[AutoCdcMergeFlow.load]] is not invoked during graph 
analysis or execution.
-   * An AutoCdcMergeFlow can only be an input to a streaming table (not an MV 
or
-   * persisted/temp view), and streaming tables consume a 
[[VirtualTableInput]] rather than the
-   * producing [[Flow]] directly. [[VirtualTableInput]] overrides its own 
[[load]] to do schema
-   * inference on its input flows, rather than a transitive [[Flow.load]].
+   * Today, [[AutoCdcMergeFlow.load]] is not actually ever called during graph 
analysis or
+   * execution. An AutoCdcMergeFlow can only be an input to a streaming table 
(not an MV or
+   * persisted/temp view), and streaming tables take a [[VirtualTableInput]] 
as input, not
+   * the producing [[Flow]] directly. [[VirtualTableInput]] overrides its own 
[[load]] to do
+   * schema inference on its input flows, rather than a transitive 
[[ResolvedFlow.load]].
    *
    * The implementation exists for API consistency and throws an internal 
error if invoked with
    * `asStreaming = false`, or if the underlying source dataframe is not 
streaming, to surface
diff --git a/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/graph/GraphValidations.scala b/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/graph/GraphValidations.scala
index a80fdafd1c18..d56b95b5830b 100644
--- a/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/graph/GraphValidations.scala
+++ b/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/graph/GraphValidations.scala
@@ -35,7 +35,8 @@ trait GraphValidations extends Logging {
   protected[pipelines] def validateMultiQueryTables(): Map[TableIdentifier, 
Seq[Flow]] = {
     val multiQueryTables = flowsTo.filter(_._2.size > 1)
 
-    // A multiflow table may not have an AutoCDC flow; AutoCDC flow targets 
must be single query.
+    // A multiflow table may not have an AutoCDC flow; AutoCDC targets must 
have exactly one
+    // input flow.
     multiQueryTables
       .find { case (_, flows) => flows.exists(isAutoCdcFlow) }
       .foreach {
diff --git a/sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/autocdc/AutoCdcFlowSuite.scala b/sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/autocdc/AutoCdcFlowSuite.scala
index 932110b94afd..cf7c9533bee9 100644
--- a/sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/autocdc/AutoCdcFlowSuite.scala
+++ b/sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/autocdc/AutoCdcFlowSuite.scala
@@ -74,7 +74,6 @@ class AutoCdcFlowSuite extends QueryTest with 
SharedSparkSession {
       func: FlowFunction = noOpFlowFunction,
       queryContext: QueryContext = testQueryContext,
       sqlConf: Map[String, String] = Map.empty,
-      comment: Option[String] = None,
       origin: QueryOrigin = QueryOrigin.empty,
       changeArgs: ChangeArgs = testChangeArgs): AutoCdcFlow = {
     AutoCdcFlow(
@@ -83,7 +82,6 @@ class AutoCdcFlowSuite extends QueryTest with 
SharedSparkSession {
       func = func,
       queryContext = queryContext,
       sqlConf = sqlConf,
-      comment = comment,
       origin = origin,
       changeArgs = changeArgs
     )
@@ -91,8 +89,7 @@ class AutoCdcFlowSuite extends QueryTest with 
SharedSparkSession {
 
   test("AutoCdcFlow exposes its constructor fields") {
     val flow = newAutoCdcFlow(
-      sqlConf = Map("spark.sql.shuffle.partitions" -> "8"),
-      comment = Some("my CDC flow")
+      sqlConf = Map("spark.sql.shuffle.partitions" -> "8")
     )
 
     assert(flow.identifier == testIdentifier)
@@ -100,12 +97,11 @@ class AutoCdcFlowSuite extends QueryTest with 
SharedSparkSession {
     assert(flow.func eq noOpFlowFunction)
     assert(flow.queryContext == testQueryContext)
     assert(flow.sqlConf == Map("spark.sql.shuffle.partitions" -> "8"))
-    assert(flow.comment.contains("my CDC flow"))
     assert(flow.origin == QueryOrigin.empty)
     assert(flow.changeArgs == testChangeArgs)
   }
 
-  test("AutoCdcFlow defaults sqlConf to empty and comment to None") {
+  test("AutoCdcFlow defaults sqlConf to empty") {
     // Confirms the case-class default values match the documented contract; 
downstream
     // registration code relies on `sqlConf` being a non-null empty map by 
default so that
     // `defaultSqlConf ++ flowDef.sqlConf` is well-defined in 
[[GraphRegistrationContext]].
@@ -119,7 +115,6 @@ class AutoCdcFlowSuite extends QueryTest with 
SharedSparkSession {
     )
 
     assert(flow.sqlConf.isEmpty)
-    assert(flow.comment.isEmpty)
   }
 
   test("AutoCdcFlow.once is always false") {
@@ -143,7 +138,6 @@ class AutoCdcFlowSuite extends QueryTest with 
SharedSparkSession {
     assert(updated.destinationIdentifier == original.destinationIdentifier)
     assert(updated.func eq original.func)
     assert(updated.queryContext == original.queryContext)
-    assert(updated.comment == original.comment)
     assert(updated.origin == original.origin)
     assert(updated.changeArgs == original.changeArgs)
     // The original must not be mutated.
@@ -165,7 +159,7 @@ class AutoCdcFlowSuite extends QueryTest with 
SharedSparkSession {
       sqlConf = Map.empty
     )
 
-  /** Builds a [[AutoCdcMergeFlow]] over the given source dataframe + change 
args. */
+  /** Builds an [[AutoCdcMergeFlow]] over the given source dataframe + change 
args. */
   private def newAutoCdcMergeFlow(
       sourceDf: DataFrame,
       keys: Seq[UnqualifiedColumnName] = Seq(UnqualifiedColumnName("id")),
@@ -445,9 +439,9 @@ class AutoCdcFlowSuite extends QueryTest with 
SharedSparkSession {
     "AutoCdcMergeFlow rejects a source df column whose name equals the 
reserved CDC " +
     "metadata column"
   ) {
-    // Locks in the previous engine-level guard 
(Scd1BatchProcessor.extendMicrobatchRowsWith
-    // CdcMetadata) at flow-construction time. Any future regression where a 
user-supplied
-    // CDC stream carries the reserved metadata column name should fail 
eagerly here.
+    // Locks in the previous engine-level guard at flow-construction time. Any 
future
+    // regression where a user-supplied CDC stream carries the reserved 
metadata column name
+    // should fail eagerly here.
     val sourceDf = 
sourceDfWithExtraColumns(Scd1BatchProcessor.cdcMetadataColName -> StringType)
 
     checkError(
diff --git a/sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/graph/ConnectInvalidPipelineSuite.scala b/sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/graph/ConnectInvalidPipelineSuite.scala
index 8dad5019c0fe..6eda2afdcdb8 100644
--- a/sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/graph/ConnectInvalidPipelineSuite.scala
+++ b/sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/graph/ConnectInvalidPipelineSuite.scala
@@ -658,7 +658,7 @@ class ConnectInvalidPipelineSuite extends PipelineTest with 
SharedSparkSession {
   test(
     "AutoCDC flow targeting a temporary view fails with 
AUTOCDC_RELATION_FOR_TEMPORARY_VIEW"
   ) {
-    // Temporary views in SDP normally accept either streaming or batch 
producing flows, but
+    // Temporary views in SDP normally accept either streaming or 
batch-producing flows, but
     // AutoCDC flows are an explicit exception: SCD reconciliation only runs 
at the
     // streaming-table sink (`Scd1ForeachBatchHandler`), so pointing an 
AutoCDC flow at a view
     // would silently drop reconciliation and expose just the projected CDF to 
consumers.


