This is an automated email from the ASF dual-hosted git repository.
gengliangwang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new 064392c70852 [SPARK-56991][SQL] Add helper method for deriving Changelog DeduplicationMode
064392c70852 is described below
commit 064392c70852bf0b2f184c058f3d54efa1f0c9fe
Author: Anton Okolnychyi <[email protected]>
AuthorDate: Wed May 27 14:13:47 2026 -0700
[SPARK-56991][SQL] Add helper method for deriving Changelog DeduplicationMode
### What changes were proposed in this pull request?
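This PR adds a shared helper, `ChangelogContextUtils.parseDeduplicationMode`, that derives the changelog `DeduplicationMode` from the changelog options, defaulting to `DROP_CARRYOVERS` when the option is absent. Each `ChangelogContext.DeduplicationMode` constant now carries its option string (`none`, `dropCarryovers`, `netChanges`), which the helper matches case-insensitively. `AstBuilder.resolveChangelogOptions` now delegates to this helper instead of duplicating the parsing logic.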
### Why are the changes needed?
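The same option-parsing logic was duplicated in `ChangelogContextUtils` and `AstBuilder.resolveChangelogOptions`. Consolidating it into a single helper simplifies the code and keeps both call sites consistent.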
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
No new tests were added. Existing coverage in ChangelogContextUtilsSuite, PlanParserSuite, and
StreamRelationParserSuite already exercises all three modes through both call
sites (including the case-insensitive path and the invalid-value error path),
so the refactor is well-covered.
### Was this patch authored or co-authored using generative AI tooling?
Claude Code v2.1.149.
Closes #56151 from aokolnychyi/spark-56991-v2.
Lead-authored-by: Anton Okolnychyi <[email protected]>
Co-authored-by: Gengliang Wang <[email protected]>
Signed-off-by: Gengliang Wang <[email protected]>
---
.../sql/connector/catalog/ChangelogContext.java | 16 ++++++++++---
.../catalyst/analysis/ChangelogContextUtils.scala | 28 ++++++++++++++--------
.../spark/sql/catalyst/parser/AstBuilder.scala | 10 +-------
3 files changed, 32 insertions(+), 22 deletions(-)
diff --git
a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/ChangelogContext.java
b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/ChangelogContext.java
index d6ea072b9634..e5e4cd380fe0 100644
---
a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/ChangelogContext.java
+++
b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/ChangelogContext.java
@@ -38,11 +38,21 @@ public class ChangelogContext {
*/
public enum DeduplicationMode {
/** Raw change rows as-is from the connector — no post-processing. */
- NONE,
+ NONE("none"),
/** Remove identical insert/delete pairs from copy-on-write file rewrites
(default). */
- DROP_CARRYOVERS,
+ DROP_CARRYOVERS("dropCarryovers"),
/** Collapse to one net change per row identity across the entire
changelog range. */
- NET_CHANGES
+ NET_CHANGES("netChanges");
+
+ private final String value;
+
+ DeduplicationMode(String value) {
+ this.value = value;
+ }
+
+ public String value() {
+ return value;
+ }
}
private final ChangelogRange range;
diff --git
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ChangelogContextUtils.scala
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ChangelogContextUtils.scala
index 2b679d955f52..bdef991edcec 100644
---
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ChangelogContextUtils.scala
+++
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ChangelogContextUtils.scala
@@ -18,10 +18,12 @@
package org.apache.spark.sql.catalyst.analysis
import java.lang.{Long => JLong}
-import java.util.{Locale, Optional => JOptional}
+import java.util.{Optional => JOptional}
import org.apache.spark.sql.catalyst.expressions.{Cast, Literal}
import org.apache.spark.sql.connector.catalog.ChangelogContext
+import
org.apache.spark.sql.connector.catalog.ChangelogContext.DeduplicationMode
+import
org.apache.spark.sql.connector.catalog.ChangelogContext.DeduplicationMode.DROP_CARRYOVERS
import org.apache.spark.sql.connector.catalog.ChangelogRange.{TimestampRange,
UnboundedRange, VersionRange}
import org.apache.spark.sql.errors.QueryCompilationErrors
import org.apache.spark.sql.types.TimestampType
@@ -56,15 +58,7 @@ object ChangelogContextUtils {
val startInclusive = options.getBoolean(STARTING_BOUND_INCLUSIVE, true)
val endInclusive = options.getBoolean(ENDING_BOUND_INCLUSIVE, true)
- val deduplicationModeStr = Option(options.get(DEDUPLICATION_MODE))
- .getOrElse("dropCarryovers").toLowerCase(Locale.ROOT)
- val deduplicationMode = deduplicationModeStr match {
- case "none" => ChangelogContext.DeduplicationMode.NONE
- case "dropcarryovers" =>
ChangelogContext.DeduplicationMode.DROP_CARRYOVERS
- case "netchanges" => ChangelogContext.DeduplicationMode.NET_CHANGES
- case other =>
- throw
QueryCompilationErrors.invalidCdcOptionInvalidDeduplicationMode(other)
- }
+ val deduplicationMode = parseDeduplicationMode(options)
val computeUpdates = options.getBoolean(COMPUTE_UPDATES, false)
// Determine range from options
@@ -101,6 +95,20 @@ object ChangelogContextUtils {
new ChangelogContext(range, deduplicationMode, computeUpdates)
}
+ def parseDeduplicationMode(options: CaseInsensitiveStringMap):
DeduplicationMode = {
+ if (options.containsKey(DEDUPLICATION_MODE)) {
+ parseDeduplicationMode(options.get(DEDUPLICATION_MODE))
+ } else {
+ DROP_CARRYOVERS
+ }
+ }
+
+ private def parseDeduplicationMode(value: String): DeduplicationMode = {
+ DeduplicationMode.values()
+ .find(_.value.equalsIgnoreCase(value))
+ .getOrElse(throw
QueryCompilationErrors.invalidCdcOptionInvalidDeduplicationMode(value))
+ }
+
private def parseTimestamp(timestampStr: String, sessionLocalTimeZone:
String): Long = {
val value = Cast(
Literal(timestampStr),
diff --git
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
index a2c3cbfec758..4e97f94c7581 100644
---
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
+++
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
@@ -2753,15 +2753,7 @@ class AstBuilder extends DataTypeAstBuilder
*/
private def resolveChangelogOptions(
options: CaseInsensitiveStringMap): (ChangelogContext.DeduplicationMode,
Boolean) = {
- val deduplicationModeStr = Option(options.get("deduplicationMode"))
- .getOrElse("dropCarryovers").toLowerCase(Locale.ROOT)
- val deduplicationMode = deduplicationModeStr match {
- case "none" => ChangelogContext.DeduplicationMode.NONE
- case "dropcarryovers" =>
ChangelogContext.DeduplicationMode.DROP_CARRYOVERS
- case "netchanges" => ChangelogContext.DeduplicationMode.NET_CHANGES
- case other =>
- throw
QueryCompilationErrors.invalidCdcOptionInvalidDeduplicationMode(other)
- }
+ val deduplicationMode =
ChangelogContextUtils.parseDeduplicationMode(options)
val computeUpdates = options.getBoolean("computeUpdates", false)
(deduplicationMode, computeUpdates)
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]