This is an automated email from the ASF dual-hosted git repository.

dwysakowicz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new a790428a1bb [FLINK-37745] Serialize partial deletes in ChangelogMode to JSON (#26516)
a790428a1bb is described below

commit a790428a1bbed8c09004e6afe1a47bb479796a34
Author: Dawid Wysakowicz <[email protected]>
AuthorDate: Wed Apr 30 09:15:17 2025 +0200

    [FLINK-37745] Serialize partial deletes in ChangelogMode to JSON (#26516)
---
 .../org/apache/flink/table/connector/ChangelogMode.java     | 13 ++++++-------
 .../nodes/exec/serde/ChangelogModeJsonDeserializer.java     | 12 ++++++++++--
 .../plan/nodes/exec/serde/ChangelogModeJsonSerializer.java  |  6 +++++-
 .../plan/nodes/exec/serde/ChangelogModeJsonSerdeTest.java   |  9 ++++++---
 4 files changed, 27 insertions(+), 13 deletions(-)

diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/connector/ChangelogMode.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/connector/ChangelogMode.java
index 44515802113..82a1e34d0e8 100644
--- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/connector/ChangelogMode.java
+++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/connector/ChangelogMode.java
@@ -24,7 +24,6 @@ import org.apache.flink.util.Preconditions;
 
 import java.util.Collections;
 import java.util.EnumSet;
-import java.util.Objects;
 import java.util.Set;
 import java.util.stream.Collectors;
 
@@ -128,19 +127,19 @@ public final class ChangelogMode {
 
     @Override
     public boolean equals(Object o) {
-        if (this == o) {
-            return true;
-        }
         if (o == null || getClass() != o.getClass()) {
             return false;
         }
-        ChangelogMode that = (ChangelogMode) o;
-        return kinds.equals(that.kinds);
+
+        final ChangelogMode that = (ChangelogMode) o;
+        return keyOnlyDeletes == that.keyOnlyDeletes && kinds.equals(that.kinds);
     }
 
     @Override
     public int hashCode() {
-        return Objects.hash(kinds);
+        int result = kinds.hashCode();
+        result = 31 * result + Boolean.hashCode(keyOnlyDeletes);
+        return result;
     }
 
     @Override
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/ChangelogModeJsonDeserializer.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/ChangelogModeJsonDeserializer.java
index e6590a738e9..840e99b0083 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/ChangelogModeJsonDeserializer.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/ChangelogModeJsonDeserializer.java
@@ -28,6 +28,7 @@ import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.deser.std.StdDeserializer;
 
 import java.io.IOException;
+import java.util.Objects;
 
 /**
  * JSON deserializer for {@link ChangelogMode}.
@@ -37,6 +38,7 @@ import java.io.IOException;
 @Internal
 final class ChangelogModeJsonDeserializer extends StdDeserializer<ChangelogMode> {
     private static final long serialVersionUID = 1L;
+    private static final String PARTIAL_DELETE = "~" + RowKind.DELETE.name();
 
     ChangelogModeJsonDeserializer() {
         super(ChangelogMode.class);
@@ -49,8 +51,14 @@ final class ChangelogModeJsonDeserializer extends StdDeserializer<ChangelogMode>
         ChangelogMode.Builder builder = ChangelogMode.newBuilder();
         JsonNode rowKindsNode = jsonParser.readValueAsTree();
         for (JsonNode rowKindNode : rowKindsNode) {
-            RowKind rowKind = RowKind.valueOf(rowKindNode.asText().toUpperCase());
-            builder.addContainedKind(rowKind);
+            final String rowKindText = rowKindNode.asText();
+            if (Objects.equals(PARTIAL_DELETE, rowKindText)) {
+                builder.keyOnlyDeletes(true);
+                builder.addContainedKind(RowKind.DELETE);
+            } else {
+                RowKind rowKind = RowKind.valueOf(rowKindText);
+                builder.addContainedKind(rowKind);
+            }
         }
         return builder.build();
     }
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/ChangelogModeJsonSerializer.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/ChangelogModeJsonSerializer.java
index 8d0c4621389..8483eb82d53 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/ChangelogModeJsonSerializer.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/ChangelogModeJsonSerializer.java
@@ -49,7 +49,11 @@ final class ChangelogModeJsonSerializer extends StdSerializer<ChangelogMode> {
             throws IOException {
         jsonGenerator.writeStartArray();
         for (RowKind rowKind : changelogMode.getContainedKinds()) {
-            jsonGenerator.writeString(rowKind.name());
+            if (rowKind == RowKind.DELETE && changelogMode.keyOnlyDeletes()) {
+                jsonGenerator.writeString("~" + rowKind.name());
+            } else {
+                jsonGenerator.writeString(rowKind.name());
+            }
         }
         jsonGenerator.writeEndArray();
     }
diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/serde/ChangelogModeJsonSerdeTest.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/serde/ChangelogModeJsonSerdeTest.java
index 2c94a5b514c..75f542592ce 100644
--- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/serde/ChangelogModeJsonSerdeTest.java
+++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/serde/ChangelogModeJsonSerdeTest.java
@@ -21,9 +21,10 @@ package org.apache.flink.table.planner.plan.nodes.exec.serde;
 import org.apache.flink.table.connector.ChangelogMode;
 import org.apache.flink.types.RowKind;
 
-import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.parallel.Execution;
 import org.junit.jupiter.api.parallel.ExecutionMode;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
 
 import java.io.IOException;
 
@@ -33,14 +34,16 @@ import static org.apache.flink.table.planner.plan.nodes.exec.serde.JsonSerdeTest
 @Execution(ExecutionMode.CONCURRENT)
 class ChangelogModeJsonSerdeTest {
 
-    @Test
-    void testChangelogModeSerde() throws IOException {
+    @ParameterizedTest
+    @ValueSource(booleans = {true, false})
+    void testChangelogModeSerde(boolean keyOnlyDeletes) throws IOException {
         ChangelogMode changelogMode =
                 ChangelogMode.newBuilder()
                         .addContainedKind(RowKind.INSERT)
                         .addContainedKind(RowKind.DELETE)
                         .addContainedKind(RowKind.UPDATE_AFTER)
                         .addContainedKind(RowKind.UPDATE_BEFORE)
+                        .keyOnlyDeletes(keyOnlyDeletes)
                         .build();
 
         testJsonRoundTrip(changelogMode, ChangelogMode.class);

Reply via email to