This is an automated email from the ASF dual-hosted git repository.
dwysakowicz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new a790428a1bb [FLINK-37745] Serialize partial deletes in ChangelogMode
to JSON (#26516)
a790428a1bb is described below
commit a790428a1bbed8c09004e6afe1a47bb479796a34
Author: Dawid Wysakowicz <[email protected]>
AuthorDate: Wed Apr 30 09:15:17 2025 +0200
[FLINK-37745] Serialize partial deletes in ChangelogMode to JSON (#26516)
---
.../org/apache/flink/table/connector/ChangelogMode.java | 13 ++++++-------
.../nodes/exec/serde/ChangelogModeJsonDeserializer.java | 12 ++++++++++--
.../plan/nodes/exec/serde/ChangelogModeJsonSerializer.java | 6 +++++-
.../plan/nodes/exec/serde/ChangelogModeJsonSerdeTest.java | 9 ++++++---
4 files changed, 27 insertions(+), 13 deletions(-)
diff --git
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/connector/ChangelogMode.java
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/connector/ChangelogMode.java
index 44515802113..82a1e34d0e8 100644
---
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/connector/ChangelogMode.java
+++
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/connector/ChangelogMode.java
@@ -24,7 +24,6 @@ import org.apache.flink.util.Preconditions;
import java.util.Collections;
import java.util.EnumSet;
-import java.util.Objects;
import java.util.Set;
import java.util.stream.Collectors;
@@ -128,19 +127,19 @@ public final class ChangelogMode {
@Override
public boolean equals(Object o) {
- if (this == o) {
- return true;
- }
if (o == null || getClass() != o.getClass()) {
return false;
}
- ChangelogMode that = (ChangelogMode) o;
- return kinds.equals(that.kinds);
+
+ final ChangelogMode that = (ChangelogMode) o;
+ return keyOnlyDeletes == that.keyOnlyDeletes &&
kinds.equals(that.kinds);
}
@Override
public int hashCode() {
- return Objects.hash(kinds);
+ int result = kinds.hashCode();
+ result = 31 * result + Boolean.hashCode(keyOnlyDeletes);
+ return result;
}
@Override
diff --git
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/ChangelogModeJsonDeserializer.java
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/ChangelogModeJsonDeserializer.java
index e6590a738e9..840e99b0083 100644
---
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/ChangelogModeJsonDeserializer.java
+++
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/ChangelogModeJsonDeserializer.java
@@ -28,6 +28,7 @@ import
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
import
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.deser.std.StdDeserializer;
import java.io.IOException;
+import java.util.Objects;
/**
* JSON deserializer for {@link ChangelogMode}.
@@ -37,6 +38,7 @@ import java.io.IOException;
@Internal
final class ChangelogModeJsonDeserializer extends
StdDeserializer<ChangelogMode> {
private static final long serialVersionUID = 1L;
+ private static final String PARTIAL_DELETE = "~" + RowKind.DELETE.name();
ChangelogModeJsonDeserializer() {
super(ChangelogMode.class);
@@ -49,8 +51,14 @@ final class ChangelogModeJsonDeserializer extends
StdDeserializer<ChangelogMode>
ChangelogMode.Builder builder = ChangelogMode.newBuilder();
JsonNode rowKindsNode = jsonParser.readValueAsTree();
for (JsonNode rowKindNode : rowKindsNode) {
- RowKind rowKind =
RowKind.valueOf(rowKindNode.asText().toUpperCase());
- builder.addContainedKind(rowKind);
+ final String rowKindText = rowKindNode.asText();
+ if (Objects.equals(PARTIAL_DELETE, rowKindText)) {
+ builder.keyOnlyDeletes(true);
+ builder.addContainedKind(RowKind.DELETE);
+ } else {
+ RowKind rowKind = RowKind.valueOf(rowKindText);
+ builder.addContainedKind(rowKind);
+ }
}
return builder.build();
}
diff --git
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/ChangelogModeJsonSerializer.java
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/ChangelogModeJsonSerializer.java
index 8d0c4621389..8483eb82d53 100644
---
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/ChangelogModeJsonSerializer.java
+++
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/ChangelogModeJsonSerializer.java
@@ -49,7 +49,11 @@ final class ChangelogModeJsonSerializer extends
StdSerializer<ChangelogMode> {
throws IOException {
jsonGenerator.writeStartArray();
for (RowKind rowKind : changelogMode.getContainedKinds()) {
- jsonGenerator.writeString(rowKind.name());
+ if (rowKind == RowKind.DELETE && changelogMode.keyOnlyDeletes()) {
+ jsonGenerator.writeString("~" + rowKind.name());
+ } else {
+ jsonGenerator.writeString(rowKind.name());
+ }
}
jsonGenerator.writeEndArray();
}
diff --git
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/serde/ChangelogModeJsonSerdeTest.java
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/serde/ChangelogModeJsonSerdeTest.java
index 2c94a5b514c..75f542592ce 100644
---
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/serde/ChangelogModeJsonSerdeTest.java
+++
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/serde/ChangelogModeJsonSerdeTest.java
@@ -21,9 +21,10 @@ package org.apache.flink.table.planner.plan.nodes.exec.serde;
import org.apache.flink.table.connector.ChangelogMode;
import org.apache.flink.types.RowKind;
-import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.parallel.Execution;
import org.junit.jupiter.api.parallel.ExecutionMode;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
import java.io.IOException;
@@ -33,14 +34,16 @@ import static
org.apache.flink.table.planner.plan.nodes.exec.serde.JsonSerdeTest
@Execution(ExecutionMode.CONCURRENT)
class ChangelogModeJsonSerdeTest {
- @Test
- void testChangelogModeSerde() throws IOException {
+ @ParameterizedTest
+ @ValueSource(booleans = {true, false})
+ void testChangelogModeSerde(boolean keyOnlyDeletes) throws IOException {
ChangelogMode changelogMode =
ChangelogMode.newBuilder()
.addContainedKind(RowKind.INSERT)
.addContainedKind(RowKind.DELETE)
.addContainedKind(RowKind.UPDATE_AFTER)
.addContainedKind(RowKind.UPDATE_BEFORE)
+ .keyOnlyDeletes(keyOnlyDeletes)
.build();
testJsonRoundTrip(changelogMode, ChangelogMode.class);