This is an automated email from the ASF dual-hosted git repository.

gengliangwang pushed a commit to branch branch-4.2
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-4.2 by this push:
     new e7c40b766e8c [SPARK-56991][SQL] Add helper method for deriving Changelog DeduplicationMode
e7c40b766e8c is described below

commit e7c40b766e8c1a3cbea013b52a3830505ae68c6f
Author: Anton Okolnychyi <[email protected]>
AuthorDate: Wed May 27 14:13:47 2026 -0700

    [SPARK-56991][SQL] Add helper method for deriving Changelog DeduplicationMode
    
    ### What changes were proposed in this pull request?
    
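    This PR adds a helper method, `ChangelogContextUtils.parseDeduplicationMode`,
    that derives the changelog `DeduplicationMode` from options, and attaches each
    mode's option string (`none`, `dropCarryovers`, `netChanges`) to the
    `DeduplicationMode` enum so parsing can look modes up by value.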
    
    ### Why are the changes needed?
    
    These changes remove the duplicated deduplication-mode parsing logic from
    `ChangelogContextUtils` and `AstBuilder`, which simplifies the code.
    
    ### Does this PR introduce _any_ user-facing change?
    
    No.
    
    ### How was this patch tested?
    
    Existing coverage in ChangelogContextUtilsSuite, PlanParserSuite, and
    StreamRelationParserSuite already exercises all three modes through both
    call sites (including the case-insensitive path and the invalid-value
    error path), so the refactor is well covered.
    
    ### Was this patch authored or co-authored using generative AI tooling?
    
    Claude Code v2.1.149.
    
    Closes #56151 from aokolnychyi/spark-56991-v2.
    
    Lead-authored-by: Anton Okolnychyi <[email protected]>
    Co-authored-by: Gengliang Wang <[email protected]>
    Signed-off-by: Gengliang Wang <[email protected]>
    (cherry picked from commit 064392c70852bf0b2f184c058f3d54efa1f0c9fe)
    Signed-off-by: Gengliang Wang <[email protected]>
---
 .../sql/connector/catalog/ChangelogContext.java    | 16 ++++++++++---
 .../catalyst/analysis/ChangelogContextUtils.scala  | 28 ++++++++++++++--------
 .../spark/sql/catalyst/parser/AstBuilder.scala     | 10 +-------
 3 files changed, 32 insertions(+), 22 deletions(-)

diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/ChangelogContext.java b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/ChangelogContext.java
index d6ea072b9634..e5e4cd380fe0 100644
--- a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/ChangelogContext.java
+++ b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/ChangelogContext.java
@@ -38,11 +38,21 @@ public class ChangelogContext {
    */
   public enum DeduplicationMode {
     /** Raw change rows as-is from the connector — no post-processing. */
-    NONE,
+    NONE("none"),
     /** Remove identical insert/delete pairs from copy-on-write file rewrites (default). */
-    DROP_CARRYOVERS,
+    DROP_CARRYOVERS("dropCarryovers"),
     /** Collapse to one net change per row identity across the entire changelog range. */
-    NET_CHANGES
+    NET_CHANGES("netChanges");
+
+    private final String value;
+
+    DeduplicationMode(String value) {
+      this.value = value;
+    }
+
+    public String value() {
+      return value;
+    }
   }
 
   private final ChangelogRange range;
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ChangelogContextUtils.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ChangelogContextUtils.scala
index 2b679d955f52..bdef991edcec 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ChangelogContextUtils.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ChangelogContextUtils.scala
@@ -18,10 +18,12 @@
 package org.apache.spark.sql.catalyst.analysis
 
 import java.lang.{Long => JLong}
-import java.util.{Locale, Optional => JOptional}
+import java.util.{Optional => JOptional}
 
 import org.apache.spark.sql.catalyst.expressions.{Cast, Literal}
 import org.apache.spark.sql.connector.catalog.ChangelogContext
+import org.apache.spark.sql.connector.catalog.ChangelogContext.DeduplicationMode
+import org.apache.spark.sql.connector.catalog.ChangelogContext.DeduplicationMode.DROP_CARRYOVERS
 import org.apache.spark.sql.connector.catalog.ChangelogRange.{TimestampRange, UnboundedRange, VersionRange}
 import org.apache.spark.sql.errors.QueryCompilationErrors
 import org.apache.spark.sql.types.TimestampType
@@ -56,15 +58,7 @@ object ChangelogContextUtils {
     val startInclusive = options.getBoolean(STARTING_BOUND_INCLUSIVE, true)
     val endInclusive = options.getBoolean(ENDING_BOUND_INCLUSIVE, true)
 
-    val deduplicationModeStr = Option(options.get(DEDUPLICATION_MODE))
-      .getOrElse("dropCarryovers").toLowerCase(Locale.ROOT)
-    val deduplicationMode = deduplicationModeStr match {
-      case "none" => ChangelogContext.DeduplicationMode.NONE
-      case "dropcarryovers" => ChangelogContext.DeduplicationMode.DROP_CARRYOVERS
-      case "netchanges" => ChangelogContext.DeduplicationMode.NET_CHANGES
-      case other =>
-        throw QueryCompilationErrors.invalidCdcOptionInvalidDeduplicationMode(other)
-    }
+    val deduplicationMode = parseDeduplicationMode(options)
     val computeUpdates = options.getBoolean(COMPUTE_UPDATES, false)
 
     // Determine range from options
@@ -101,6 +95,20 @@ object ChangelogContextUtils {
     new ChangelogContext(range, deduplicationMode, computeUpdates)
   }
 
+  def parseDeduplicationMode(options: CaseInsensitiveStringMap): DeduplicationMode = {
+    if (options.containsKey(DEDUPLICATION_MODE)) {
+      parseDeduplicationMode(options.get(DEDUPLICATION_MODE))
+    } else {
+      DROP_CARRYOVERS
+    }
+  }
+
+  private def parseDeduplicationMode(value: String): DeduplicationMode = {
+    DeduplicationMode.values()
+      .find(_.value.equalsIgnoreCase(value))
+      .getOrElse(throw QueryCompilationErrors.invalidCdcOptionInvalidDeduplicationMode(value))
+  }
+
   private def parseTimestamp(timestampStr: String, sessionLocalTimeZone: String): Long = {
     val value = Cast(
       Literal(timestampStr),
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
index c5eb458e2580..e6de6a1c9bc6 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
@@ -2739,15 +2739,7 @@ class AstBuilder extends DataTypeAstBuilder
    */
   private def resolveChangelogOptions(
       options: CaseInsensitiveStringMap): (ChangelogContext.DeduplicationMode, Boolean) = {
-    val deduplicationModeStr = Option(options.get("deduplicationMode"))
-      .getOrElse("dropCarryovers").toLowerCase(Locale.ROOT)
-    val deduplicationMode = deduplicationModeStr match {
-      case "none" => ChangelogContext.DeduplicationMode.NONE
-      case "dropcarryovers" => ChangelogContext.DeduplicationMode.DROP_CARRYOVERS
-      case "netchanges" => ChangelogContext.DeduplicationMode.NET_CHANGES
-      case other =>
-        throw QueryCompilationErrors.invalidCdcOptionInvalidDeduplicationMode(other)
-    }
+    val deduplicationMode = ChangelogContextUtils.parseDeduplicationMode(options)
     val computeUpdates = options.getBoolean("computeUpdates", false)
     (deduplicationMode, computeUpdates)
   }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to