This is an automated email from the ASF dual-hosted git repository.
cloud-fan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new 232c309dec9e [SPARK-57113][SDP][FOLLOWUP] Cleanup AutoCDC Flow code
232c309dec9e is described below
commit 232c309dec9e72828d433ca3d410500b33cd230f
Author: Szehon Ho <[email protected]>
AuthorDate: Thu Jun 4 10:21:51 2026 +0800
[SPARK-57113][SDP][FOLLOWUP] Cleanup AutoCDC Flow code
### What changes were proposed in this pull request?
Follow-up to #56160 (SPARK-57113) addressing post-merge review comments and
reducing duplication in the AutoCDC flow code and its test suites. No behavior
change.
`sql/pipelines/.../graph/FlowExecution.scala`:
- Hoist the `org.json4s` imports out of `serializeKeyColumnNames` /
`parseKeyColumnNames` to the top of the file, per Spark's import conventions.
- Factor the duplicated AutoCDC key-field resolution shared by
`auxiliaryKeyColumnNames` and `validateNoAutoCdcKeyDrift` into a single
`expectedAuxiliaryKeyFields` helper.
- Import `scala.collection.mutable` instead of using a fully-qualified
inline reference.
Tests:
- Add a shared `singleAutoCdcFlowPipeline` helper to
`AutoCdcGraphExecutionTestMixin` and use it across the AutoCDC SCD1 E2E suites
(`AutoCdcScd1KeyDriftSuite`, `AutoCdcScd1MultiPipelineSuite`,
`AutoCdcScd1AuxiliaryTableDurabilitySuite`, `AutoCdcScd1SchemaEvolutionSuite`),
removing the repeated single-table/single-flow `TestGraphRegistrationContext`
registration boilerplate.
### Why are the changes needed?
Addresses non-blocking review feedback left on #56160 and reduces
duplication in the AutoCDC flow code and its tests, improving readability and
maintainability. The net diff removes ~300 lines.
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
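Refactor with no intended functional change. Two internal details do differ:
the resolved key fields are now cached in a `lazy val` rather than recomputed
on each call, and a missing key during drift validation now raises the shared
helper's internal-error message. Covered by the existing AutoCDC suites: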
- `AutoCdcAuxiliaryTableSuite`
- `AutoCdcScd1KeyDriftSuite`
- `AutoCdcScd1MultiPipelineSuite`
- `AutoCdcScd1AuxiliaryTableDurabilitySuite`
- `AutoCdcScd1SchemaEvolutionSuite`
### Was this patch authored or co-authored using generative AI tooling?
Generated-by: Cursor (Claude Opus 4.8)
Closes #56255 from szehon-ho/spark-57113-follow-up.
Authored-by: Szehon Ho <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>
---
.../spark/sql/pipelines/graph/FlowExecution.scala | 50 ++-
.../graph/AutoCdcGraphExecutionTestMixin.scala | 30 ++
.../AutoCdcScd1AuxiliaryTableDurabilitySuite.scala | 92 ++----
.../pipelines/graph/AutoCdcScd1KeyDriftSuite.scala | 163 ++--------
.../graph/AutoCdcScd1MultiPipelineSuite.scala | 152 ++++-----
.../graph/AutoCdcScd1SchemaEvolutionSuite.scala | 358 +++++++++------------
6 files changed, 316 insertions(+), 529 deletions(-)
diff --git
a/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/graph/FlowExecution.scala
b/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/graph/FlowExecution.scala
index 0d1c33be2172..c4cd358344ca 100644
---
a/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/graph/FlowExecution.scala
+++
b/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/graph/FlowExecution.scala
@@ -20,10 +20,14 @@ package org.apache.spark.sql.pipelines.graph
import java.util.concurrent.ThreadPoolExecutor
import java.util.concurrent.atomic.AtomicBoolean
+import scala.collection.mutable
import scala.concurrent.{ExecutionContext, Future}
import scala.jdk.CollectionConverters._
import scala.util.control.NonFatal
+import org.json4s.JsonAST.{JArray, JString}
+import org.json4s.jackson.JsonMethods.{compact, parse}
+
import org.apache.spark.SparkException
import org.apache.spark.internal.{Logging, LogKeys}
import org.apache.spark.sql.{AnalysisException, Dataset, Row}
@@ -346,8 +350,6 @@ object AutoCdcAuxiliaryTable {
* upstream.
*/
def serializeKeyColumnNames(names: Seq[String]): String = {
- import org.json4s.JsonAST.{JArray, JString}
- import org.json4s.jackson.JsonMethods.compact
compact(JArray(names.map(JString(_)).toList))
}
@@ -357,8 +359,6 @@ object AutoCdcAuxiliaryTable {
* upstream.
*/
def parseKeyColumnNames(raw: String): Option[Seq[String]] = {
- import org.json4s.JsonAST.{JArray, JString}
- import org.json4s.jackson.JsonMethods.parse
val parsed = try Some(parse(raw)) catch { case NonFatal(_) => None }
parsed.flatMap {
case JArray(elems) =>
@@ -406,7 +406,7 @@ trait AutoCdcMergeWriteBase {
val (catalog, v2Identifier) =
PipelinesCatalogUtils.resolveTableCatalog(spark, auxIdent)
if (!catalog.tableExists(v2Identifier)) {
- val properties = scala.collection.mutable.Map.empty[String, String]
+ val properties = mutable.Map.empty[String, String]
// Inherit the target's format so MERGE semantics line up. When
unspecified, omit the
// provider so the catalog falls back to its default.
@@ -440,18 +440,21 @@ trait AutoCdcMergeWriteBase {
}
/**
- * Returns the resolved AutoCDC key column names as they appear in the
auxiliary schema, in
- * `changeArgs.keys` declaration order.
+ * Resolves each AutoCDC key in `changeArgs.keys` to its [[StructField]] in
+ * [[auxiliaryTableSchema]], preserving `changeArgs.keys` declaration order.
This is the
+ * expected (flow-declared) side of drift validation, distinct from the keys
recorded on an
+ * existing auxiliary table.
+ *
+ * [[AutoCdcMergeFlow]] should have validated that all `changeArgs.keys`
exist in the deduced
+ * aux/target schemas by now, so a missing key is an internal error rather
than a user-facing
+ * condition.
*/
- private def auxiliaryKeyColumnNames: Seq[String] = {
+ private lazy val expectedAuxiliaryKeyFields: Seq[StructField] = {
val resolver = spark.sessionState.conf.resolver
changeArgs.keys.map { key =>
auxiliaryTableSchema.fields
.find(field => resolver(field.name, key.name))
- .map(_.name)
.getOrElse(
- // This should never happen at this point, as [[AutoCdcMergeFlow]]
should have validated
- // all changeArgs.keys exist in the deduced aux/target table schemas
by now.
throw SparkException.internalError(
s"AutoCDC key column '${key.name}' is missing from the auxiliary
table schema " +
s"for flow ${identifier.unquotedString} writing to target " +
@@ -461,6 +464,12 @@ trait AutoCdcMergeWriteBase {
}
}
+ /**
+ * Returns the resolved AutoCDC key column names as they appear in the
auxiliary schema, in
+ * `changeArgs.keys` declaration order.
+ */
+ private lazy val auxiliaryKeyColumnNames: Seq[String] =
expectedAuxiliaryKeyFields.map(_.name)
+
/**
* Validate that the target table's underlying connector implements
* [[SupportsRowLevelOperations]], which is the V2 connector contract for
MERGE/UPDATE/DELETE
@@ -512,21 +521,10 @@ trait AutoCdcMergeWriteBase {
val resolver = spark.sessionState.conf.resolver
val existingAuxSchema =
CatalogV2Util.v2ColumnsToStructType(existingAuxTable.columns())
- // The expected key fields are looked up in [[auxiliaryTableSchema]],
which by construction
- // contains every key column with its source-derived dataType. We
deliberately do not look
- // them up in [[existingAuxSchema]] - that's the recorded side, and
conflating the two
- // sides would mask drift.
- val expectedKeyFields: Seq[StructField] = changeArgs.keys.map { key =>
- auxiliaryTableSchema.fields
- .find(field => resolver(field.name, key.name))
- .getOrElse(
- // Construction of [[auxiliaryTableSchema]] already enforces all of
the user-specified
- // keys are present, so if we don't find a key it is truly an
internal error.
- throw SparkException.internalError(
- s"Key column '${key.name}' was not found in the AutoCDC auxiliary
table schema."
- )
- )
- }
+ // Resolve the flow-declared (expected) keys from
[[auxiliaryTableSchema]]. We deliberately
+ // do not look them up in [[existingAuxSchema]] - that's the recorded
side, and conflating
+ // the two sides would mask drift. See [[expectedAuxiliaryKeyFields]].
+ val expectedKeyFields: Seq[StructField] = expectedAuxiliaryKeyFields
val recordedKeyNames = parseRecordedKeyColumnNames(existingAuxTable,
auxIdent)
val recordedKeyFields: Seq[StructField] = recordedKeyNames.map { name =>
existingAuxSchema.fields
diff --git
a/sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/graph/AutoCdcGraphExecutionTestMixin.scala
b/sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/graph/AutoCdcGraphExecutionTestMixin.scala
index 5ebdb4b4c86d..302ff789c12d 100644
---
a/sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/graph/AutoCdcGraphExecutionTestMixin.scala
+++
b/sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/graph/AutoCdcGraphExecutionTestMixin.scala
@@ -21,6 +21,7 @@ import org.scalatest.{BeforeAndAfterEach, Suite}
import org.apache.spark.SparkThrowable
import org.apache.spark.sql.{Column, Row}
+import org.apache.spark.sql.classic.DataFrame
import
org.apache.spark.sql.connector.catalog.SharedTablesInMemoryRowLevelOperationTableCatalog
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.pipelines.autocdc.{
@@ -207,6 +208,35 @@ trait AutoCdcGraphExecutionTestMixin extends
BeforeAndAfterEach {
)
)
+ /**
+ * Build a single-flow AutoCDC pipeline: a [[TestGraphRegistrationContext]]
that registers
+ * `target` under [[catalog]].[[namespace]] and one [[autoCdcFlow]] writing
into it from
+ * `sourceDf`. Covers the common single-table/single-flow shape used across
the AutoCDC E2E
+ * suites; tests that need multiple flows or non-AutoCDC datasets build the
context inline.
+ */
+ protected def singleAutoCdcFlowPipeline(
+ flowName: String,
+ target: String,
+ sourceDf: DataFrame,
+ keys: Seq[String],
+ sequencing: Column,
+ columnSelection: Option[ColumnSelection] = None,
+ deleteCondition: Option[Column] = None,
+ scdType: ScdType = ScdType.Type1): TestGraphRegistrationContext =
+ new TestGraphRegistrationContext(spark) {
+ registerTable(target, catalog = Some(catalog), database =
Some(namespace))
+ registerFlow(autoCdcFlow(
+ name = flowName,
+ target = target,
+ query = dfFlowFunc(sourceDf),
+ keys = keys,
+ sequencing = sequencing,
+ columnSelection = columnSelection,
+ deleteCondition = deleteCondition,
+ scdType = scdType
+ ))
+ }
+
/** Build a target row's `_cdc_metadata` struct value. */
protected def cdcMeta(deleteSeq: Option[Long], upsertSeq: Option[Long]): Row
=
Row(deleteSeq.orNull, upsertSeq.orNull)
diff --git
a/sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/graph/AutoCdcScd1AuxiliaryTableDurabilitySuite.scala
b/sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/graph/AutoCdcScd1AuxiliaryTableDurabilitySuite.scala
index 5a9f6cb6710b..3453235cbae8 100644
---
a/sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/graph/AutoCdcScd1AuxiliaryTableDurabilitySuite.scala
+++
b/sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/graph/AutoCdcScd1AuxiliaryTableDurabilitySuite.scala
@@ -52,18 +52,12 @@ class AutoCdcScd1AuxiliaryTableDurabilitySuite
// resume cleanly.
val changeDataFeedStream = MemoryStream[(Int, String, Long)]
def buildGraphRegistrationContext(): TestGraphRegistrationContext =
- new TestGraphRegistrationContext(spark) {
- registerTable("target", catalog = Some(catalog), database =
Some(namespace))
- registerFlow(autoCdcFlow(
- name = "auto_cdc_flow",
- target = "target",
- query = dfFlowFunc(
- changeDataFeedStream.toDF().toDF("id", "name", "version")
- ),
- keys = Seq("id"),
- sequencing = functions.col("version")
- ))
- }
+ singleAutoCdcFlowPipeline(
+ flowName = "auto_cdc_flow",
+ target = "target",
+ sourceDf = changeDataFeedStream.toDF().toDF("id", "name", "version"),
+ keys = Seq("id"),
+ sequencing = functions.col("version"))
// Run #1: insert id=1 at seq=1.
changeDataFeedStream.addData((1, "alice", 1L))
@@ -98,20 +92,18 @@ class AutoCdcScd1AuxiliaryTableDurabilitySuite
// Single MemoryStream reused across both runs so the streaming checkpoint
can resume.
val stream = MemoryStream[(Int, String, Long, Boolean)]
- def buildCtx(): TestGraphRegistrationContext = new
TestGraphRegistrationContext(spark) {
- registerTable("target", catalog = Some(catalog), database =
Some(namespace))
- registerFlow(autoCdcFlow(
- name = "auto_cdc_flow",
+ def buildCtx(): TestGraphRegistrationContext =
+ singleAutoCdcFlowPipeline(
+ flowName = "auto_cdc_flow",
target = "target",
- query = dfFlowFunc(stream.toDF().toDF("id", "name", "version",
"is_delete")),
+ sourceDf = stream.toDF().toDF("id", "name", "version", "is_delete"),
keys = Seq("id"),
sequencing = functions.col("version"),
deleteCondition = Some(functions.col("is_delete") === true),
columnSelection = Some(ColumnSelection.ExcludeColumns(
Seq(UnqualifiedColumnName("is_delete"))
))
- ))
- }
+ )
// Run #1: delete id=1 at seq=10. Auxiliary table records seq=10 as the
watermark.
stream.addData((1, "alice", 10L, true))
@@ -141,17 +133,12 @@ class AutoCdcScd1AuxiliaryTableDurabilitySuite
val stream = MemoryStream[(String, Int, Long)]
stream.addData(("alice", 1, 1L))
- val ctx = new TestGraphRegistrationContext(spark) {
- registerTable("target", catalog = Some(catalog), database =
Some(namespace))
- registerFlow(autoCdcFlow(
- name = "auto_cdc_flow",
- target = "target",
- query = dfFlowFunc(stream.toDF().toDF("name", "id", "version")),
- keys = Seq("id"),
- sequencing = functions.col("version")
- ))
- }
- runPipeline(ctx)
+ runPipeline(singleAutoCdcFlowPipeline(
+ flowName = "auto_cdc_flow",
+ target = "target",
+ sourceDf = stream.toDF().toDF("name", "id", "version"),
+ keys = Seq("id"),
+ sequencing = functions.col("version")))
val auxSchema = spark.table(auxTableNameFor("target")).schema
@@ -181,17 +168,12 @@ class AutoCdcScd1AuxiliaryTableDurabilitySuite
val stream = MemoryStream[(String, Int, String, Long)]
stream.addData(("v", 1, "us", 1L))
- val ctx = new TestGraphRegistrationContext(spark) {
- registerTable("target", catalog = Some(catalog), database =
Some(namespace))
- registerFlow(autoCdcFlow(
- name = "auto_cdc_flow",
- target = "target",
- query = dfFlowFunc(stream.toDF().toDF("value", "id", "region",
"version")),
- keys = Seq("region", "id"),
- sequencing = functions.col("version")
- ))
- }
- runPipeline(ctx)
+ runPipeline(singleAutoCdcFlowPipeline(
+ flowName = "auto_cdc_flow",
+ target = "target",
+ sourceDf = stream.toDF().toDF("value", "id", "region", "version"),
+ keys = Seq("region", "id"),
+ sequencing = functions.col("version")))
val auxSchema = spark.table(auxTableNameFor("target")).schema
assert(auxSchema.fieldNames.toSeq ==
@@ -211,16 +193,13 @@ class AutoCdcScd1AuxiliaryTableDurabilitySuite
// Single MemoryStream reused across both runs so the streaming checkpoint
can resume.
val stream = MemoryStream[(Int, Long)]
- def buildCtx(): TestGraphRegistrationContext = new
TestGraphRegistrationContext(spark) {
- registerTable("target", catalog = Some(catalog), database =
Some(namespace))
- registerFlow(autoCdcFlow(
- name = "auto_cdc_flow",
+ def buildCtx(): TestGraphRegistrationContext =
+ singleAutoCdcFlowPipeline(
+ flowName = "auto_cdc_flow",
target = "target",
- query = dfFlowFunc(stream.toDF().toDF("id", "version")),
+ sourceDf = stream.toDF().toDF("id", "version"),
keys = Seq("id"),
- sequencing = functions.col("version")
- ))
- }
+ sequencing = functions.col("version"))
stream.addData((1, 1L))
runPipeline(buildCtx())
@@ -276,18 +255,13 @@ class AutoCdcScd1AuxiliaryTableDurabilitySuite
// Single MemoryStream reused across both runs so the streaming checkpoint
can resume.
val stream = MemoryStream[(String, String, String, String, Long)]
- def buildCtx(): TestGraphRegistrationContext = new
TestGraphRegistrationContext(spark) {
- registerTable("target", catalog = Some(catalog), database =
Some(namespace))
- registerFlow(autoCdcFlow(
- name = "auto_cdc_flow",
+ def buildCtx(): TestGraphRegistrationContext =
+ singleAutoCdcFlowPipeline(
+ flowName = "auto_cdc_flow",
target = "target",
- query = dfFlowFunc(
- stream.toDF().toDF((keyNames :+ "version"): _*)
- ),
+ sourceDf = stream.toDF().toDF((keyNames :+ "version"): _*),
keys = backtickQuotedKeys,
- sequencing = functions.col("version")
- ))
- }
+ sequencing = functions.col("version"))
// Run #1: a single insert with arbitrary non-empty key values.
stream.addData(("v1", "v2", "v3", "v4", 1L))
diff --git
a/sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/graph/AutoCdcScd1KeyDriftSuite.scala
b/sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/graph/AutoCdcScd1KeyDriftSuite.scala
index 066d8afd5342..fc7706c84e3e 100644
---
a/sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/graph/AutoCdcScd1KeyDriftSuite.scala
+++
b/sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/graph/AutoCdcScd1KeyDriftSuite.scala
@@ -17,11 +17,12 @@
package org.apache.spark.sql.pipelines.graph
+import org.apache.spark.sql.classic.DataFrame
import org.apache.spark.sql.execution.streaming.runtime.MemoryStream
-import org.apache.spark.sql.functions
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.pipelines.utils.{ExecutionTest,
TestGraphRegistrationContext}
import org.apache.spark.sql.test.SharedSparkSession
+import org.apache.spark.sql.types.MetadataBuilder
/**
* End-to-end tests covering AutoCDC SCD1 key-drift validation: the AutoCDC
flow's declared
@@ -39,11 +40,10 @@ class AutoCdcScd1KeyDriftSuite
with SharedSparkSession
with AutoCdcGraphExecutionTestMixin {
+ import testImplicits._
+
test("a pipeline execution that adds a key column to an existing AutoCDC
flow triggers " +
"KEY_SCHEMA_DRIFT") {
- val session = spark
- import session.implicits._
-
// Target table carries both candidate key columns up-front so only the
AutoCDC `keys`
// declaration differs between the two pipelines.
spark.sql(
@@ -54,31 +54,13 @@ class AutoCdcScd1KeyDriftSuite
// Pipeline #1 declares one key (`id`). Aux table is created with schema
(id, _cdc_metadata).
val stream1 = MemoryStream[(Int, String, Long)]
stream1.addData((1, "us", 1L))
- val ctx1 = new TestGraphRegistrationContext(spark) {
- registerTable("target", catalog = Some(catalog), database =
Some(namespace))
- registerFlow(autoCdcFlow(
- name = "flow_v1",
- target = "target",
- query = dfFlowFunc(stream1.toDF().toDF("id", "region", "version")),
- keys = Seq("id"),
- sequencing = functions.col("version")
- ))
- }
- runPipeline(ctx1)
+ runPipeline(buildPipeline("flow_v1", stream1.toDF().toDF("id", "region",
"version"), Seq("id")))
// Pipeline #2 declares two keys (`region` + `id`) - arity drift.
val stream2 = MemoryStream[(Int, String, Long)]
stream2.addData((1, "us", 2L))
- val ctx2 = new TestGraphRegistrationContext(spark) {
- registerTable("target", catalog = Some(catalog), database =
Some(namespace))
- registerFlow(autoCdcFlow(
- name = "flow_v2",
- target = "target",
- query = dfFlowFunc(stream2.toDF().toDF("id", "region", "version")),
- keys = Seq("region", "id"),
- sequencing = functions.col("version")
- ))
- }
+ val ctx2 = buildPipeline(
+ "flow_v2", stream2.toDF().toDF("id", "region", "version"), Seq("region",
"id"))
val ex = intercept[RuntimeException] { runPipeline(ctx2) }
checkErrorInPipelineFailure(
@@ -100,9 +82,6 @@ class AutoCdcScd1KeyDriftSuite
test("a pipeline execution that drops a key column from an existing AutoCDC
flow triggers " +
"KEY_SCHEMA_DRIFT") {
- val session = spark
- import session.implicits._
-
spark.sql(
s"CREATE TABLE $catalog.$namespace.target " +
s"(region STRING NOT NULL, id INT NOT NULL, version BIGINT NOT NULL,
$cdcMetadataDdl)"
@@ -112,31 +91,13 @@ class AutoCdcScd1KeyDriftSuite
// would slip through with `id` silently matching at position 0 of the
recorded schema.
val stream1 = MemoryStream[(String, Int, Long)]
stream1.addData(("us", 1, 1L))
- val ctx1 = new TestGraphRegistrationContext(spark) {
- registerTable("target", catalog = Some(catalog), database =
Some(namespace))
- registerFlow(autoCdcFlow(
- name = "flow_v1",
- target = "target",
- query = dfFlowFunc(stream1.toDF().toDF("region", "id", "version")),
- keys = Seq("region", "id"),
- sequencing = functions.col("version")
- ))
- }
- runPipeline(ctx1)
+ runPipeline(buildPipeline(
+ "flow_v1", stream1.toDF().toDF("region", "id", "version"), Seq("region",
"id")))
// Pipeline #2 declares only [id] - arity drift.
val stream2 = MemoryStream[(String, Int, Long)]
stream2.addData(("us", 1, 2L))
- val ctx2 = new TestGraphRegistrationContext(spark) {
- registerTable("target", catalog = Some(catalog), database =
Some(namespace))
- registerFlow(autoCdcFlow(
- name = "flow_v2",
- target = "target",
- query = dfFlowFunc(stream2.toDF().toDF("region", "id", "version")),
- keys = Seq("id"),
- sequencing = functions.col("version")
- ))
- }
+ val ctx2 = buildPipeline("flow_v2", stream2.toDF().toDF("region", "id",
"version"), Seq("id"))
val ex = intercept[RuntimeException] { runPipeline(ctx2) }
checkErrorInPipelineFailure(
@@ -157,9 +118,6 @@ class AutoCdcScd1KeyDriftSuite
test("a pipeline execution that swaps a key in an existing AutoCDC flow for
a different name " +
"(same arity) triggers KEY_SCHEMA_DRIFT") {
- val session = spark
- import session.implicits._
-
spark.sql(
s"CREATE TABLE $catalog.$namespace.target " +
s"(id INT NOT NULL, region STRING NOT NULL, country STRING NOT NULL, " +
@@ -169,33 +127,16 @@ class AutoCdcScd1KeyDriftSuite
// Pipeline #1 declares [id, region].
val stream1 = MemoryStream[(Int, String, String, Long)]
stream1.addData((1, "us", "USA", 1L))
- val ctx1 = new TestGraphRegistrationContext(spark) {
- registerTable("target", catalog = Some(catalog), database =
Some(namespace))
- registerFlow(autoCdcFlow(
- name = "flow_v1",
- target = "target",
- query = dfFlowFunc(stream1.toDF().toDF("id", "region", "country",
"version")),
- keys = Seq("id", "region"),
- sequencing = functions.col("version")
- ))
- }
- runPipeline(ctx1)
+ runPipeline(buildPipeline(
+ "flow_v1", stream1.toDF().toDF("id", "region", "country", "version"),
Seq("id", "region")))
// Pipeline #2 declares [id, country] - same arity, different key set. An
arity-only check
// would silently match `id` at position 0 and the swapped
`region`/`country` would slip
// through; the by-name set comparison must catch it.
val stream2 = MemoryStream[(Int, String, String, Long)]
stream2.addData((1, "us", "USA", 2L))
- val ctx2 = new TestGraphRegistrationContext(spark) {
- registerTable("target", catalog = Some(catalog), database =
Some(namespace))
- registerFlow(autoCdcFlow(
- name = "flow_v2",
- target = "target",
- query = dfFlowFunc(stream2.toDF().toDF("id", "region", "country",
"version")),
- keys = Seq("id", "country"),
- sequencing = functions.col("version")
- ))
- }
+ val ctx2 = buildPipeline(
+ "flow_v2", stream2.toDF().toDF("id", "region", "country", "version"),
Seq("id", "country"))
val ex = intercept[RuntimeException] { runPipeline(ctx2) }
checkErrorInPipelineFailure(
@@ -225,8 +166,6 @@ class AutoCdcScd1KeyDriftSuite
s"""TBLPROPERTIES ('${AutoCdcAuxiliaryTable.keyColumnNamesProperty}' =
'["id"]')"""
)
- val session = spark
- import session.implicits._
val stream = MemoryStream[(Int, Long)]
stream.addData((1, 1L))
val ctx = buildPipeline("flow", stream.toDF().toDF("id", "version"),
Seq("id"))
@@ -247,9 +186,6 @@ class AutoCdcScd1KeyDriftSuite
}
test("a composite key reorder ([a,b] -> [b,a]) does NOT trigger drift
validation") {
- val session = spark
- import session.implicits._
-
spark.sql(
s"CREATE TABLE $catalog.$namespace.target " +
s"(a INT NOT NULL, b STRING NOT NULL, version BIGINT NOT NULL,
$cdcMetadataDdl)"
@@ -261,39 +197,16 @@ class AutoCdcScd1KeyDriftSuite
// columns and their dataTypes.
val stream1 = MemoryStream[(Int, String, Long)]
stream1.addData((1, "x", 1L))
- val ctx1 = new TestGraphRegistrationContext(spark) {
- registerTable("target", catalog = Some(catalog), database =
Some(namespace))
- registerFlow(autoCdcFlow(
- name = "flow_v1",
- target = "target",
- query = dfFlowFunc(stream1.toDF().toDF("a", "b", "version")),
- keys = Seq("a", "b"),
- sequencing = functions.col("version")
- ))
- }
- runPipeline(ctx1)
+ runPipeline(buildPipeline("flow_v1", stream1.toDF().toDF("a", "b",
"version"), Seq("a", "b")))
// Pipeline #2 declares the same key set in the reversed order [b, a].
Must NOT throw.
val stream2 = MemoryStream[(Int, String, Long)]
stream2.addData((2, "y", 1L))
- val ctx2 = new TestGraphRegistrationContext(spark) {
- registerTable("target", catalog = Some(catalog), database =
Some(namespace))
- registerFlow(autoCdcFlow(
- name = "flow_v2",
- target = "target",
- query = dfFlowFunc(stream2.toDF().toDF("a", "b", "version")),
- keys = Seq("b", "a"),
- sequencing = functions.col("version")
- ))
- }
- runPipeline(ctx2)
+ runPipeline(buildPipeline("flow_v2", stream2.toDF().toDF("a", "b",
"version"), Seq("b", "a")))
}
test("a pipeline execution that changes a key column's nullability or
metadata in an " +
"existing AutoCDC flow does NOT trigger drift") {
- val session = spark
- import session.implicits._
-
// Drift validation compares (name, dataType) pairs as a set. Nullability
and column
// metadata are part of [[StructField]] but not part of [[DataType]], so
they do not gate
// semantic equivalence: only the wire-format data type matters for merge
correctness.
@@ -314,7 +227,7 @@ class AutoCdcScd1KeyDriftSuite
val stream2 = MemoryStream[(Option[Int], Long)]
stream2.addData((Some(2), 2L))
val baseDf = stream2.toDF().toDF("id", "version")
- val md = new org.apache.spark.sql.types.MetadataBuilder()
+ val md = new MetadataBuilder()
.putString("description", "primary key")
.build()
val sourceDfWithMetadata = baseDf.select(baseDf("id").as("id", md),
baseDf("version"))
@@ -323,9 +236,6 @@ class AutoCdcScd1KeyDriftSuite
test("a pipeline execution that wraps an existing AutoCDC flow's key in
backticks does NOT " +
"trigger drift") {
- val session = spark
- import session.implicits._
-
// Backticks are a SQL-parse syntactic device, not part of the identifier
itself. A user
// adding or removing backticks around the same logical column must NOT be
detected as drift.
spark.sql(
@@ -344,9 +254,6 @@ class AutoCdcScd1KeyDriftSuite
test("a pipeline execution that drops backticks around an existing AutoCDC
flow's " +
"previously-backtick-quoted key does NOT trigger drift") {
- val session = spark
- import session.implicits._
-
// The reverse direction of the previous test: drift validation must be
backtick-invariant
// on both the WRITE side (recorded property strips backticks when
serializing the key
// names in pipeline #1) and the READ side (resolver-aware lookup strips
backticks when
@@ -367,9 +274,6 @@ class AutoCdcScd1KeyDriftSuite
test("under spark.sql.caseSensitive = true, an AutoCDC flow whose key
differs only in case " +
"from the recorded key triggers KEY_SCHEMA_DRIFT") {
- val session = spark
- import session.implicits._
-
// validateNoAutoCdcKeyDrift uses spark.sessionState.conf.resolver, so its
behavior on
// `Id` vs `id` flips with the session conf. Pin the case-sensitive
direction: pipeline #1
// seeds the aux table under the default resolver with recorded key
`["id"]`, then
@@ -410,9 +314,6 @@ class AutoCdcScd1KeyDriftSuite
test("under the default (case-insensitive) resolver, an AutoCDC flow whose
key differs only " +
"in case from the recorded key does NOT trigger drift") {
- val session = spark
- import session.implicits._
-
// Pairs with the case-sensitive test above: same recorded key, but under
the default
// resolver the two identifiers are equivalent so drift validation must
accept pipeline
// #2. This pins the negative direction so a regression that accidentally
hard-codes a
@@ -451,8 +352,6 @@ class AutoCdcScd1KeyDriftSuite
s"CREATE TABLE ${auxTableNameFor("target")} (id INT NOT NULL,
$cdcMetadataDdl)"
)
- val session = spark
- import session.implicits._
val stream = MemoryStream[(Int, Long)]
stream.addData((1, 1L))
val ctx = buildPipeline("flow", stream.toDF().toDF("id", "version"),
Seq("id"))
@@ -486,8 +385,6 @@ class AutoCdcScd1KeyDriftSuite
s"TBLPROPERTIES ('${AutoCdcAuxiliaryTable.keyColumnNamesProperty}' =
'$malformedKeysArray')"
)
- val session = spark
- import session.implicits._
val stream = MemoryStream[(Int, Long)]
stream.addData((1, 1L))
val ctx = buildPipeline("flow", stream.toDF().toDF("id", "version"),
Seq("id"))
@@ -523,8 +420,6 @@ class AutoCdcScd1KeyDriftSuite
s"""TBLPROPERTIES ('${AutoCdcAuxiliaryTable.keyColumnNamesProperty}' =
'["region"]')"""
)
- val session = spark
- import session.implicits._
val stream = MemoryStream[(Int, Long)]
stream.addData((1, 1L))
val ctx = buildPipeline("flow", stream.toDF().toDF("id", "version"),
Seq("id"))
@@ -546,21 +441,17 @@ class AutoCdcScd1KeyDriftSuite
/**
* Build a single-flow pipeline targeting `cat.ns1.target` with the given
source DF and key
- * column list.
+ * column list. Thin wrapper over [[singleAutoCdcFlowPipeline]] since every
drift test targets
+ * the same `target` table.
*/
private def buildPipeline(
flowName: String,
- sourceDf: org.apache.spark.sql.classic.DataFrame,
- keys: Seq[String]): TestGraphRegistrationContext = {
- new TestGraphRegistrationContext(spark) {
- registerTable("target", catalog = Some(catalog), database =
Some(namespace))
- registerFlow(autoCdcFlow(
- name = flowName,
- target = "target",
- query = dfFlowFunc(sourceDf),
- keys = keys,
- sequencing = functions.col("version")
- ))
- }
- }
+ sourceDf: DataFrame,
+ keys: Seq[String]): TestGraphRegistrationContext =
+ singleAutoCdcFlowPipeline(
+ flowName = flowName,
+ target = "target",
+ sourceDf = sourceDf,
+ keys = keys,
+ sequencing = $"version")
}
diff --git
a/sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/graph/AutoCdcScd1MultiPipelineSuite.scala
b/sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/graph/AutoCdcScd1MultiPipelineSuite.scala
index 0d3f6e954df3..2100928bc68a 100644
---
a/sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/graph/AutoCdcScd1MultiPipelineSuite.scala
+++
b/sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/graph/AutoCdcScd1MultiPipelineSuite.scala
@@ -19,7 +19,6 @@ package org.apache.spark.sql.pipelines.graph
import org.apache.spark.sql.Row
import org.apache.spark.sql.execution.streaming.runtime.MemoryStream
-import org.apache.spark.sql.functions
import org.apache.spark.sql.pipelines.utils.{ExecutionTest,
TestGraphRegistrationContext}
import org.apache.spark.sql.test.SharedSparkSession
@@ -58,34 +57,24 @@ class AutoCdcScd1MultiPipelineSuite
// cat.ns1.__spark_autocdc_aux_state_t_a must not affect pipeline #2's
`t_b`.
val streamA = MemoryStream[(Int, String, Long)]
streamA.addData((1, "alice", 100L))
- val ctxA = new TestGraphRegistrationContext(spark) {
- registerTable("t_a", catalog = Some(catalog), database = Some(namespace))
- registerFlow(autoCdcFlow(
- name = "flow_a",
- target = "t_a",
- query = dfFlowFunc(streamA.toDF().toDF("id", "name", "version")),
- keys = Seq("id"),
- sequencing = functions.col("version")
- ))
- }
- runPipeline(ctxA)
+ runPipeline(singleAutoCdcFlowPipeline(
+ flowName = "flow_a",
+ target = "t_a",
+ sourceDf = streamA.toDF().toDF("id", "name", "version"),
+ keys = Seq("id"),
+ sequencing = $"version"))
// Pipeline #2 only knows about `t_b`. Uses a deliberately *lower*
sequence to verify
// the watermark from pipeline #1's auxiliary table (seq=100) does not
leak into
// pipeline #2.
val streamB = MemoryStream[(Int, String, Long)]
streamB.addData((9, "bob", 1L))
- val ctxB = new TestGraphRegistrationContext(spark) {
- registerTable("t_b", catalog = Some(catalog), database = Some(namespace))
- registerFlow(autoCdcFlow(
- name = "flow_b",
- target = "t_b",
- query = dfFlowFunc(streamB.toDF().toDF("id", "name", "version")),
- keys = Seq("id"),
- sequencing = functions.col("version")
- ))
- }
- runPipeline(ctxB)
+ runPipeline(singleAutoCdcFlowPipeline(
+ flowName = "flow_b",
+ target = "t_b",
+ sourceDf = streamB.toDF().toDF("id", "name", "version"),
+ keys = Seq("id"),
+ sequencing = $"version"))
checkAnswer(
spark.table(s"$catalog.$namespace.t_a"),
@@ -113,17 +102,12 @@ class AutoCdcScd1MultiPipelineSuite
)
val stream = MemoryStream[(Int, String, Long)]
stream.addData((1, "alice", 1L), (2, "bob", 1L))
- val ctxWriter = new TestGraphRegistrationContext(spark) {
- registerTable("src", catalog = Some(catalog), database = Some(namespace))
- registerFlow(autoCdcFlow(
- name = "writer",
- target = "src",
- query = dfFlowFunc(stream.toDF().toDF("id", "name", "version")),
- keys = Seq("id"),
- sequencing = functions.col("version")
- ))
- }
- runPipeline(ctxWriter)
+ runPipeline(singleAutoCdcFlowPipeline(
+ flowName = "writer",
+ target = "src",
+ sourceDf = stream.toDF().toDF("id", "name", "version"),
+ keys = Seq("id"),
+ sequencing = $"version"))
// Pipeline #2 is a regular materialized view that selects the user-data
columns from
// `src` (a different graph entirely). It must observe the merged AutoCDC
rows and be
@@ -161,17 +145,12 @@ class AutoCdcScd1MultiPipelineSuite
// Pipeline #1: inserts rows with id=1 and id=2 at version=1.
val stream1 = MemoryStream[(Int, String, Long)]
stream1.addData((1, "alice", 1L), (2, "bob", 1L))
- val ctx1 = new TestGraphRegistrationContext(spark) {
- registerTable("shared_target", catalog = Some(catalog), database =
Some(namespace))
- registerFlow(autoCdcFlow(
- name = "flow_v1",
- target = "shared_target",
- query = dfFlowFunc(stream1.toDF().toDF("id", "name", "version")),
- keys = Seq("id"),
- sequencing = functions.col("version")
- ))
- }
- runPipeline(ctx1)
+ runPipeline(singleAutoCdcFlowPipeline(
+ flowName = "flow_v1",
+ target = "shared_target",
+ sourceDf = stream1.toDF().toDF("id", "name", "version"),
+ keys = Seq("id"),
+ sequencing = $"version"))
// Sanity-check pipeline #1's effect before pipeline #2 runs.
checkAnswer(
@@ -186,17 +165,12 @@ class AutoCdcScd1MultiPipelineSuite
// (new key). id=1 is untouched and must survive into the final target
unchanged.
val stream2 = MemoryStream[(Int, String, Long)]
stream2.addData((2, "bob-v2", 2L), (3, "carol", 1L))
- val ctx2 = new TestGraphRegistrationContext(spark) {
- registerTable("shared_target", catalog = Some(catalog), database =
Some(namespace))
- registerFlow(autoCdcFlow(
- name = "flow_v2",
- target = "shared_target",
- query = dfFlowFunc(stream2.toDF().toDF("id", "name", "version")),
- keys = Seq("id"),
- sequencing = functions.col("version")
- ))
- }
- runPipeline(ctx2)
+ runPipeline(singleAutoCdcFlowPipeline(
+ flowName = "flow_v2",
+ target = "shared_target",
+ sourceDf = stream2.toDF().toDF("id", "name", "version"),
+ keys = Seq("id"),
+ sequencing = $"version"))
// Final target: id=1 untouched (pipeline #1's state), id=2 updated by pipeline #2,
// id=3 freshly inserted by pipeline #2.
@@ -229,16 +203,12 @@ class AutoCdcScd1MultiPipelineSuite
// Pipeline #1: source DF schema is (id, name, version); inserts id=1 and
id=2.
val stream1 = MemoryStream[(Int, String, Long)]
stream1.addData((1, "alice", 1L), (2, "bob", 1L))
- val ctx1 = new TestGraphRegistrationContext(spark) {
- registerTable("shared_target", catalog = Some(catalog), database =
Some(namespace))
- registerFlow(autoCdcFlow(
- name = "flow_v1",
- target = "shared_target",
- query = dfFlowFunc(stream1.toDF().toDF("id", "name", "version")),
- keys = Seq("id"),
- sequencing = functions.col("version")
- ))
- }
+ val ctx1 = singleAutoCdcFlowPipeline(
+ flowName = "flow_v1",
+ target = "shared_target",
+ sourceDf = stream1.toDF().toDF("id", "name", "version"),
+ keys = Seq("id"),
+ sequencing = $"version")
runPipeline(ctx1)
// Sanity-check pipeline #1's state before schema evolution kicks in.
@@ -255,17 +225,12 @@ class AutoCdcScd1MultiPipelineSuite
// is backfilled to NULL.
val stream2 = MemoryStream[(Int, String, Option[Int], Long)]
stream2.addData((2, "bob-v2", Some(25), 2L), (3, "carol", Some(30), 1L))
- val ctx2 = new TestGraphRegistrationContext(spark) {
- registerTable("shared_target", catalog = Some(catalog), database =
Some(namespace))
- registerFlow(autoCdcFlow(
- name = "flow_v2",
- target = "shared_target",
- query = dfFlowFunc(stream2.toDF().toDF("id", "name", "age",
"version")),
- keys = Seq("id"),
- sequencing = functions.col("version")
- ))
- }
- runPipeline(ctx2)
+ runPipeline(singleAutoCdcFlowPipeline(
+ flowName = "flow_v2",
+ target = "shared_target",
+ sourceDf = stream2.toDF().toDF("id", "name", "age", "version"),
+ keys = Seq("id"),
+ sequencing = $"version"))
checkAnswer(
spark.table(s"$catalog.$namespace.shared_target"),
@@ -309,32 +274,23 @@ class AutoCdcScd1MultiPipelineSuite
// (id, _cdc_metadata).
val stream1 = MemoryStream[(Int, String, Long)]
stream1.addData((1, "alice", 1L))
- val ctx1 = new TestGraphRegistrationContext(spark) {
- registerTable("shared_target", catalog = Some(catalog), database =
Some(namespace))
- registerFlow(autoCdcFlow(
- name = "flow_v1",
- target = "shared_target",
- query = dfFlowFunc(stream1.toDF().toDF("id", "name", "version")),
- keys = Seq("id"),
- sequencing = functions.col("version")
- ))
- }
- runPipeline(ctx1)
+ runPipeline(singleAutoCdcFlowPipeline(
+ flowName = "flow_v1",
+ target = "shared_target",
+ sourceDf = stream1.toDF().toDF("id", "name", "version"),
+ keys = Seq("id"),
+ sequencing = $"version"))
// Pipeline #2: completely separate graph, but targets the same physical
`shared_target`
// table with `keys = Seq("name")`.
val stream2 = MemoryStream[(Int, String, Long)]
stream2.addData((2, "alice", 1L))
- val ctx2 = new TestGraphRegistrationContext(spark) {
- registerTable("shared_target", catalog = Some(catalog), database =
Some(namespace))
- registerFlow(autoCdcFlow(
- name = "flow_v2",
- target = "shared_target",
- query = dfFlowFunc(stream2.toDF().toDF("id", "name", "version")),
- keys = Seq("name"),
- sequencing = functions.col("version")
- ))
- }
+ val ctx2 = singleAutoCdcFlowPipeline(
+ flowName = "flow_v2",
+ target = "shared_target",
+ sourceDf = stream2.toDF().toDF("id", "name", "version"),
+ keys = Seq("name"),
+ sequencing = $"version")
val ex = intercept[RuntimeException] { runPipeline(ctx2) }
checkErrorInPipelineFailure(
diff --git
a/sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/graph/AutoCdcScd1SchemaEvolutionSuite.scala
b/sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/graph/AutoCdcScd1SchemaEvolutionSuite.scala
index 2424dbdc4e05..b6c8f2179b7f 100644
---
a/sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/graph/AutoCdcScd1SchemaEvolutionSuite.scala
+++
b/sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/graph/AutoCdcScd1SchemaEvolutionSuite.scala
@@ -58,16 +58,13 @@ class AutoCdcScd1SchemaEvolutionSuite
)
val stream = MemoryStream[(Int, String, Option[String], Long)]
- def buildCtx(): TestGraphRegistrationContext = new
TestGraphRegistrationContext(spark) {
- registerTable("target", catalog = Some(catalog), database =
Some(namespace))
- registerFlow(autoCdcFlow(
- name = "auto_cdc_flow",
+ def buildCtx(): TestGraphRegistrationContext =
+ singleAutoCdcFlowPipeline(
+ flowName = "auto_cdc_flow",
target = "target",
- query = dfFlowFunc(stream.toDF().toDF("id", "name", "email",
"version")),
+ sourceDf = stream.toDF().toDF("id", "name", "email", "version"),
keys = Seq("id"),
- sequencing = functions.col("version")
- ))
- }
+ sequencing = functions.col("version"))
// Run #1: insert with NULL email.
stream.addData((1, "alice", None, 1L))
@@ -101,31 +98,22 @@ class AutoCdcScd1SchemaEvolutionSuite
val stream1 = MemoryStream[(Int, Int, Long)]
stream1.addData((1, 30, 1L))
- val ctx1 = new TestGraphRegistrationContext(spark) {
- registerTable("target", catalog = Some(catalog), database =
Some(namespace))
- registerFlow(autoCdcFlow(
- name = "auto_cdc_flow",
- target = "target",
- query = dfFlowFunc(stream1.toDF().toDF("id", "age", "version")),
- keys = Seq("id"),
- sequencing = functions.col("version")
- ))
- }
- runPipeline(ctx1)
+ runPipeline(singleAutoCdcFlowPipeline(
+ flowName = "auto_cdc_flow",
+ target = "target",
+ sourceDf = stream1.toDF().toDF("id", "age", "version"),
+ keys = Seq("id"),
+ sequencing = functions.col("version")))
// Run #2: widen `age` from Int to Long.
val stream2 = MemoryStream[(Int, Long, Long)]
stream2.addData((1, 31L, 2L))
- val ctx2 = new TestGraphRegistrationContext(spark) {
- registerTable("target", catalog = Some(catalog), database =
Some(namespace))
- registerFlow(autoCdcFlow(
- name = "auto_cdc_flow",
- target = "target",
- query = dfFlowFunc(stream2.toDF().toDF("id", "age", "version")),
- keys = Seq("id"),
- sequencing = functions.col("version")
- ))
- }
+ val ctx2 = singleAutoCdcFlowPipeline(
+ flowName = "auto_cdc_flow",
+ target = "target",
+ sourceDf = stream2.toDF().toDF("id", "age", "version"),
+ keys = Seq("id"),
+ sequencing = functions.col("version"))
val ex = intercept[RuntimeException] { runPipeline(ctx2) }
checkErrorInPipelineFailure(
failure = ex,
@@ -154,31 +142,22 @@ class AutoCdcScd1SchemaEvolutionSuite
val stream1 = MemoryStream[(Int, Long, Long)]
stream1.addData((1, 100L, 1L))
- val ctx1 = new TestGraphRegistrationContext(spark) {
- registerTable("target", catalog = Some(catalog), database =
Some(namespace))
- registerFlow(autoCdcFlow(
- name = "auto_cdc_flow",
- target = "target",
- query = dfFlowFunc(stream1.toDF().toDF("id", "payload", "version")),
- keys = Seq("id"),
- sequencing = functions.col("version")
- ))
- }
- runPipeline(ctx1)
+ runPipeline(singleAutoCdcFlowPipeline(
+ flowName = "auto_cdc_flow",
+ target = "target",
+ sourceDf = stream1.toDF().toDF("id", "payload", "version"),
+ keys = Seq("id"),
+ sequencing = functions.col("version")))
// Run #2: narrow `payload` from Long (BIGINT) to Int (INT).
val stream2 = MemoryStream[(Int, Int, Long)]
stream2.addData((1, 5, 2L))
- val ctx2 = new TestGraphRegistrationContext(spark) {
- registerTable("target", catalog = Some(catalog), database =
Some(namespace))
- registerFlow(autoCdcFlow(
- name = "auto_cdc_flow",
- target = "target",
- query = dfFlowFunc(stream2.toDF().toDF("id", "payload", "version")),
- keys = Seq("id"),
- sequencing = functions.col("version")
- ))
- }
+ val ctx2 = singleAutoCdcFlowPipeline(
+ flowName = "auto_cdc_flow",
+ target = "target",
+ sourceDf = stream2.toDF().toDF("id", "payload", "version"),
+ keys = Seq("id"),
+ sequencing = functions.col("version"))
val ex = intercept[RuntimeException] { runPipeline(ctx2) }
checkErrorInPipelineFailure(
@@ -209,19 +188,16 @@ class AutoCdcScd1SchemaEvolutionSuite
// unchanged (only the downstream projection differs), so the source
identity that the
// OffsetSeqLog records is stable across runs.
val stream = MemoryStream[(Int, String, Option[String], Long)]
- def buildCtx(includeEmail: Boolean): TestGraphRegistrationContext =
- new TestGraphRegistrationContext(spark) {
- registerTable("target", catalog = Some(catalog), database =
Some(namespace))
- val sourceDf = stream.toDF().toDF("id", "name", "email", "version")
- val projectedDf = if (includeEmail) sourceDf else
sourceDf.drop("email")
- registerFlow(autoCdcFlow(
- name = "auto_cdc_flow",
- target = "target",
- query = dfFlowFunc(projectedDf),
- keys = Seq("id"),
- sequencing = functions.col("version")
- ))
- }
+ def buildCtx(includeEmail: Boolean): TestGraphRegistrationContext = {
+ val sourceDf = stream.toDF().toDF("id", "name", "email", "version")
+ val projectedDf = if (includeEmail) sourceDf else sourceDf.drop("email")
+ singleAutoCdcFlowPipeline(
+ flowName = "auto_cdc_flow",
+ target = "target",
+ sourceDf = projectedDf,
+ keys = Seq("id"),
+ sequencing = functions.col("version"))
+ }
// Run #1: source projects (id, name, version). Target schema is unchanged.
stream.addData((1, "alice", None, 1L))
@@ -262,17 +238,13 @@ class AutoCdcScd1SchemaEvolutionSuite
val stream = MemoryStream[(Int, String, String, Long)]
def buildCtx(selection: Option[ColumnSelection]):
TestGraphRegistrationContext =
- new TestGraphRegistrationContext(spark) {
- registerTable("target", catalog = Some(catalog), database =
Some(namespace))
- registerFlow(autoCdcFlow(
- name = "auto_cdc_flow",
- target = "target",
- query = dfFlowFunc(stream.toDF().toDF("id", "name", "email",
"version")),
- keys = Seq("id"),
- sequencing = functions.col("version"),
- columnSelection = selection
- ))
- }
+ singleAutoCdcFlowPipeline(
+ flowName = "auto_cdc_flow",
+ target = "target",
+ sourceDf = stream.toDF().toDF("id", "name", "email", "version"),
+ keys = Seq("id"),
+ sequencing = functions.col("version"),
+ columnSelection = selection)
// Run #1: only (id, name, version) selected; `email` is dropped before
the MERGE.
stream.addData((1, "alice", "ignored", 1L))
@@ -312,17 +284,13 @@ class AutoCdcScd1SchemaEvolutionSuite
val stream = MemoryStream[(Int, String, String, Long)]
def buildCtx(selection: Option[ColumnSelection]):
TestGraphRegistrationContext =
- new TestGraphRegistrationContext(spark) {
- registerTable("target", catalog = Some(catalog), database =
Some(namespace))
- registerFlow(autoCdcFlow(
- name = "auto_cdc_flow",
- target = "target",
- query = dfFlowFunc(stream.toDF().toDF("id", "name", "email",
"version")),
- keys = Seq("id"),
- sequencing = functions.col("version"),
- columnSelection = selection
- ))
- }
+ singleAutoCdcFlowPipeline(
+ flowName = "auto_cdc_flow",
+ target = "target",
+ sourceDf = stream.toDF().toDF("id", "name", "email", "version"),
+ keys = Seq("id"),
+ sequencing = functions.col("version"),
+ columnSelection = selection)
// Run #1: include all columns; populate `email` for key=1.
stream.addData((1, "alice", "[email protected]", 1L))
@@ -365,19 +333,16 @@ class AutoCdcScd1SchemaEvolutionSuite
// Same `MemoryStream[(Int, String, Option[String], Long)]` shape across
runs; runs
// differ in whether `email` is kept in the projected source DF.
val stream = MemoryStream[(Int, String, Option[String], Long)]
- def buildCtx(includeEmail: Boolean): TestGraphRegistrationContext =
- new TestGraphRegistrationContext(spark) {
- registerTable("target", catalog = Some(catalog), database =
Some(namespace))
- val sourceDf = stream.toDF().toDF("id", "name", "email", "version")
- val projectedDf = if (includeEmail) sourceDf else
sourceDf.drop("email")
- registerFlow(autoCdcFlow(
- name = "auto_cdc_flow",
- target = "target",
- query = dfFlowFunc(projectedDf),
- keys = Seq("id"),
- sequencing = functions.col("version")
- ))
- }
+ def buildCtx(includeEmail: Boolean): TestGraphRegistrationContext = {
+ val sourceDf = stream.toDF().toDF("id", "name", "email", "version")
+ val projectedDf = if (includeEmail) sourceDf else sourceDf.drop("email")
+ singleAutoCdcFlowPipeline(
+ flowName = "auto_cdc_flow",
+ target = "target",
+ sourceDf = projectedDf,
+ keys = Seq("id"),
+ sequencing = functions.col("version"))
+ }
// Run #1: wide source DF (id, name, email, version). mergeSchemas appends
`email` to
// the target.
@@ -421,28 +386,25 @@ class AutoCdcScd1SchemaEvolutionSuite
// shapes; the underlying tuple shape is unchanged so the streaming
source's identity
// is stable across runs.
val stream = MemoryStream[(Int, Long, Int, Int, Int)]
- def buildCtx(includeC: Boolean): TestGraphRegistrationContext =
- new TestGraphRegistrationContext(spark) {
- registerTable("target", catalog = Some(catalog), database =
Some(namespace))
- val src = stream.toDF().toDF("key", "version", "a", "b_c", "b_d")
- val inner = if (includeC) {
- functions.struct(functions.col("b_c").as("c"),
functions.col("b_d").as("d"))
- } else {
- functions.struct(functions.col("b_d").as("d"))
- }
- val projected = src.select(
- functions.col("key"),
- functions.col("version"),
- functions.struct(functions.col("a"), inner.as("b")).as("value")
- )
- registerFlow(autoCdcFlow(
- name = "auto_cdc_flow",
- target = "target",
- query = dfFlowFunc(projected),
- keys = Seq("key"),
- sequencing = functions.col("version")
- ))
+ def buildCtx(includeC: Boolean): TestGraphRegistrationContext = {
+ val src = stream.toDF().toDF("key", "version", "a", "b_c", "b_d")
+ val inner = if (includeC) {
+ functions.struct(functions.col("b_c").as("c"),
functions.col("b_d").as("d"))
+ } else {
+ functions.struct(functions.col("b_d").as("d"))
}
+ val projected = src.select(
+ functions.col("key"),
+ functions.col("version"),
+ functions.struct(functions.col("a"), inner.as("b")).as("value")
+ )
+ singleAutoCdcFlowPipeline(
+ flowName = "auto_cdc_flow",
+ target = "target",
+ sourceDf = projected,
+ keys = Seq("key"),
+ sequencing = functions.col("version"))
+ }
stream.addData((1, 1L, 1, 1, 1), (2, 1L, 2, 2, 2))
runPipeline(buildCtx(includeC = true))
@@ -476,30 +438,27 @@ class AutoCdcScd1SchemaEvolutionSuite
)
val stream = MemoryStream[(Int, Long, Int, Int, Int)]
- def buildCtx(includeD: Boolean): TestGraphRegistrationContext =
- new TestGraphRegistrationContext(spark) {
- registerTable("target", catalog = Some(catalog), database =
Some(namespace))
- val src = stream.toDF().toDF("key", "version", "a", "b_c", "b_d")
- val inner = if (includeD) {
- functions.struct(functions.col("b_c").as("c"),
functions.col("b_d").as("d"))
- } else {
- functions.struct(functions.col("b_c").as("c"))
- }
- val projected = src.select(
- functions.col("key"),
- functions.col("version"),
- functions.array(
- functions.struct(functions.col("a"), inner.as("b"))
- ).as("vals")
- )
- registerFlow(autoCdcFlow(
- name = "auto_cdc_flow",
- target = "target",
- query = dfFlowFunc(projected),
- keys = Seq("key"),
- sequencing = functions.col("version")
- ))
+ def buildCtx(includeD: Boolean): TestGraphRegistrationContext = {
+ val src = stream.toDF().toDF("key", "version", "a", "b_c", "b_d")
+ val inner = if (includeD) {
+ functions.struct(functions.col("b_c").as("c"),
functions.col("b_d").as("d"))
+ } else {
+ functions.struct(functions.col("b_c").as("c"))
}
+ val projected = src.select(
+ functions.col("key"),
+ functions.col("version"),
+ functions.array(
+ functions.struct(functions.col("a"), inner.as("b"))
+ ).as("vals")
+ )
+ singleAutoCdcFlowPipeline(
+ flowName = "auto_cdc_flow",
+ target = "target",
+ sourceDf = projected,
+ keys = Seq("key"),
+ sequencing = functions.col("version"))
+ }
stream.addData((1, 1L, 1, 1, 99))
runPipeline(buildCtx(includeD = false))
@@ -537,30 +496,27 @@ class AutoCdcScd1SchemaEvolutionSuite
)
val stream = MemoryStream[(Int, Long, Int, Int, Int)]
- def buildCtx(includeD: Boolean): TestGraphRegistrationContext =
- new TestGraphRegistrationContext(spark) {
- registerTable("target", catalog = Some(catalog), database =
Some(namespace))
- val src = stream.toDF().toDF("key", "version", "a", "b_c", "b_d")
- val inner = if (includeD) {
- functions.struct(functions.col("b_c").as("c"),
functions.col("b_d").as("d"))
- } else {
- functions.struct(functions.col("b_c").as("c"))
- }
- val projected = src.select(
- functions.col("key"),
- functions.col("version"),
- functions.array(
- functions.struct(functions.col("a"), inner.as("b"))
- ).as("vals")
- )
- registerFlow(autoCdcFlow(
- name = "auto_cdc_flow",
- target = "target",
- query = dfFlowFunc(projected),
- keys = Seq("key"),
- sequencing = functions.col("version")
- ))
+ def buildCtx(includeD: Boolean): TestGraphRegistrationContext = {
+ val src = stream.toDF().toDF("key", "version", "a", "b_c", "b_d")
+ val inner = if (includeD) {
+ functions.struct(functions.col("b_c").as("c"),
functions.col("b_d").as("d"))
+ } else {
+ functions.struct(functions.col("b_c").as("c"))
}
+ val projected = src.select(
+ functions.col("key"),
+ functions.col("version"),
+ functions.array(
+ functions.struct(functions.col("a"), inner.as("b"))
+ ).as("vals")
+ )
+ singleAutoCdcFlowPipeline(
+ flowName = "auto_cdc_flow",
+ target = "target",
+ sourceDf = projected,
+ keys = Seq("key"),
+ sequencing = functions.col("version"))
+ }
stream.addData((1, 1L, 1, 1, 1), (2, 1L, 2, 2, 2))
runPipeline(buildCtx(includeD = true))
@@ -598,19 +554,15 @@ class AutoCdcScd1SchemaEvolutionSuite
val stream = MemoryStream[(Int, Long, String)]
stream.addData((1, 1L, "alice"))
- val ctx = new TestGraphRegistrationContext(spark) {
- registerTable("target", catalog = Some(catalog), database =
Some(namespace))
- // Source DF emits `Value` (capital), differing only in case from the
target's
- // `value` column.
- val df = stream.toDF().toDF("key", "version", "Value")
- registerFlow(autoCdcFlow(
- name = "auto_cdc_flow",
- target = "target",
- query = dfFlowFunc(df),
- keys = Seq("key"),
- sequencing = functions.col("version")
- ))
- }
+ // Source DF emits `Value` (capital), differing only in case from the
target's
+ // `value` column.
+ val df = stream.toDF().toDF("key", "version", "Value")
+ val ctx = singleAutoCdcFlowPipeline(
+ flowName = "auto_cdc_flow",
+ target = "target",
+ sourceDf = df,
+ keys = Seq("key"),
+ sequencing = functions.col("version"))
val ex = intercept[RuntimeException] { runPipeline(ctx) }
// The exact `name` and `referenceNames` parameters depend on internal
merge-plan
@@ -655,17 +607,12 @@ class AutoCdcScd1SchemaEvolutionSuite
val stream = MemoryStream[(Int, String, Long)]
stream.addData((1, "alice", 1L), (2, "bob", 1L))
- val ctx = new TestGraphRegistrationContext(spark) {
- registerTable("target", catalog = Some(catalog), database =
Some(namespace))
- registerFlow(autoCdcFlow(
- name = "auto_cdc_flow",
- target = "target",
- query = dfFlowFunc(stream.toDF().toDF("id", "name", "version")),
- keys = Seq("id"),
- sequencing = functions.col("version")
- ))
- }
- runPipeline(ctx)
+ runPipeline(singleAutoCdcFlowPipeline(
+ flowName = "auto_cdc_flow",
+ target = "target",
+ sourceDf = stream.toDF().toDF("id", "name", "version"),
+ keys = Seq("id"),
+ sequencing = functions.col("version")))
checkAnswer(
spark.table(s"$catalog.$namespace.target").select("id", "name",
"version", "extra"),
@@ -691,31 +638,22 @@ class AutoCdcScd1SchemaEvolutionSuite
val stream1 = MemoryStream[(Int, Long, Timestamp)]
stream1.addData((1, 1L, Timestamp.valueOf("2024-01-01 10:00:00")))
- val ctx1 = new TestGraphRegistrationContext(spark) {
- registerTable("target", catalog = Some(catalog), database =
Some(namespace))
- registerFlow(autoCdcFlow(
- name = "auto_cdc_flow",
- target = "target",
- query = dfFlowFunc(stream1.toDF().toDF("key", "version", "value")),
- keys = Seq("key"),
- sequencing = functions.col("version")
- ))
- }
- runPipeline(ctx1)
+ runPipeline(singleAutoCdcFlowPipeline(
+ flowName = "auto_cdc_flow",
+ target = "target",
+ sourceDf = stream1.toDF().toDF("key", "version", "value"),
+ keys = Seq("key"),
+ sequencing = functions.col("version")))
// Run #2 emits `value` as STRING. mergeSchemas rejects the type change.
val stream2 = MemoryStream[(Int, Long, String)]
stream2.addData((1, 2L, "2024-01-02 11:00:00"))
- val ctx2 = new TestGraphRegistrationContext(spark) {
- registerTable("target", catalog = Some(catalog), database =
Some(namespace))
- registerFlow(autoCdcFlow(
- name = "auto_cdc_flow",
- target = "target",
- query = dfFlowFunc(stream2.toDF().toDF("key", "version", "value")),
- keys = Seq("key"),
- sequencing = functions.col("version")
- ))
- }
+ val ctx2 = singleAutoCdcFlowPipeline(
+ flowName = "auto_cdc_flow",
+ target = "target",
+ sourceDf = stream2.toDF().toDF("key", "version", "value"),
+ keys = Seq("key"),
+ sequencing = functions.col("version"))
val ex = intercept[RuntimeException] { runPipeline(ctx2) }
checkErrorInPipelineFailure(
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org