This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 075d77455 [cdc] Fix Multibyte Character Encoding in MultiTableCommittable Serialization
075d77455 is described below

commit 075d774554697cdd2529c0d104682e7437e90fba
Author: Pandas886 <[email protected]>
AuthorDate: Fri Mar 15 23:41:35 2024 +0800

    [cdc] Fix Multibyte Character Encoding in MultiTableCommittable Serialization
    
    This closes #3028
---
 .../sink/MultiTableCommittableSerializer.java      | 14 ++--
 .../sink/MultiTableCommittableSerializerTest.java  | 84 ++++++++++++++++++----
 2 files changed, 79 insertions(+), 19 deletions(-)

diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/MultiTableCommittableSerializer.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/MultiTableCommittableSerializer.java
index 2cb9f4167..6b1d69c4b 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/MultiTableCommittableSerializer.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/MultiTableCommittableSerializer.java
@@ -26,6 +26,7 @@ import org.apache.flink.core.io.SimpleVersionedSerializer;
 
 import java.io.IOException;
 import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
 
 /**
  * {@link SimpleVersionedSerializer} for {@link MultiTableCommittable}. If a 
type info class is
@@ -53,9 +54,9 @@ public class MultiTableCommittableSerializer
     public byte[] serialize(MultiTableCommittable committable) throws 
IOException {
         // first serialize all metadata
         String database = committable.getDatabase();
-        int databaseLen = database.length();
+        int databaseLen = database.getBytes(StandardCharsets.UTF_8).length;
         String table = committable.getTable();
-        int tableLen = table.length();
+        int tableLen = table.getBytes(StandardCharsets.UTF_8).length;
 
         int multiTableMetaLen = databaseLen + tableLen + 2 * 4;
 
@@ -83,17 +84,18 @@ public class MultiTableCommittableSerializer
         int databaseLen = buffer.getInt();
         byte[] databaseBytes = new byte[databaseLen];
         buffer.get(databaseBytes, 0, databaseLen);
-        String database = new String(databaseBytes);
+        String database = new String(databaseBytes, StandardCharsets.UTF_8);
+
         int tableLen = buffer.getInt();
         byte[] tableBytes = new byte[tableLen];
         buffer.get(tableBytes, 0, tableLen);
-        String table = new String(tableBytes);
-        int multiTableMetaLen = databaseLen + tableLen + 2 * 4;
+        String table = new String(tableBytes, StandardCharsets.UTF_8);
+        int multiTableMetaLen = 4 + databaseLen + 4 + tableLen;
 
         // use committable serializer (of the same version) to deserialize 
committable
         byte[] serializedCommittable = new byte[bytes.length - 
multiTableMetaLen];
 
-        buffer.get(serializedCommittable, 0, bytes.length - multiTableMetaLen);
+        buffer.get(serializedCommittable, 0, serializedCommittable.length);
         Committable committable = deserializeCommittable(committableVersion, 
serializedCommittable);
 
         return MultiTableCommittable.fromCommittable(
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/MultiTableCommittableSerializerTest.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/MultiTableCommittableSerializerTest.java
index 2fa8768fc..904294cf7 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/MultiTableCommittableSerializerTest.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/MultiTableCommittableSerializerTest.java
@@ -25,14 +25,19 @@ import org.apache.paimon.table.sink.CommitMessage;
 import org.apache.paimon.table.sink.CommitMessageImpl;
 import org.apache.paimon.table.sink.CommitMessageSerializer;
 
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.shaded.guava31.com.google.common.collect.Lists;
 import org.junit.jupiter.api.Test;
 
 import java.io.IOException;
+import java.nio.BufferOverflowException;
+import java.util.function.Consumer;
 
 import static 
org.apache.paimon.manifest.ManifestCommittableSerializerTest.randomCompactIncrement;
 import static 
org.apache.paimon.manifest.ManifestCommittableSerializerTest.randomNewFilesIncrement;
 import static 
org.apache.paimon.mergetree.compact.MergeTreeCompactManagerTest.row;
 import static org.assertj.core.api.Assertions.assertThat;
+import static org.junit.Assert.assertNotNull;
 
 class MultiTableCommittableSerializerTest {
     private final CommitMessageSerializer fileSerializer = new 
CommitMessageSerializer();
@@ -41,23 +46,76 @@ class MultiTableCommittableSerializerTest {
             new MultiTableCommittableSerializer(fileSerializer);
 
     @Test
-    public void testFileMetadata() throws IOException {
+    public void testDeserialize() throws IOException {
         DataIncrement dataIncrement = randomNewFilesIncrement();
         CompactIncrement compactIncrement = randomCompactIncrement();
         CommitMessage commitMessage =
                 new CommitMessageImpl(row(0), 1, dataIncrement, 
compactIncrement);
         Committable committable = new Committable(9, Committable.Kind.FILE, 
commitMessage);
-        String database = "database";
-        String table = "table";
-        MultiTableCommittable multiTableCommittable =
-                MultiTableCommittable.fromCommittable(
-                        Identifier.create(database, table), committable);
-        MultiTableCommittable deserializeCommittable =
-                serializer.deserialize(2, 
serializer.serialize(multiTableCommittable));
-
-        
assertThat(deserializeCommittable).isInstanceOf(MultiTableCommittable.class);
-
-        assertThat(deserializeCommittable.getDatabase()).isEqualTo(database);
-        assertThat(deserializeCommittable.getTable()).isEqualTo(table);
+
+        Lists.newArrayList(Tuple2.of("测试数据库", "用户信息表"), Tuple2.of("database", 
"table")).stream()
+                .forEach(
+                        new Consumer<Tuple2<String, String>>() {
+                            @Override
+                            public void accept(Tuple2<String, String> 
stringStringTuple2) {
+                                String database = stringStringTuple2.f0;
+                                String table = stringStringTuple2.f1;
+                                MultiTableCommittable multiTableCommittable =
+                                        MultiTableCommittable.fromCommittable(
+                                                Identifier.create(database, 
table), committable);
+                                MultiTableCommittable deserializeCommittable = 
null;
+                                try {
+                                    deserializeCommittable =
+                                            serializer.deserialize(
+                                                    2, 
serializer.serialize(multiTableCommittable));
+                                } catch (IOException e) {
+                                    throw new RuntimeException(e);
+                                }
+
+                                assertThat(deserializeCommittable)
+                                        
.isInstanceOf(MultiTableCommittable.class);
+
+                                
assertThat(deserializeCommittable.getDatabase())
+                                        .isEqualTo(database);
+                                
assertThat(deserializeCommittable.getTable()).isEqualTo(table);
+                            }
+                        });
+    }
+
+    @Test
+    public void testSerialize() throws IOException {
+        DataIncrement newFilesIncrement = randomNewFilesIncrement();
+        CompactIncrement compactIncrement = randomCompactIncrement();
+        CommitMessage commitMessage =
+                new CommitMessageImpl(row(0), 1, newFilesIncrement, 
compactIncrement);
+        Committable committable = new Committable(9, Committable.Kind.FILE, 
commitMessage);
+
+        Lists.newArrayList(Tuple2.of("测试数据库", "用户信息表"), Tuple2.of("database", 
"table")).stream()
+                .forEach(
+                        new Consumer<Tuple2<String, String>>() {
+                            @Override
+                            public void accept(Tuple2<String, String> 
stringStringTuple2) {
+                                String database = stringStringTuple2.f0;
+                                String table = stringStringTuple2.f1;
+
+                                MultiTableCommittable multiTableCommittable =
+                                        MultiTableCommittable.fromCommittable(
+                                                Identifier.create(database, 
table), committable);
+
+                                byte[] serializedData = null;
+                                try {
+                                    serializedData = 
serializer.serialize(multiTableCommittable);
+                                } catch (BufferOverflowException e) {
+                                    e.printStackTrace();
+                                    assert false : "Should not throw 
BufferOverflowException";
+                                } catch (IOException e) {
+                                    e.printStackTrace();
+                                    assert false : "IOException occurred";
+                                }
+
+                                assertNotNull(
+                                        "The serialized data should not be 
null.", serializedData);
+                            }
+                        });
     }
 }

Reply via email to