This is an automated email from the ASF dual-hosted git repository.

leonard pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-cdc.git

commit 03a2ae3ca78231660b81ef746e4577c2e7699531
Author: Leonard Xu <[email protected]>
AuthorDate: Thu Jul 25 09:54:45 2024 +0800

    [FLINK-35893][cdc-runtime] Write CURRENT_VERSION of TableChangeInfo to state
    
    This closes #2944.
---
 .../cdc/runtime/operators/transform/TableChangeInfo.java   | 14 ++++++++++++++
 1 file changed, 14 insertions(+)

diff --git 
a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/TableChangeInfo.java
 
b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/TableChangeInfo.java
index fa04fa50a..ca9781eb1 100644
--- 
a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/TableChangeInfo.java
+++ 
b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/TableChangeInfo.java
@@ -109,8 +109,14 @@ public class TableChangeInfo {
     /** Serializer for {@link TableChangeInfo}. */
     public static class Serializer implements 
SimpleVersionedSerializer<TableChangeInfo> {
 
+        /** The latest version before change of state compatibility. */
+        public static final int VERSION_BEFORE_STATE_COMPATIBILITY = 1;
+
         public static final int CURRENT_VERSION = 2;
 
+        /** Used to distinguish with the state which CURRENT_VERSION was not 
written. */
+        public static final TableId MAGIC_TABLE_ID = 
TableId.tableId("__magic_table__");
+
         @Override
         public int getVersion() {
             return CURRENT_VERSION;
@@ -122,6 +128,8 @@ public class TableChangeInfo {
             SchemaSerializer schemaSerializer = SchemaSerializer.INSTANCE;
             try (ByteArrayOutputStream baos = new ByteArrayOutputStream();
                     DataOutputStream out = new DataOutputStream(baos)) {
+                tableIdSerializer.serialize(MAGIC_TABLE_ID, new 
DataOutputViewStreamWrapper(out));
+                out.writeInt(CURRENT_VERSION);
                 tableIdSerializer.serialize(
                         tableChangeInfo.getTableId(), new 
DataOutputViewStreamWrapper(out));
                 schemaSerializer.serialize(
@@ -139,6 +147,12 @@ public class TableChangeInfo {
             try (ByteArrayInputStream bais = new 
ByteArrayInputStream(serialized);
                     DataInputStream in = new DataInputStream(bais)) {
                 TableId tableId = tableIdSerializer.deserialize(new 
DataInputViewStreamWrapper(in));
+                if (tableId.equals(MAGIC_TABLE_ID)) {
+                    version = in.readInt();
+                    tableId = tableIdSerializer.deserialize(new 
DataInputViewStreamWrapper(in));
+                } else {
+                    version = VERSION_BEFORE_STATE_COMPATIBILITY;
+                }
                 Schema originalSchema =
                         schemaSerializer.deserialize(version, new 
DataInputViewStreamWrapper(in));
                 Schema transformedSchema =

Reply via email to