This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 075d77455 [cdc] Fix Multibyte Character Encoding in MultiTableCommittable Serialization
075d77455 is described below
commit 075d774554697cdd2529c0d104682e7437e90fba
Author: Pandas886 <[email protected]>
AuthorDate: Fri Mar 15 23:41:35 2024 +0800
[cdc] Fix Multibyte Character Encoding in MultiTableCommittable Serialization
This closes #3028
---
.../sink/MultiTableCommittableSerializer.java | 14 ++--
.../sink/MultiTableCommittableSerializerTest.java | 84 ++++++++++++++++++----
2 files changed, 79 insertions(+), 19 deletions(-)
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/MultiTableCommittableSerializer.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/MultiTableCommittableSerializer.java
index 2cb9f4167..6b1d69c4b 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/MultiTableCommittableSerializer.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/MultiTableCommittableSerializer.java
@@ -26,6 +26,7 @@ import org.apache.flink.core.io.SimpleVersionedSerializer;
import java.io.IOException;
import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
/**
* {@link SimpleVersionedSerializer} for {@link MultiTableCommittable}. If a
type info class is
@@ -53,9 +54,9 @@ public class MultiTableCommittableSerializer
public byte[] serialize(MultiTableCommittable committable) throws
IOException {
// first serialize all metadata
String database = committable.getDatabase();
- int databaseLen = database.length();
+ int databaseLen = database.getBytes(StandardCharsets.UTF_8).length;
String table = committable.getTable();
- int tableLen = table.length();
+ int tableLen = table.getBytes(StandardCharsets.UTF_8).length;
int multiTableMetaLen = databaseLen + tableLen + 2 * 4;
@@ -83,17 +84,18 @@ public class MultiTableCommittableSerializer
int databaseLen = buffer.getInt();
byte[] databaseBytes = new byte[databaseLen];
buffer.get(databaseBytes, 0, databaseLen);
- String database = new String(databaseBytes);
+ String database = new String(databaseBytes, StandardCharsets.UTF_8);
+
int tableLen = buffer.getInt();
byte[] tableBytes = new byte[tableLen];
buffer.get(tableBytes, 0, tableLen);
- String table = new String(tableBytes);
- int multiTableMetaLen = databaseLen + tableLen + 2 * 4;
+ String table = new String(tableBytes, StandardCharsets.UTF_8);
+ int multiTableMetaLen = 4 + databaseLen + 4 + tableLen;
// use committable serializer (of the same version) to deserialize
committable
byte[] serializedCommittable = new byte[bytes.length -
multiTableMetaLen];
- buffer.get(serializedCommittable, 0, bytes.length - multiTableMetaLen);
+ buffer.get(serializedCommittable, 0, serializedCommittable.length);
Committable committable = deserializeCommittable(committableVersion,
serializedCommittable);
return MultiTableCommittable.fromCommittable(
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/MultiTableCommittableSerializerTest.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/MultiTableCommittableSerializerTest.java
index 2fa8768fc..904294cf7 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/MultiTableCommittableSerializerTest.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/MultiTableCommittableSerializerTest.java
@@ -25,14 +25,19 @@ import org.apache.paimon.table.sink.CommitMessage;
import org.apache.paimon.table.sink.CommitMessageImpl;
import org.apache.paimon.table.sink.CommitMessageSerializer;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.shaded.guava31.com.google.common.collect.Lists;
import org.junit.jupiter.api.Test;
import java.io.IOException;
+import java.nio.BufferOverflowException;
+import java.util.function.Consumer;
import static
org.apache.paimon.manifest.ManifestCommittableSerializerTest.randomCompactIncrement;
import static
org.apache.paimon.manifest.ManifestCommittableSerializerTest.randomNewFilesIncrement;
import static
org.apache.paimon.mergetree.compact.MergeTreeCompactManagerTest.row;
import static org.assertj.core.api.Assertions.assertThat;
+import static org.junit.Assert.assertNotNull;
class MultiTableCommittableSerializerTest {
private final CommitMessageSerializer fileSerializer = new
CommitMessageSerializer();
@@ -41,23 +46,76 @@ class MultiTableCommittableSerializerTest {
new MultiTableCommittableSerializer(fileSerializer);
@Test
- public void testFileMetadata() throws IOException {
+ public void testDeserialize() throws IOException {
DataIncrement dataIncrement = randomNewFilesIncrement();
CompactIncrement compactIncrement = randomCompactIncrement();
CommitMessage commitMessage =
new CommitMessageImpl(row(0), 1, dataIncrement,
compactIncrement);
Committable committable = new Committable(9, Committable.Kind.FILE,
commitMessage);
- String database = "database";
- String table = "table";
- MultiTableCommittable multiTableCommittable =
- MultiTableCommittable.fromCommittable(
- Identifier.create(database, table), committable);
- MultiTableCommittable deserializeCommittable =
- serializer.deserialize(2,
serializer.serialize(multiTableCommittable));
-
-
assertThat(deserializeCommittable).isInstanceOf(MultiTableCommittable.class);
-
- assertThat(deserializeCommittable.getDatabase()).isEqualTo(database);
- assertThat(deserializeCommittable.getTable()).isEqualTo(table);
+
+ Lists.newArrayList(Tuple2.of("测试数据库", "用户信息表"), Tuple2.of("database",
"table")).stream()
+ .forEach(
+ new Consumer<Tuple2<String, String>>() {
+ @Override
+ public void accept(Tuple2<String, String>
stringStringTuple2) {
+ String database = stringStringTuple2.f0;
+ String table = stringStringTuple2.f1;
+ MultiTableCommittable multiTableCommittable =
+ MultiTableCommittable.fromCommittable(
+ Identifier.create(database,
table), committable);
+ MultiTableCommittable deserializeCommittable =
null;
+ try {
+ deserializeCommittable =
+ serializer.deserialize(
+ 2,
serializer.serialize(multiTableCommittable));
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+
+ assertThat(deserializeCommittable)
+
.isInstanceOf(MultiTableCommittable.class);
+
+
assertThat(deserializeCommittable.getDatabase())
+ .isEqualTo(database);
+
assertThat(deserializeCommittable.getTable()).isEqualTo(table);
+ }
+ });
+ }
+
+ @Test
+ public void testSerialize() throws IOException {
+ DataIncrement newFilesIncrement = randomNewFilesIncrement();
+ CompactIncrement compactIncrement = randomCompactIncrement();
+ CommitMessage commitMessage =
+ new CommitMessageImpl(row(0), 1, newFilesIncrement,
compactIncrement);
+ Committable committable = new Committable(9, Committable.Kind.FILE,
commitMessage);
+
+ Lists.newArrayList(Tuple2.of("测试数据库", "用户信息表"), Tuple2.of("database",
"table")).stream()
+ .forEach(
+ new Consumer<Tuple2<String, String>>() {
+ @Override
+ public void accept(Tuple2<String, String>
stringStringTuple2) {
+ String database = stringStringTuple2.f0;
+ String table = stringStringTuple2.f1;
+
+ MultiTableCommittable multiTableCommittable =
+ MultiTableCommittable.fromCommittable(
+ Identifier.create(database,
table), committable);
+
+ byte[] serializedData = null;
+ try {
+ serializedData =
serializer.serialize(multiTableCommittable);
+ } catch (BufferOverflowException e) {
+ e.printStackTrace();
+ assert false : "Should not throw
BufferOverflowException";
+ } catch (IOException e) {
+ e.printStackTrace();
+ assert false : "IOException occurred";
+ }
+
+ assertNotNull(
+ "The serialized data should not be
null.", serializedData);
+ }
+ });
}
}