This is an automated email from the ASF dual-hosted git repository.

cloud-fan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 232c309dec9e [SPARK-57113][SDP][FOLLOWUP] Cleanup AutoCDC Flow code
232c309dec9e is described below

commit 232c309dec9e72828d433ca3d410500b33cd230f
Author: Szehon Ho <[email protected]>
AuthorDate: Thu Jun 4 10:21:51 2026 +0800

    [SPARK-57113][SDP][FOLLOWUP] Cleanup AutoCDC Flow code
    
    ### What changes were proposed in this pull request?
    
    Follow-up to #56160 (SPARK-57113) addressing post-merge review comments and 
reducing duplication in the AutoCDC flow code and its test suites. No behavior 
change.
    
    `sql/pipelines/.../graph/FlowExecution.scala`:
    - Hoist the `org.json4s` imports out of `serializeKeyColumnNames` / 
`parseKeyColumnNames` to the top of the file, per Spark's import conventions.
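    - Factor the duplicated AutoCDC key-field resolution shared by 
`auxiliaryKeyColumnNames` and `validateNoAutoCdcKeyDrift` into a single 
`expectedAuxiliaryKeyFields` helper. The helper is a `private lazy val`, and 
`auxiliaryKeyColumnNames` becomes a `lazy val` derived from it (previously a 
`def`); both call sites now share the helper's single internal-error message 
for a missing key.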
    - Import `scala.collection.mutable` instead of using a fully-qualified 
inline reference.
    
    Tests:
    - Add a shared `singleAutoCdcFlowPipeline` helper to 
`AutoCdcGraphExecutionTestMixin` and use it across the AutoCDC SCD1 E2E suites 
(`AutoCdcScd1KeyDriftSuite`, `AutoCdcScd1MultiPipelineSuite`, 
`AutoCdcScd1AuxiliaryTableDurabilitySuite`, `AutoCdcScd1SchemaEvolutionSuite`), 
removing the repeated single-table/single-flow `TestGraphRegistrationContext` 
registration boilerplate.
    
    ### Why are the changes needed?
    
    Addresses non-blocking review feedback left on #56160 and reduces 
duplication in the AutoCDC flow code and its tests, improving readability and 
maintainability. The net diff removes ~200 lines (316 insertions, 529 
deletions).
    
    ### Does this PR introduce _any_ user-facing change?
    
    No.
    
    ### How was this patch tested?
    
    Pure refactor with no behavior change, covered by the existing AutoCDC 
suites:
    - `AutoCdcAuxiliaryTableSuite`
    - `AutoCdcScd1KeyDriftSuite`
    - `AutoCdcScd1MultiPipelineSuite`
    - `AutoCdcScd1AuxiliaryTableDurabilitySuite`
    - `AutoCdcScd1SchemaEvolutionSuite`
    
    ### Was this patch authored or co-authored using generative AI tooling?
    
    Generated-by: Cursor (Claude Opus 4.8)
    
    Closes #56255 from szehon-ho/spark-57113-follow-up.
    
    Authored-by: Szehon Ho <[email protected]>
    Signed-off-by: Wenchen Fan <[email protected]>
---
 .../spark/sql/pipelines/graph/FlowExecution.scala  |  50 ++-
 .../graph/AutoCdcGraphExecutionTestMixin.scala     |  30 ++
 .../AutoCdcScd1AuxiliaryTableDurabilitySuite.scala |  92 ++----
 .../pipelines/graph/AutoCdcScd1KeyDriftSuite.scala | 163 ++--------
 .../graph/AutoCdcScd1MultiPipelineSuite.scala      | 152 ++++-----
 .../graph/AutoCdcScd1SchemaEvolutionSuite.scala    | 358 +++++++++------------
 6 files changed, 316 insertions(+), 529 deletions(-)

diff --git 
a/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/graph/FlowExecution.scala
 
b/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/graph/FlowExecution.scala
index 0d1c33be2172..c4cd358344ca 100644
--- 
a/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/graph/FlowExecution.scala
+++ 
b/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/graph/FlowExecution.scala
@@ -20,10 +20,14 @@ package org.apache.spark.sql.pipelines.graph
 import java.util.concurrent.ThreadPoolExecutor
 import java.util.concurrent.atomic.AtomicBoolean
 
+import scala.collection.mutable
 import scala.concurrent.{ExecutionContext, Future}
 import scala.jdk.CollectionConverters._
 import scala.util.control.NonFatal
 
+import org.json4s.JsonAST.{JArray, JString}
+import org.json4s.jackson.JsonMethods.{compact, parse}
+
 import org.apache.spark.SparkException
 import org.apache.spark.internal.{Logging, LogKeys}
 import org.apache.spark.sql.{AnalysisException, Dataset, Row}
@@ -346,8 +350,6 @@ object AutoCdcAuxiliaryTable {
    * upstream.
    */
   def serializeKeyColumnNames(names: Seq[String]): String = {
-    import org.json4s.JsonAST.{JArray, JString}
-    import org.json4s.jackson.JsonMethods.compact
     compact(JArray(names.map(JString(_)).toList))
   }
 
@@ -357,8 +359,6 @@ object AutoCdcAuxiliaryTable {
    * upstream.
    */
   def parseKeyColumnNames(raw: String): Option[Seq[String]] = {
-    import org.json4s.JsonAST.{JArray, JString}
-    import org.json4s.jackson.JsonMethods.parse
     val parsed = try Some(parse(raw)) catch { case NonFatal(_) => None }
     parsed.flatMap {
       case JArray(elems) =>
@@ -406,7 +406,7 @@ trait AutoCdcMergeWriteBase {
     val (catalog, v2Identifier) = 
PipelinesCatalogUtils.resolveTableCatalog(spark, auxIdent)
 
     if (!catalog.tableExists(v2Identifier)) {
-      val properties = scala.collection.mutable.Map.empty[String, String]
+      val properties = mutable.Map.empty[String, String]
 
       // Inherit the target's format so MERGE semantics line up. When 
unspecified, omit the
       // provider so the catalog falls back to its default.
@@ -440,18 +440,21 @@ trait AutoCdcMergeWriteBase {
   }
 
   /**
-   * Returns the resolved AutoCDC key column names as they appear in the 
auxiliary schema, in
-   * `changeArgs.keys` declaration order.
+   * Resolves each AutoCDC key in `changeArgs.keys` to its [[StructField]] in
+   * [[auxiliaryTableSchema]], preserving `changeArgs.keys` declaration order. 
This is the
+   * expected (flow-declared) side of drift validation, distinct from the keys 
recorded on an
+   * existing auxiliary table.
+   *
+   * [[AutoCdcMergeFlow]] should have validated that all `changeArgs.keys` 
exist in the deduced
+   * aux/target schemas by now, so a missing key is an internal error rather 
than a user-facing
+   * condition.
    */
-  private def auxiliaryKeyColumnNames: Seq[String] = {
+  private lazy val expectedAuxiliaryKeyFields: Seq[StructField] = {
     val resolver = spark.sessionState.conf.resolver
     changeArgs.keys.map { key =>
       auxiliaryTableSchema.fields
         .find(field => resolver(field.name, key.name))
-        .map(_.name)
         .getOrElse(
-          // This should never happen at this point, as [[AutoCdcMergeFlow]] 
should have validated
-          // all changeArgs.keys exist in the deduced aux/target table schemas 
by now.
           throw SparkException.internalError(
             s"AutoCDC key column '${key.name}' is missing from the auxiliary 
table schema " +
             s"for flow ${identifier.unquotedString} writing to target " +
@@ -461,6 +464,12 @@ trait AutoCdcMergeWriteBase {
     }
   }
 
+  /**
+   * Returns the resolved AutoCDC key column names as they appear in the 
auxiliary schema, in
+   * `changeArgs.keys` declaration order.
+   */
+  private lazy val auxiliaryKeyColumnNames: Seq[String] = 
expectedAuxiliaryKeyFields.map(_.name)
+
   /**
    * Validate that the target table's underlying connector implements
    * [[SupportsRowLevelOperations]], which is the V2 connector contract for 
MERGE/UPDATE/DELETE
@@ -512,21 +521,10 @@ trait AutoCdcMergeWriteBase {
     val resolver = spark.sessionState.conf.resolver
     val existingAuxSchema = 
CatalogV2Util.v2ColumnsToStructType(existingAuxTable.columns())
 
-    // The expected key fields are looked up in [[auxiliaryTableSchema]], 
which by construction
-    // contains every key column with its source-derived dataType. We 
deliberately do not look
-    // them up in [[existingAuxSchema]] - that's the recorded side, and 
conflating the two
-    // sides would mask drift.
-    val expectedKeyFields: Seq[StructField] = changeArgs.keys.map { key =>
-      auxiliaryTableSchema.fields
-        .find(field => resolver(field.name, key.name))
-        .getOrElse(
-          // Construction of [[auxiliaryTableSchema]] already enforces all of 
the user-specified
-          // keys are present, so if we don't find a key it is truly an 
internal error.
-          throw SparkException.internalError(
-            s"Key column '${key.name}' was not found in the AutoCDC auxiliary 
table schema."
-          )
-        )
-    }
+    // Resolve the flow-declared (expected) keys from 
[[auxiliaryTableSchema]]. We deliberately
+    // do not look them up in [[existingAuxSchema]] - that's the recorded 
side, and conflating
+    // the two sides would mask drift. See [[expectedAuxiliaryKeyFields]].
+    val expectedKeyFields: Seq[StructField] = expectedAuxiliaryKeyFields
     val recordedKeyNames = parseRecordedKeyColumnNames(existingAuxTable, 
auxIdent)
     val recordedKeyFields: Seq[StructField] = recordedKeyNames.map { name =>
       existingAuxSchema.fields
diff --git 
a/sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/graph/AutoCdcGraphExecutionTestMixin.scala
 
b/sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/graph/AutoCdcGraphExecutionTestMixin.scala
index 5ebdb4b4c86d..302ff789c12d 100644
--- 
a/sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/graph/AutoCdcGraphExecutionTestMixin.scala
+++ 
b/sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/graph/AutoCdcGraphExecutionTestMixin.scala
@@ -21,6 +21,7 @@ import org.scalatest.{BeforeAndAfterEach, Suite}
 
 import org.apache.spark.SparkThrowable
 import org.apache.spark.sql.{Column, Row}
+import org.apache.spark.sql.classic.DataFrame
 import 
org.apache.spark.sql.connector.catalog.SharedTablesInMemoryRowLevelOperationTableCatalog
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.pipelines.autocdc.{
@@ -207,6 +208,35 @@ trait AutoCdcGraphExecutionTestMixin extends 
BeforeAndAfterEach {
     )
   )
 
+  /**
+   * Build a single-flow AutoCDC pipeline: a [[TestGraphRegistrationContext]] 
that registers
+   * `target` under [[catalog]].[[namespace]] and one [[autoCdcFlow]] writing 
into it from
+   * `sourceDf`. Covers the common single-table/single-flow shape used across 
the AutoCDC E2E
+   * suites; tests that need multiple flows or non-AutoCDC datasets build the 
context inline.
+   */
+  protected def singleAutoCdcFlowPipeline(
+      flowName: String,
+      target: String,
+      sourceDf: DataFrame,
+      keys: Seq[String],
+      sequencing: Column,
+      columnSelection: Option[ColumnSelection] = None,
+      deleteCondition: Option[Column] = None,
+      scdType: ScdType = ScdType.Type1): TestGraphRegistrationContext =
+    new TestGraphRegistrationContext(spark) {
+      registerTable(target, catalog = Some(catalog), database = 
Some(namespace))
+      registerFlow(autoCdcFlow(
+        name = flowName,
+        target = target,
+        query = dfFlowFunc(sourceDf),
+        keys = keys,
+        sequencing = sequencing,
+        columnSelection = columnSelection,
+        deleteCondition = deleteCondition,
+        scdType = scdType
+      ))
+    }
+
   /** Build a target row's `_cdc_metadata` struct value. */
   protected def cdcMeta(deleteSeq: Option[Long], upsertSeq: Option[Long]): Row 
=
     Row(deleteSeq.orNull, upsertSeq.orNull)
diff --git 
a/sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/graph/AutoCdcScd1AuxiliaryTableDurabilitySuite.scala
 
b/sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/graph/AutoCdcScd1AuxiliaryTableDurabilitySuite.scala
index 5a9f6cb6710b..3453235cbae8 100644
--- 
a/sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/graph/AutoCdcScd1AuxiliaryTableDurabilitySuite.scala
+++ 
b/sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/graph/AutoCdcScd1AuxiliaryTableDurabilitySuite.scala
@@ -52,18 +52,12 @@ class AutoCdcScd1AuxiliaryTableDurabilitySuite
     // resume cleanly.
     val changeDataFeedStream = MemoryStream[(Int, String, Long)]
     def buildGraphRegistrationContext(): TestGraphRegistrationContext =
-      new TestGraphRegistrationContext(spark) {
-        registerTable("target", catalog = Some(catalog), database = 
Some(namespace))
-        registerFlow(autoCdcFlow(
-          name = "auto_cdc_flow",
-          target = "target",
-          query = dfFlowFunc(
-            changeDataFeedStream.toDF().toDF("id", "name", "version")
-          ),
-          keys = Seq("id"),
-          sequencing = functions.col("version")
-        ))
-      }
+      singleAutoCdcFlowPipeline(
+        flowName = "auto_cdc_flow",
+        target = "target",
+        sourceDf = changeDataFeedStream.toDF().toDF("id", "name", "version"),
+        keys = Seq("id"),
+        sequencing = functions.col("version"))
 
     // Run #1: insert id=1 at seq=1.
     changeDataFeedStream.addData((1, "alice", 1L))
@@ -98,20 +92,18 @@ class AutoCdcScd1AuxiliaryTableDurabilitySuite
 
     // Single MemoryStream reused across both runs so the streaming checkpoint 
can resume.
     val stream = MemoryStream[(Int, String, Long, Boolean)]
-    def buildCtx(): TestGraphRegistrationContext = new 
TestGraphRegistrationContext(spark) {
-      registerTable("target", catalog = Some(catalog), database = 
Some(namespace))
-      registerFlow(autoCdcFlow(
-        name = "auto_cdc_flow",
+    def buildCtx(): TestGraphRegistrationContext =
+      singleAutoCdcFlowPipeline(
+        flowName = "auto_cdc_flow",
         target = "target",
-        query = dfFlowFunc(stream.toDF().toDF("id", "name", "version", 
"is_delete")),
+        sourceDf = stream.toDF().toDF("id", "name", "version", "is_delete"),
         keys = Seq("id"),
         sequencing = functions.col("version"),
         deleteCondition = Some(functions.col("is_delete") === true),
         columnSelection = Some(ColumnSelection.ExcludeColumns(
           Seq(UnqualifiedColumnName("is_delete"))
         ))
-      ))
-    }
+      )
 
     // Run #1: delete id=1 at seq=10. Auxiliary table records seq=10 as the 
watermark.
     stream.addData((1, "alice", 10L, true))
@@ -141,17 +133,12 @@ class AutoCdcScd1AuxiliaryTableDurabilitySuite
 
     val stream = MemoryStream[(String, Int, Long)]
     stream.addData(("alice", 1, 1L))
-    val ctx = new TestGraphRegistrationContext(spark) {
-      registerTable("target", catalog = Some(catalog), database = 
Some(namespace))
-      registerFlow(autoCdcFlow(
-        name = "auto_cdc_flow",
-        target = "target",
-        query = dfFlowFunc(stream.toDF().toDF("name", "id", "version")),
-        keys = Seq("id"),
-        sequencing = functions.col("version")
-      ))
-    }
-    runPipeline(ctx)
+    runPipeline(singleAutoCdcFlowPipeline(
+      flowName = "auto_cdc_flow",
+      target = "target",
+      sourceDf = stream.toDF().toDF("name", "id", "version"),
+      keys = Seq("id"),
+      sequencing = functions.col("version")))
 
     val auxSchema = spark.table(auxTableNameFor("target")).schema
 
@@ -181,17 +168,12 @@ class AutoCdcScd1AuxiliaryTableDurabilitySuite
 
     val stream = MemoryStream[(String, Int, String, Long)]
     stream.addData(("v", 1, "us", 1L))
-    val ctx = new TestGraphRegistrationContext(spark) {
-      registerTable("target", catalog = Some(catalog), database = 
Some(namespace))
-      registerFlow(autoCdcFlow(
-        name = "auto_cdc_flow",
-        target = "target",
-        query = dfFlowFunc(stream.toDF().toDF("value", "id", "region", 
"version")),
-        keys = Seq("region", "id"),
-        sequencing = functions.col("version")
-      ))
-    }
-    runPipeline(ctx)
+    runPipeline(singleAutoCdcFlowPipeline(
+      flowName = "auto_cdc_flow",
+      target = "target",
+      sourceDf = stream.toDF().toDF("value", "id", "region", "version"),
+      keys = Seq("region", "id"),
+      sequencing = functions.col("version")))
 
     val auxSchema = spark.table(auxTableNameFor("target")).schema
     assert(auxSchema.fieldNames.toSeq ==
@@ -211,16 +193,13 @@ class AutoCdcScd1AuxiliaryTableDurabilitySuite
 
     // Single MemoryStream reused across both runs so the streaming checkpoint 
can resume.
     val stream = MemoryStream[(Int, Long)]
-    def buildCtx(): TestGraphRegistrationContext = new 
TestGraphRegistrationContext(spark) {
-      registerTable("target", catalog = Some(catalog), database = 
Some(namespace))
-      registerFlow(autoCdcFlow(
-        name = "auto_cdc_flow",
+    def buildCtx(): TestGraphRegistrationContext =
+      singleAutoCdcFlowPipeline(
+        flowName = "auto_cdc_flow",
         target = "target",
-        query = dfFlowFunc(stream.toDF().toDF("id", "version")),
+        sourceDf = stream.toDF().toDF("id", "version"),
         keys = Seq("id"),
-        sequencing = functions.col("version")
-      ))
-    }
+        sequencing = functions.col("version"))
 
     stream.addData((1, 1L))
     runPipeline(buildCtx())
@@ -276,18 +255,13 @@ class AutoCdcScd1AuxiliaryTableDurabilitySuite
 
     // Single MemoryStream reused across both runs so the streaming checkpoint 
can resume.
     val stream = MemoryStream[(String, String, String, String, Long)]
-    def buildCtx(): TestGraphRegistrationContext = new 
TestGraphRegistrationContext(spark) {
-      registerTable("target", catalog = Some(catalog), database = 
Some(namespace))
-      registerFlow(autoCdcFlow(
-        name = "auto_cdc_flow",
+    def buildCtx(): TestGraphRegistrationContext =
+      singleAutoCdcFlowPipeline(
+        flowName = "auto_cdc_flow",
         target = "target",
-        query = dfFlowFunc(
-          stream.toDF().toDF((keyNames :+ "version"): _*)
-        ),
+        sourceDf = stream.toDF().toDF((keyNames :+ "version"): _*),
         keys = backtickQuotedKeys,
-        sequencing = functions.col("version")
-      ))
-    }
+        sequencing = functions.col("version"))
 
     // Run #1: a single insert with arbitrary non-empty key values.
     stream.addData(("v1", "v2", "v3", "v4", 1L))
diff --git 
a/sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/graph/AutoCdcScd1KeyDriftSuite.scala
 
b/sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/graph/AutoCdcScd1KeyDriftSuite.scala
index 066d8afd5342..fc7706c84e3e 100644
--- 
a/sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/graph/AutoCdcScd1KeyDriftSuite.scala
+++ 
b/sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/graph/AutoCdcScd1KeyDriftSuite.scala
@@ -17,11 +17,12 @@
 
 package org.apache.spark.sql.pipelines.graph
 
+import org.apache.spark.sql.classic.DataFrame
 import org.apache.spark.sql.execution.streaming.runtime.MemoryStream
-import org.apache.spark.sql.functions
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.pipelines.utils.{ExecutionTest, 
TestGraphRegistrationContext}
 import org.apache.spark.sql.test.SharedSparkSession
+import org.apache.spark.sql.types.MetadataBuilder
 
 /**
  * End-to-end tests covering AutoCDC SCD1 key-drift validation: the AutoCDC 
flow's declared
@@ -39,11 +40,10 @@ class AutoCdcScd1KeyDriftSuite
     with SharedSparkSession
     with AutoCdcGraphExecutionTestMixin {
 
+  import testImplicits._
+
   test("a pipeline execution that adds a key column to an existing AutoCDC 
flow triggers " +
     "KEY_SCHEMA_DRIFT") {
-    val session = spark
-    import session.implicits._
-
     // Target table carries both candidate key columns up-front so only the 
AutoCDC `keys`
     // declaration differs between the two pipelines.
     spark.sql(
@@ -54,31 +54,13 @@ class AutoCdcScd1KeyDriftSuite
     // Pipeline #1 declares one key (`id`). Aux table is created with schema 
(id, _cdc_metadata).
     val stream1 = MemoryStream[(Int, String, Long)]
     stream1.addData((1, "us", 1L))
-    val ctx1 = new TestGraphRegistrationContext(spark) {
-      registerTable("target", catalog = Some(catalog), database = 
Some(namespace))
-      registerFlow(autoCdcFlow(
-        name = "flow_v1",
-        target = "target",
-        query = dfFlowFunc(stream1.toDF().toDF("id", "region", "version")),
-        keys = Seq("id"),
-        sequencing = functions.col("version")
-      ))
-    }
-    runPipeline(ctx1)
+    runPipeline(buildPipeline("flow_v1", stream1.toDF().toDF("id", "region", 
"version"), Seq("id")))
 
     // Pipeline #2 declares two keys (`region` + `id`) - arity drift.
     val stream2 = MemoryStream[(Int, String, Long)]
     stream2.addData((1, "us", 2L))
-    val ctx2 = new TestGraphRegistrationContext(spark) {
-      registerTable("target", catalog = Some(catalog), database = 
Some(namespace))
-      registerFlow(autoCdcFlow(
-        name = "flow_v2",
-        target = "target",
-        query = dfFlowFunc(stream2.toDF().toDF("id", "region", "version")),
-        keys = Seq("region", "id"),
-        sequencing = functions.col("version")
-      ))
-    }
+    val ctx2 = buildPipeline(
+      "flow_v2", stream2.toDF().toDF("id", "region", "version"), Seq("region", 
"id"))
 
     val ex = intercept[RuntimeException] { runPipeline(ctx2) }
     checkErrorInPipelineFailure(
@@ -100,9 +82,6 @@ class AutoCdcScd1KeyDriftSuite
 
   test("a pipeline execution that drops a key column from an existing AutoCDC 
flow triggers " +
     "KEY_SCHEMA_DRIFT") {
-    val session = spark
-    import session.implicits._
-
     spark.sql(
       s"CREATE TABLE $catalog.$namespace.target " +
       s"(region STRING NOT NULL, id INT NOT NULL, version BIGINT NOT NULL, 
$cdcMetadataDdl)"
@@ -112,31 +91,13 @@ class AutoCdcScd1KeyDriftSuite
     // would slip through with `id` silently matching at position 0 of the 
recorded schema.
     val stream1 = MemoryStream[(String, Int, Long)]
     stream1.addData(("us", 1, 1L))
-    val ctx1 = new TestGraphRegistrationContext(spark) {
-      registerTable("target", catalog = Some(catalog), database = 
Some(namespace))
-      registerFlow(autoCdcFlow(
-        name = "flow_v1",
-        target = "target",
-        query = dfFlowFunc(stream1.toDF().toDF("region", "id", "version")),
-        keys = Seq("region", "id"),
-        sequencing = functions.col("version")
-      ))
-    }
-    runPipeline(ctx1)
+    runPipeline(buildPipeline(
+      "flow_v1", stream1.toDF().toDF("region", "id", "version"), Seq("region", 
"id")))
 
     // Pipeline #2 declares only [id] - arity drift.
     val stream2 = MemoryStream[(String, Int, Long)]
     stream2.addData(("us", 1, 2L))
-    val ctx2 = new TestGraphRegistrationContext(spark) {
-      registerTable("target", catalog = Some(catalog), database = 
Some(namespace))
-      registerFlow(autoCdcFlow(
-        name = "flow_v2",
-        target = "target",
-        query = dfFlowFunc(stream2.toDF().toDF("region", "id", "version")),
-        keys = Seq("id"),
-        sequencing = functions.col("version")
-      ))
-    }
+    val ctx2 = buildPipeline("flow_v2", stream2.toDF().toDF("region", "id", 
"version"), Seq("id"))
 
     val ex = intercept[RuntimeException] { runPipeline(ctx2) }
     checkErrorInPipelineFailure(
@@ -157,9 +118,6 @@ class AutoCdcScd1KeyDriftSuite
 
   test("a pipeline execution that swaps a key in an existing AutoCDC flow for 
a different name " +
     "(same arity) triggers KEY_SCHEMA_DRIFT") {
-    val session = spark
-    import session.implicits._
-
     spark.sql(
       s"CREATE TABLE $catalog.$namespace.target " +
       s"(id INT NOT NULL, region STRING NOT NULL, country STRING NOT NULL, " +
@@ -169,33 +127,16 @@ class AutoCdcScd1KeyDriftSuite
     // Pipeline #1 declares [id, region].
     val stream1 = MemoryStream[(Int, String, String, Long)]
     stream1.addData((1, "us", "USA", 1L))
-    val ctx1 = new TestGraphRegistrationContext(spark) {
-      registerTable("target", catalog = Some(catalog), database = 
Some(namespace))
-      registerFlow(autoCdcFlow(
-        name = "flow_v1",
-        target = "target",
-        query = dfFlowFunc(stream1.toDF().toDF("id", "region", "country", 
"version")),
-        keys = Seq("id", "region"),
-        sequencing = functions.col("version")
-      ))
-    }
-    runPipeline(ctx1)
+    runPipeline(buildPipeline(
+      "flow_v1", stream1.toDF().toDF("id", "region", "country", "version"), 
Seq("id", "region")))
 
     // Pipeline #2 declares [id, country] - same arity, different key set. An 
arity-only check
     // would silently match `id` at position 0 and the swapped 
`region`/`country` would slip
     // through; the by-name set comparison must catch it.
     val stream2 = MemoryStream[(Int, String, String, Long)]
     stream2.addData((1, "us", "USA", 2L))
-    val ctx2 = new TestGraphRegistrationContext(spark) {
-      registerTable("target", catalog = Some(catalog), database = 
Some(namespace))
-      registerFlow(autoCdcFlow(
-        name = "flow_v2",
-        target = "target",
-        query = dfFlowFunc(stream2.toDF().toDF("id", "region", "country", 
"version")),
-        keys = Seq("id", "country"),
-        sequencing = functions.col("version")
-      ))
-    }
+    val ctx2 = buildPipeline(
+      "flow_v2", stream2.toDF().toDF("id", "region", "country", "version"), 
Seq("id", "country"))
 
     val ex = intercept[RuntimeException] { runPipeline(ctx2) }
     checkErrorInPipelineFailure(
@@ -225,8 +166,6 @@ class AutoCdcScd1KeyDriftSuite
       s"""TBLPROPERTIES ('${AutoCdcAuxiliaryTable.keyColumnNamesProperty}' = 
'["id"]')"""
     )
 
-    val session = spark
-    import session.implicits._
     val stream = MemoryStream[(Int, Long)]
     stream.addData((1, 1L))
     val ctx = buildPipeline("flow", stream.toDF().toDF("id", "version"), 
Seq("id"))
@@ -247,9 +186,6 @@ class AutoCdcScd1KeyDriftSuite
   }
 
   test("a composite key reorder ([a,b] -> [b,a]) does NOT trigger drift 
validation") {
-    val session = spark
-    import session.implicits._
-
     spark.sql(
       s"CREATE TABLE $catalog.$namespace.target " +
       s"(a INT NOT NULL, b STRING NOT NULL, version BIGINT NOT NULL, 
$cdcMetadataDdl)"
@@ -261,39 +197,16 @@ class AutoCdcScd1KeyDriftSuite
     // columns and their dataTypes.
     val stream1 = MemoryStream[(Int, String, Long)]
     stream1.addData((1, "x", 1L))
-    val ctx1 = new TestGraphRegistrationContext(spark) {
-      registerTable("target", catalog = Some(catalog), database = 
Some(namespace))
-      registerFlow(autoCdcFlow(
-        name = "flow_v1",
-        target = "target",
-        query = dfFlowFunc(stream1.toDF().toDF("a", "b", "version")),
-        keys = Seq("a", "b"),
-        sequencing = functions.col("version")
-      ))
-    }
-    runPipeline(ctx1)
+    runPipeline(buildPipeline("flow_v1", stream1.toDF().toDF("a", "b", 
"version"), Seq("a", "b")))
 
     // Pipeline #2 declares the same key set in the reversed order [b, a]. 
Must NOT throw.
     val stream2 = MemoryStream[(Int, String, Long)]
     stream2.addData((2, "y", 1L))
-    val ctx2 = new TestGraphRegistrationContext(spark) {
-      registerTable("target", catalog = Some(catalog), database = 
Some(namespace))
-      registerFlow(autoCdcFlow(
-        name = "flow_v2",
-        target = "target",
-        query = dfFlowFunc(stream2.toDF().toDF("a", "b", "version")),
-        keys = Seq("b", "a"),
-        sequencing = functions.col("version")
-      ))
-    }
-    runPipeline(ctx2)
+    runPipeline(buildPipeline("flow_v2", stream2.toDF().toDF("a", "b", 
"version"), Seq("b", "a")))
   }
 
   test("a pipeline execution that changes a key column's nullability or 
metadata in an " +
     "existing AutoCDC flow does NOT trigger drift") {
-    val session = spark
-    import session.implicits._
-
     // Drift validation compares (name, dataType) pairs as a set. Nullability 
and column
     // metadata are part of [[StructField]] but not part of [[DataType]], so 
they do not gate
     // semantic equivalence: only the wire-format data type matters for merge 
correctness.
@@ -314,7 +227,7 @@ class AutoCdcScd1KeyDriftSuite
     val stream2 = MemoryStream[(Option[Int], Long)]
     stream2.addData((Some(2), 2L))
     val baseDf = stream2.toDF().toDF("id", "version")
-    val md = new org.apache.spark.sql.types.MetadataBuilder()
+    val md = new MetadataBuilder()
       .putString("description", "primary key")
       .build()
     val sourceDfWithMetadata = baseDf.select(baseDf("id").as("id", md), 
baseDf("version"))
@@ -323,9 +236,6 @@ class AutoCdcScd1KeyDriftSuite
 
   test("a pipeline execution that wraps an existing AutoCDC flow's key in 
backticks does NOT " +
     "trigger drift") {
-    val session = spark
-    import session.implicits._
-
     // Backticks are a SQL-parse syntactic device, not part of the identifier 
itself. A user
     // adding or removing backticks around the same logical column must NOT be 
detected as drift.
     spark.sql(
@@ -344,9 +254,6 @@ class AutoCdcScd1KeyDriftSuite
 
   test("a pipeline execution that drops backticks around an existing AutoCDC 
flow's " +
     "previously-backtick-quoted key does NOT trigger drift") {
-    val session = spark
-    import session.implicits._
-
     // The reverse direction of the previous test: drift validation must be 
backtick-invariant
     // on both the WRITE side (recorded property strips backticks when 
serializing the key
     // names in pipeline #1) and the READ side (resolver-aware lookup strips 
backticks when
@@ -367,9 +274,6 @@ class AutoCdcScd1KeyDriftSuite
 
   test("under spark.sql.caseSensitive = true, an AutoCDC flow whose key 
differs only in case " +
     "from the recorded key triggers KEY_SCHEMA_DRIFT") {
-    val session = spark
-    import session.implicits._
-
     // validateNoAutoCdcKeyDrift uses spark.sessionState.conf.resolver, so its 
behavior on
     // `Id` vs `id` flips with the session conf. Pin the case-sensitive 
direction: pipeline #1
     // seeds the aux table under the default resolver with recorded key 
`["id"]`, then
@@ -410,9 +314,6 @@ class AutoCdcScd1KeyDriftSuite
 
   test("under the default (case-insensitive) resolver, an AutoCDC flow whose 
key differs only " +
     "in case from the recorded key does NOT trigger drift") {
-    val session = spark
-    import session.implicits._
-
     // Pairs with the case-sensitive test above: same recorded key, but under 
the default
     // resolver the two identifiers are equivalent so drift validation must 
accept pipeline
     // #2. This pins the negative direction so a regression that accidentally 
hard-codes a
@@ -451,8 +352,6 @@ class AutoCdcScd1KeyDriftSuite
       s"CREATE TABLE ${auxTableNameFor("target")} (id INT NOT NULL, 
$cdcMetadataDdl)"
     )
 
-    val session = spark
-    import session.implicits._
     val stream = MemoryStream[(Int, Long)]
     stream.addData((1, 1L))
     val ctx = buildPipeline("flow", stream.toDF().toDF("id", "version"), 
Seq("id"))
@@ -486,8 +385,6 @@ class AutoCdcScd1KeyDriftSuite
       s"TBLPROPERTIES ('${AutoCdcAuxiliaryTable.keyColumnNamesProperty}' = 
'$malformedKeysArray')"
     )
 
-    val session = spark
-    import session.implicits._
     val stream = MemoryStream[(Int, Long)]
     stream.addData((1, 1L))
     val ctx = buildPipeline("flow", stream.toDF().toDF("id", "version"), 
Seq("id"))
@@ -523,8 +420,6 @@ class AutoCdcScd1KeyDriftSuite
       s"""TBLPROPERTIES ('${AutoCdcAuxiliaryTable.keyColumnNamesProperty}' = 
'["region"]')"""
     )
 
-    val session = spark
-    import session.implicits._
     val stream = MemoryStream[(Int, Long)]
     stream.addData((1, 1L))
     val ctx = buildPipeline("flow", stream.toDF().toDF("id", "version"), 
Seq("id"))
@@ -546,21 +441,17 @@ class AutoCdcScd1KeyDriftSuite
 
   /**
    * Build a single-flow pipeline targeting `cat.ns1.target` with the given source DF and key
-   * column list.
+   * column list. Thin wrapper over [[singleAutoCdcFlowPipeline]] since every drift test targets
+   * the same `target` table.
    */
   private def buildPipeline(
       flowName: String,
-      sourceDf: org.apache.spark.sql.classic.DataFrame,
-      keys: Seq[String]): TestGraphRegistrationContext = {
-    new TestGraphRegistrationContext(spark) {
-      registerTable("target", catalog = Some(catalog), database = 
Some(namespace))
-      registerFlow(autoCdcFlow(
-        name = flowName,
-        target = "target",
-        query = dfFlowFunc(sourceDf),
-        keys = keys,
-        sequencing = functions.col("version")
-      ))
-    }
-  }
+      sourceDf: DataFrame,
+      keys: Seq[String]): TestGraphRegistrationContext =
+    singleAutoCdcFlowPipeline(
+      flowName = flowName,
+      target = "target",
+      sourceDf = sourceDf,
+      keys = keys,
+      sequencing = $"version")
 }
diff --git a/sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/graph/AutoCdcScd1MultiPipelineSuite.scala b/sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/graph/AutoCdcScd1MultiPipelineSuite.scala
index 0d3f6e954df3..2100928bc68a 100644
--- a/sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/graph/AutoCdcScd1MultiPipelineSuite.scala
+++ b/sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/graph/AutoCdcScd1MultiPipelineSuite.scala
@@ -19,7 +19,6 @@ package org.apache.spark.sql.pipelines.graph
 
 import org.apache.spark.sql.Row
 import org.apache.spark.sql.execution.streaming.runtime.MemoryStream
-import org.apache.spark.sql.functions
 import org.apache.spark.sql.pipelines.utils.{ExecutionTest, 
TestGraphRegistrationContext}
 import org.apache.spark.sql.test.SharedSparkSession
 
@@ -58,34 +57,24 @@ class AutoCdcScd1MultiPipelineSuite
     // cat.ns1.__spark_autocdc_aux_state_t_a must not affect pipeline #2's `t_b`.
     val streamA = MemoryStream[(Int, String, Long)]
     streamA.addData((1, "alice", 100L))
-    val ctxA = new TestGraphRegistrationContext(spark) {
-      registerTable("t_a", catalog = Some(catalog), database = Some(namespace))
-      registerFlow(autoCdcFlow(
-        name = "flow_a",
-        target = "t_a",
-        query = dfFlowFunc(streamA.toDF().toDF("id", "name", "version")),
-        keys = Seq("id"),
-        sequencing = functions.col("version")
-      ))
-    }
-    runPipeline(ctxA)
+    runPipeline(singleAutoCdcFlowPipeline(
+      flowName = "flow_a",
+      target = "t_a",
+      sourceDf = streamA.toDF().toDF("id", "name", "version"),
+      keys = Seq("id"),
+      sequencing = $"version"))
 
     // Pipeline #2 only knows about `t_b`. Uses a deliberately *lower* sequence to verify
     // the watermark from pipeline #1's auxiliary table (seq=100) does not leak into
     // pipeline #2.
     val streamB = MemoryStream[(Int, String, Long)]
     streamB.addData((9, "bob", 1L))
-    val ctxB = new TestGraphRegistrationContext(spark) {
-      registerTable("t_b", catalog = Some(catalog), database = Some(namespace))
-      registerFlow(autoCdcFlow(
-        name = "flow_b",
-        target = "t_b",
-        query = dfFlowFunc(streamB.toDF().toDF("id", "name", "version")),
-        keys = Seq("id"),
-        sequencing = functions.col("version")
-      ))
-    }
-    runPipeline(ctxB)
+    runPipeline(singleAutoCdcFlowPipeline(
+      flowName = "flow_b",
+      target = "t_b",
+      sourceDf = streamB.toDF().toDF("id", "name", "version"),
+      keys = Seq("id"),
+      sequencing = $"version"))
 
     checkAnswer(
       spark.table(s"$catalog.$namespace.t_a"),
@@ -113,17 +102,12 @@ class AutoCdcScd1MultiPipelineSuite
     )
     val stream = MemoryStream[(Int, String, Long)]
     stream.addData((1, "alice", 1L), (2, "bob", 1L))
-    val ctxWriter = new TestGraphRegistrationContext(spark) {
-      registerTable("src", catalog = Some(catalog), database = Some(namespace))
-      registerFlow(autoCdcFlow(
-        name = "writer",
-        target = "src",
-        query = dfFlowFunc(stream.toDF().toDF("id", "name", "version")),
-        keys = Seq("id"),
-        sequencing = functions.col("version")
-      ))
-    }
-    runPipeline(ctxWriter)
+    runPipeline(singleAutoCdcFlowPipeline(
+      flowName = "writer",
+      target = "src",
+      sourceDf = stream.toDF().toDF("id", "name", "version"),
+      keys = Seq("id"),
+      sequencing = $"version"))
 
     // Pipeline #2 is a regular materialized view that selects the user-data columns from
     // `src` (a different graph entirely). It must observe the merged AutoCDC rows and be
@@ -161,17 +145,12 @@ class AutoCdcScd1MultiPipelineSuite
     // Pipeline #1: inserts rows with id=1 and id=2 at version=1.
     val stream1 = MemoryStream[(Int, String, Long)]
     stream1.addData((1, "alice", 1L), (2, "bob", 1L))
-    val ctx1 = new TestGraphRegistrationContext(spark) {
-      registerTable("shared_target", catalog = Some(catalog), database = 
Some(namespace))
-      registerFlow(autoCdcFlow(
-        name = "flow_v1",
-        target = "shared_target",
-        query = dfFlowFunc(stream1.toDF().toDF("id", "name", "version")),
-        keys = Seq("id"),
-        sequencing = functions.col("version")
-      ))
-    }
-    runPipeline(ctx1)
+    runPipeline(singleAutoCdcFlowPipeline(
+      flowName = "flow_v1",
+      target = "shared_target",
+      sourceDf = stream1.toDF().toDF("id", "name", "version"),
+      keys = Seq("id"),
+      sequencing = $"version"))
 
     // Sanity-check pipeline #1's effect before pipeline #2 runs.
     checkAnswer(
@@ -186,17 +165,12 @@ class AutoCdcScd1MultiPipelineSuite
     // (new key). id=1 is untouched and must survive into the final target unchanged.
     val stream2 = MemoryStream[(Int, String, Long)]
     stream2.addData((2, "bob-v2", 2L), (3, "carol", 1L))
-    val ctx2 = new TestGraphRegistrationContext(spark) {
-      registerTable("shared_target", catalog = Some(catalog), database = 
Some(namespace))
-      registerFlow(autoCdcFlow(
-        name = "flow_v2",
-        target = "shared_target",
-        query = dfFlowFunc(stream2.toDF().toDF("id", "name", "version")),
-        keys = Seq("id"),
-        sequencing = functions.col("version")
-      ))
-    }
-    runPipeline(ctx2)
+    runPipeline(singleAutoCdcFlowPipeline(
+      flowName = "flow_v2",
+      target = "shared_target",
+      sourceDf = stream2.toDF().toDF("id", "name", "version"),
+      keys = Seq("id"),
+      sequencing = $"version"))
 
     // Final target: id=1 untouched (pipeline #1's state), id=2 updated by pipeline #2,
     // id=3 freshly inserted by pipeline #2.
@@ -229,16 +203,12 @@ class AutoCdcScd1MultiPipelineSuite
     // Pipeline #1: source DF schema is (id, name, version); inserts id=1 and id=2.
     val stream1 = MemoryStream[(Int, String, Long)]
     stream1.addData((1, "alice", 1L), (2, "bob", 1L))
-    val ctx1 = new TestGraphRegistrationContext(spark) {
-      registerTable("shared_target", catalog = Some(catalog), database = 
Some(namespace))
-      registerFlow(autoCdcFlow(
-        name = "flow_v1",
-        target = "shared_target",
-        query = dfFlowFunc(stream1.toDF().toDF("id", "name", "version")),
-        keys = Seq("id"),
-        sequencing = functions.col("version")
-      ))
-    }
+    val ctx1 = singleAutoCdcFlowPipeline(
+      flowName = "flow_v1",
+      target = "shared_target",
+      sourceDf = stream1.toDF().toDF("id", "name", "version"),
+      keys = Seq("id"),
+      sequencing = $"version")
     runPipeline(ctx1)
 
     // Sanity-check pipeline #1's state before schema evolution kicks in.
@@ -255,17 +225,12 @@ class AutoCdcScd1MultiPipelineSuite
     // is backfilled to NULL.
     val stream2 = MemoryStream[(Int, String, Option[Int], Long)]
     stream2.addData((2, "bob-v2", Some(25), 2L), (3, "carol", Some(30), 1L))
-    val ctx2 = new TestGraphRegistrationContext(spark) {
-      registerTable("shared_target", catalog = Some(catalog), database = 
Some(namespace))
-      registerFlow(autoCdcFlow(
-        name = "flow_v2",
-        target = "shared_target",
-        query = dfFlowFunc(stream2.toDF().toDF("id", "name", "age", 
"version")),
-        keys = Seq("id"),
-        sequencing = functions.col("version")
-      ))
-    }
-    runPipeline(ctx2)
+    runPipeline(singleAutoCdcFlowPipeline(
+      flowName = "flow_v2",
+      target = "shared_target",
+      sourceDf = stream2.toDF().toDF("id", "name", "age", "version"),
+      keys = Seq("id"),
+      sequencing = $"version"))
 
     checkAnswer(
       spark.table(s"$catalog.$namespace.shared_target"),
@@ -309,32 +274,23 @@ class AutoCdcScd1MultiPipelineSuite
     // (id, _cdc_metadata).
     val stream1 = MemoryStream[(Int, String, Long)]
     stream1.addData((1, "alice", 1L))
-    val ctx1 = new TestGraphRegistrationContext(spark) {
-      registerTable("shared_target", catalog = Some(catalog), database = 
Some(namespace))
-      registerFlow(autoCdcFlow(
-        name = "flow_v1",
-        target = "shared_target",
-        query = dfFlowFunc(stream1.toDF().toDF("id", "name", "version")),
-        keys = Seq("id"),
-        sequencing = functions.col("version")
-      ))
-    }
-    runPipeline(ctx1)
+    runPipeline(singleAutoCdcFlowPipeline(
+      flowName = "flow_v1",
+      target = "shared_target",
+      sourceDf = stream1.toDF().toDF("id", "name", "version"),
+      keys = Seq("id"),
+      sequencing = $"version"))
 
     // Pipeline #2: completely separate graph, but targets the same physical `shared_target`
     // table with `keys = Seq("name")`.
     val stream2 = MemoryStream[(Int, String, Long)]
     stream2.addData((2, "alice", 1L))
-    val ctx2 = new TestGraphRegistrationContext(spark) {
-      registerTable("shared_target", catalog = Some(catalog), database = 
Some(namespace))
-      registerFlow(autoCdcFlow(
-        name = "flow_v2",
-        target = "shared_target",
-        query = dfFlowFunc(stream2.toDF().toDF("id", "name", "version")),
-        keys = Seq("name"),
-        sequencing = functions.col("version")
-      ))
-    }
+    val ctx2 = singleAutoCdcFlowPipeline(
+      flowName = "flow_v2",
+      target = "shared_target",
+      sourceDf = stream2.toDF().toDF("id", "name", "version"),
+      keys = Seq("name"),
+      sequencing = $"version")
 
     val ex = intercept[RuntimeException] { runPipeline(ctx2) }
     checkErrorInPipelineFailure(
diff --git a/sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/graph/AutoCdcScd1SchemaEvolutionSuite.scala b/sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/graph/AutoCdcScd1SchemaEvolutionSuite.scala
index 2424dbdc4e05..b6c8f2179b7f 100644
--- a/sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/graph/AutoCdcScd1SchemaEvolutionSuite.scala
+++ b/sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/graph/AutoCdcScd1SchemaEvolutionSuite.scala
@@ -58,16 +58,13 @@ class AutoCdcScd1SchemaEvolutionSuite
     )
 
     val stream = MemoryStream[(Int, String, Option[String], Long)]
-    def buildCtx(): TestGraphRegistrationContext = new 
TestGraphRegistrationContext(spark) {
-      registerTable("target", catalog = Some(catalog), database = 
Some(namespace))
-      registerFlow(autoCdcFlow(
-        name = "auto_cdc_flow",
+    def buildCtx(): TestGraphRegistrationContext =
+      singleAutoCdcFlowPipeline(
+        flowName = "auto_cdc_flow",
         target = "target",
-        query = dfFlowFunc(stream.toDF().toDF("id", "name", "email", 
"version")),
+        sourceDf = stream.toDF().toDF("id", "name", "email", "version"),
         keys = Seq("id"),
-        sequencing = functions.col("version")
-      ))
-    }
+        sequencing = functions.col("version"))
 
     // Run #1: insert with NULL email.
     stream.addData((1, "alice", None, 1L))
@@ -101,31 +98,22 @@ class AutoCdcScd1SchemaEvolutionSuite
 
     val stream1 = MemoryStream[(Int, Int, Long)]
     stream1.addData((1, 30, 1L))
-    val ctx1 = new TestGraphRegistrationContext(spark) {
-      registerTable("target", catalog = Some(catalog), database = 
Some(namespace))
-      registerFlow(autoCdcFlow(
-        name = "auto_cdc_flow",
-        target = "target",
-        query = dfFlowFunc(stream1.toDF().toDF("id", "age", "version")),
-        keys = Seq("id"),
-        sequencing = functions.col("version")
-      ))
-    }
-    runPipeline(ctx1)
+    runPipeline(singleAutoCdcFlowPipeline(
+      flowName = "auto_cdc_flow",
+      target = "target",
+      sourceDf = stream1.toDF().toDF("id", "age", "version"),
+      keys = Seq("id"),
+      sequencing = functions.col("version")))
 
     // Run #2: widen `age` from Int to Long.
     val stream2 = MemoryStream[(Int, Long, Long)]
     stream2.addData((1, 31L, 2L))
-    val ctx2 = new TestGraphRegistrationContext(spark) {
-      registerTable("target", catalog = Some(catalog), database = 
Some(namespace))
-      registerFlow(autoCdcFlow(
-        name = "auto_cdc_flow",
-        target = "target",
-        query = dfFlowFunc(stream2.toDF().toDF("id", "age", "version")),
-        keys = Seq("id"),
-        sequencing = functions.col("version")
-      ))
-    }
+    val ctx2 = singleAutoCdcFlowPipeline(
+      flowName = "auto_cdc_flow",
+      target = "target",
+      sourceDf = stream2.toDF().toDF("id", "age", "version"),
+      keys = Seq("id"),
+      sequencing = functions.col("version"))
     val ex = intercept[RuntimeException] { runPipeline(ctx2) }
     checkErrorInPipelineFailure(
       failure = ex,
@@ -154,31 +142,22 @@ class AutoCdcScd1SchemaEvolutionSuite
 
     val stream1 = MemoryStream[(Int, Long, Long)]
     stream1.addData((1, 100L, 1L))
-    val ctx1 = new TestGraphRegistrationContext(spark) {
-      registerTable("target", catalog = Some(catalog), database = 
Some(namespace))
-      registerFlow(autoCdcFlow(
-        name = "auto_cdc_flow",
-        target = "target",
-        query = dfFlowFunc(stream1.toDF().toDF("id", "payload", "version")),
-        keys = Seq("id"),
-        sequencing = functions.col("version")
-      ))
-    }
-    runPipeline(ctx1)
+    runPipeline(singleAutoCdcFlowPipeline(
+      flowName = "auto_cdc_flow",
+      target = "target",
+      sourceDf = stream1.toDF().toDF("id", "payload", "version"),
+      keys = Seq("id"),
+      sequencing = functions.col("version")))
 
     // Run #2: narrow `payload` from Long (BIGINT) to Int (INT).
     val stream2 = MemoryStream[(Int, Int, Long)]
     stream2.addData((1, 5, 2L))
-    val ctx2 = new TestGraphRegistrationContext(spark) {
-      registerTable("target", catalog = Some(catalog), database = 
Some(namespace))
-      registerFlow(autoCdcFlow(
-        name = "auto_cdc_flow",
-        target = "target",
-        query = dfFlowFunc(stream2.toDF().toDF("id", "payload", "version")),
-        keys = Seq("id"),
-        sequencing = functions.col("version")
-      ))
-    }
+    val ctx2 = singleAutoCdcFlowPipeline(
+      flowName = "auto_cdc_flow",
+      target = "target",
+      sourceDf = stream2.toDF().toDF("id", "payload", "version"),
+      keys = Seq("id"),
+      sequencing = functions.col("version"))
 
     val ex = intercept[RuntimeException] { runPipeline(ctx2) }
     checkErrorInPipelineFailure(
@@ -209,19 +188,16 @@ class AutoCdcScd1SchemaEvolutionSuite
     // unchanged (only the downstream projection differs), so the source identity that the
     // OffsetSeqLog records is stable across runs.
     val stream = MemoryStream[(Int, String, Option[String], Long)]
-    def buildCtx(includeEmail: Boolean): TestGraphRegistrationContext =
-      new TestGraphRegistrationContext(spark) {
-        registerTable("target", catalog = Some(catalog), database = 
Some(namespace))
-        val sourceDf = stream.toDF().toDF("id", "name", "email", "version")
-        val projectedDf = if (includeEmail) sourceDf else 
sourceDf.drop("email")
-        registerFlow(autoCdcFlow(
-          name = "auto_cdc_flow",
-          target = "target",
-          query = dfFlowFunc(projectedDf),
-          keys = Seq("id"),
-          sequencing = functions.col("version")
-        ))
-      }
+    def buildCtx(includeEmail: Boolean): TestGraphRegistrationContext = {
+      val sourceDf = stream.toDF().toDF("id", "name", "email", "version")
+      val projectedDf = if (includeEmail) sourceDf else sourceDf.drop("email")
+      singleAutoCdcFlowPipeline(
+        flowName = "auto_cdc_flow",
+        target = "target",
+        sourceDf = projectedDf,
+        keys = Seq("id"),
+        sequencing = functions.col("version"))
+    }
 
     // Run #1: source projects (id, name, version). Target schema is unchanged.
     stream.addData((1, "alice", None, 1L))
@@ -262,17 +238,13 @@ class AutoCdcScd1SchemaEvolutionSuite
 
     val stream = MemoryStream[(Int, String, String, Long)]
     def buildCtx(selection: Option[ColumnSelection]): 
TestGraphRegistrationContext =
-      new TestGraphRegistrationContext(spark) {
-        registerTable("target", catalog = Some(catalog), database = 
Some(namespace))
-        registerFlow(autoCdcFlow(
-          name = "auto_cdc_flow",
-          target = "target",
-          query = dfFlowFunc(stream.toDF().toDF("id", "name", "email", 
"version")),
-          keys = Seq("id"),
-          sequencing = functions.col("version"),
-          columnSelection = selection
-        ))
-      }
+      singleAutoCdcFlowPipeline(
+        flowName = "auto_cdc_flow",
+        target = "target",
+        sourceDf = stream.toDF().toDF("id", "name", "email", "version"),
+        keys = Seq("id"),
+        sequencing = functions.col("version"),
+        columnSelection = selection)
 
     // Run #1: only (id, name, version) selected; `email` is dropped before the MERGE.
     stream.addData((1, "alice", "ignored", 1L))
@@ -312,17 +284,13 @@ class AutoCdcScd1SchemaEvolutionSuite
 
     val stream = MemoryStream[(Int, String, String, Long)]
     def buildCtx(selection: Option[ColumnSelection]): 
TestGraphRegistrationContext =
-      new TestGraphRegistrationContext(spark) {
-        registerTable("target", catalog = Some(catalog), database = 
Some(namespace))
-        registerFlow(autoCdcFlow(
-          name = "auto_cdc_flow",
-          target = "target",
-          query = dfFlowFunc(stream.toDF().toDF("id", "name", "email", 
"version")),
-          keys = Seq("id"),
-          sequencing = functions.col("version"),
-          columnSelection = selection
-        ))
-      }
+      singleAutoCdcFlowPipeline(
+        flowName = "auto_cdc_flow",
+        target = "target",
+        sourceDf = stream.toDF().toDF("id", "name", "email", "version"),
+        keys = Seq("id"),
+        sequencing = functions.col("version"),
+        columnSelection = selection)
 
     // Run #1: include all columns; populate `email` for key=1.
     stream.addData((1, "alice", "[email protected]", 1L))
@@ -365,19 +333,16 @@ class AutoCdcScd1SchemaEvolutionSuite
     // Same `MemoryStream[(Int, String, Option[String], Long)]` shape across runs; runs
     // differ in whether `email` is kept in the projected source DF.
     val stream = MemoryStream[(Int, String, Option[String], Long)]
-    def buildCtx(includeEmail: Boolean): TestGraphRegistrationContext =
-      new TestGraphRegistrationContext(spark) {
-        registerTable("target", catalog = Some(catalog), database = 
Some(namespace))
-        val sourceDf = stream.toDF().toDF("id", "name", "email", "version")
-        val projectedDf = if (includeEmail) sourceDf else 
sourceDf.drop("email")
-        registerFlow(autoCdcFlow(
-          name = "auto_cdc_flow",
-          target = "target",
-          query = dfFlowFunc(projectedDf),
-          keys = Seq("id"),
-          sequencing = functions.col("version")
-        ))
-      }
+    def buildCtx(includeEmail: Boolean): TestGraphRegistrationContext = {
+      val sourceDf = stream.toDF().toDF("id", "name", "email", "version")
+      val projectedDf = if (includeEmail) sourceDf else sourceDf.drop("email")
+      singleAutoCdcFlowPipeline(
+        flowName = "auto_cdc_flow",
+        target = "target",
+        sourceDf = projectedDf,
+        keys = Seq("id"),
+        sequencing = functions.col("version"))
+    }
 
     // Run #1: wide source DF (id, name, email, version). mergeSchemas appends `email` to
     // the target.
@@ -421,28 +386,25 @@ class AutoCdcScd1SchemaEvolutionSuite
     // shapes; the underlying tuple shape is unchanged so the streaming source's identity
     // is stable across runs.
     val stream = MemoryStream[(Int, Long, Int, Int, Int)]
-    def buildCtx(includeC: Boolean): TestGraphRegistrationContext =
-      new TestGraphRegistrationContext(spark) {
-        registerTable("target", catalog = Some(catalog), database = 
Some(namespace))
-        val src = stream.toDF().toDF("key", "version", "a", "b_c", "b_d")
-        val inner = if (includeC) {
-          functions.struct(functions.col("b_c").as("c"), 
functions.col("b_d").as("d"))
-        } else {
-          functions.struct(functions.col("b_d").as("d"))
-        }
-        val projected = src.select(
-          functions.col("key"),
-          functions.col("version"),
-          functions.struct(functions.col("a"), inner.as("b")).as("value")
-        )
-        registerFlow(autoCdcFlow(
-          name = "auto_cdc_flow",
-          target = "target",
-          query = dfFlowFunc(projected),
-          keys = Seq("key"),
-          sequencing = functions.col("version")
-        ))
+    def buildCtx(includeC: Boolean): TestGraphRegistrationContext = {
+      val src = stream.toDF().toDF("key", "version", "a", "b_c", "b_d")
+      val inner = if (includeC) {
+        functions.struct(functions.col("b_c").as("c"), 
functions.col("b_d").as("d"))
+      } else {
+        functions.struct(functions.col("b_d").as("d"))
       }
+      val projected = src.select(
+        functions.col("key"),
+        functions.col("version"),
+        functions.struct(functions.col("a"), inner.as("b")).as("value")
+      )
+      singleAutoCdcFlowPipeline(
+        flowName = "auto_cdc_flow",
+        target = "target",
+        sourceDf = projected,
+        keys = Seq("key"),
+        sequencing = functions.col("version"))
+    }
 
     stream.addData((1, 1L, 1, 1, 1), (2, 1L, 2, 2, 2))
     runPipeline(buildCtx(includeC = true))
@@ -476,30 +438,27 @@ class AutoCdcScd1SchemaEvolutionSuite
     )
 
     val stream = MemoryStream[(Int, Long, Int, Int, Int)]
-    def buildCtx(includeD: Boolean): TestGraphRegistrationContext =
-      new TestGraphRegistrationContext(spark) {
-        registerTable("target", catalog = Some(catalog), database = 
Some(namespace))
-        val src = stream.toDF().toDF("key", "version", "a", "b_c", "b_d")
-        val inner = if (includeD) {
-          functions.struct(functions.col("b_c").as("c"), 
functions.col("b_d").as("d"))
-        } else {
-          functions.struct(functions.col("b_c").as("c"))
-        }
-        val projected = src.select(
-          functions.col("key"),
-          functions.col("version"),
-          functions.array(
-            functions.struct(functions.col("a"), inner.as("b"))
-          ).as("vals")
-        )
-        registerFlow(autoCdcFlow(
-          name = "auto_cdc_flow",
-          target = "target",
-          query = dfFlowFunc(projected),
-          keys = Seq("key"),
-          sequencing = functions.col("version")
-        ))
+    def buildCtx(includeD: Boolean): TestGraphRegistrationContext = {
+      val src = stream.toDF().toDF("key", "version", "a", "b_c", "b_d")
+      val inner = if (includeD) {
+        functions.struct(functions.col("b_c").as("c"), 
functions.col("b_d").as("d"))
+      } else {
+        functions.struct(functions.col("b_c").as("c"))
       }
+      val projected = src.select(
+        functions.col("key"),
+        functions.col("version"),
+        functions.array(
+          functions.struct(functions.col("a"), inner.as("b"))
+        ).as("vals")
+      )
+      singleAutoCdcFlowPipeline(
+        flowName = "auto_cdc_flow",
+        target = "target",
+        sourceDf = projected,
+        keys = Seq("key"),
+        sequencing = functions.col("version"))
+    }
 
     stream.addData((1, 1L, 1, 1, 99))
     runPipeline(buildCtx(includeD = false))
@@ -537,30 +496,27 @@ class AutoCdcScd1SchemaEvolutionSuite
     )
 
     val stream = MemoryStream[(Int, Long, Int, Int, Int)]
-    def buildCtx(includeD: Boolean): TestGraphRegistrationContext =
-      new TestGraphRegistrationContext(spark) {
-        registerTable("target", catalog = Some(catalog), database = 
Some(namespace))
-        val src = stream.toDF().toDF("key", "version", "a", "b_c", "b_d")
-        val inner = if (includeD) {
-          functions.struct(functions.col("b_c").as("c"), 
functions.col("b_d").as("d"))
-        } else {
-          functions.struct(functions.col("b_c").as("c"))
-        }
-        val projected = src.select(
-          functions.col("key"),
-          functions.col("version"),
-          functions.array(
-            functions.struct(functions.col("a"), inner.as("b"))
-          ).as("vals")
-        )
-        registerFlow(autoCdcFlow(
-          name = "auto_cdc_flow",
-          target = "target",
-          query = dfFlowFunc(projected),
-          keys = Seq("key"),
-          sequencing = functions.col("version")
-        ))
+    def buildCtx(includeD: Boolean): TestGraphRegistrationContext = {
+      val src = stream.toDF().toDF("key", "version", "a", "b_c", "b_d")
+      val inner = if (includeD) {
+        functions.struct(functions.col("b_c").as("c"), 
functions.col("b_d").as("d"))
+      } else {
+        functions.struct(functions.col("b_c").as("c"))
       }
+      val projected = src.select(
+        functions.col("key"),
+        functions.col("version"),
+        functions.array(
+          functions.struct(functions.col("a"), inner.as("b"))
+        ).as("vals")
+      )
+      singleAutoCdcFlowPipeline(
+        flowName = "auto_cdc_flow",
+        target = "target",
+        sourceDf = projected,
+        keys = Seq("key"),
+        sequencing = functions.col("version"))
+    }
 
     stream.addData((1, 1L, 1, 1, 1), (2, 1L, 2, 2, 2))
     runPipeline(buildCtx(includeD = true))
@@ -598,19 +554,15 @@ class AutoCdcScd1SchemaEvolutionSuite
 
       val stream = MemoryStream[(Int, Long, String)]
       stream.addData((1, 1L, "alice"))
-      val ctx = new TestGraphRegistrationContext(spark) {
-        registerTable("target", catalog = Some(catalog), database = 
Some(namespace))
-        // Source DF emits `Value` (capital), differing only in case from the 
target's
-        // `value` column.
-        val df = stream.toDF().toDF("key", "version", "Value")
-        registerFlow(autoCdcFlow(
-          name = "auto_cdc_flow",
-          target = "target",
-          query = dfFlowFunc(df),
-          keys = Seq("key"),
-          sequencing = functions.col("version")
-        ))
-      }
+      // Source DF emits `Value` (capital), differing only in case from the target's
+      // `value` column.
+      val df = stream.toDF().toDF("key", "version", "Value")
+      val ctx = singleAutoCdcFlowPipeline(
+        flowName = "auto_cdc_flow",
+        target = "target",
+        sourceDf = df,
+        keys = Seq("key"),
+        sequencing = functions.col("version"))
 
       val ex = intercept[RuntimeException] { runPipeline(ctx) }
       // The exact `name` and `referenceNames` parameters depend on internal merge-plan
@@ -655,17 +607,12 @@ class AutoCdcScd1SchemaEvolutionSuite
 
     val stream = MemoryStream[(Int, String, Long)]
     stream.addData((1, "alice", 1L), (2, "bob", 1L))
-    val ctx = new TestGraphRegistrationContext(spark) {
-      registerTable("target", catalog = Some(catalog), database = 
Some(namespace))
-      registerFlow(autoCdcFlow(
-        name = "auto_cdc_flow",
-        target = "target",
-        query = dfFlowFunc(stream.toDF().toDF("id", "name", "version")),
-        keys = Seq("id"),
-        sequencing = functions.col("version")
-      ))
-    }
-    runPipeline(ctx)
+    runPipeline(singleAutoCdcFlowPipeline(
+      flowName = "auto_cdc_flow",
+      target = "target",
+      sourceDf = stream.toDF().toDF("id", "name", "version"),
+      keys = Seq("id"),
+      sequencing = functions.col("version")))
 
     checkAnswer(
       spark.table(s"$catalog.$namespace.target").select("id", "name", 
"version", "extra"),
@@ -691,31 +638,22 @@ class AutoCdcScd1SchemaEvolutionSuite
 
     val stream1 = MemoryStream[(Int, Long, Timestamp)]
     stream1.addData((1, 1L, Timestamp.valueOf("2024-01-01 10:00:00")))
-    val ctx1 = new TestGraphRegistrationContext(spark) {
-      registerTable("target", catalog = Some(catalog), database = 
Some(namespace))
-      registerFlow(autoCdcFlow(
-        name = "auto_cdc_flow",
-        target = "target",
-        query = dfFlowFunc(stream1.toDF().toDF("key", "version", "value")),
-        keys = Seq("key"),
-        sequencing = functions.col("version")
-      ))
-    }
-    runPipeline(ctx1)
+    runPipeline(singleAutoCdcFlowPipeline(
+      flowName = "auto_cdc_flow",
+      target = "target",
+      sourceDf = stream1.toDF().toDF("key", "version", "value"),
+      keys = Seq("key"),
+      sequencing = functions.col("version")))
 
     // Run #2 emits `value` as STRING. mergeSchemas rejects the type change.
     val stream2 = MemoryStream[(Int, Long, String)]
     stream2.addData((1, 2L, "2024-01-02 11:00:00"))
-    val ctx2 = new TestGraphRegistrationContext(spark) {
-      registerTable("target", catalog = Some(catalog), database = 
Some(namespace))
-      registerFlow(autoCdcFlow(
-        name = "auto_cdc_flow",
-        target = "target",
-        query = dfFlowFunc(stream2.toDF().toDF("key", "version", "value")),
-        keys = Seq("key"),
-        sequencing = functions.col("version")
-      ))
-    }
+    val ctx2 = singleAutoCdcFlowPipeline(
+      flowName = "auto_cdc_flow",
+      target = "target",
+      sourceDf = stream2.toDF().toDF("key", "version", "value"),
+      keys = Seq("key"),
+      sequencing = functions.col("version"))
 
     val ex = intercept[RuntimeException] { runPipeline(ctx2) }
     checkErrorInPipelineFailure(


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to