This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 2ee67f38a [cdc] Optimize MultiTableCommittableSerializer
2ee67f38a is described below

commit 2ee67f38a5ce4350c94dcfbd64614ebab132f15d
Author: Jingsong <[email protected]>
AuthorDate: Sat May 4 16:11:01 2024 +0800

    [cdc] Optimize MultiTableCommittableSerializer
---
 .../sink/MultiTableCommittableSerializer.java      | 16 ++--
 .../sink/MultiTableCommittableSerializerTest.java  | 97 ++++++++++------------
 2 files changed, 51 insertions(+), 62 deletions(-)

diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/MultiTableCommittableSerializer.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/MultiTableCommittableSerializer.java
index 6b1d69c4b..2b427db4a 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/MultiTableCommittableSerializer.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/MultiTableCommittableSerializer.java
@@ -53,21 +53,19 @@ public class MultiTableCommittableSerializer
     @Override
     public byte[] serialize(MultiTableCommittable committable) throws IOException {
         // first serialize all metadata
-        String database = committable.getDatabase();
-        int databaseLen = database.getBytes(StandardCharsets.UTF_8).length;
-        String table = committable.getTable();
-        int tableLen = table.getBytes(StandardCharsets.UTF_8).length;
+        byte[] database = committable.getDatabase().getBytes(StandardCharsets.UTF_8);
+        byte[] table = committable.getTable().getBytes(StandardCharsets.UTF_8);
 
-        int multiTableMetaLen = databaseLen + tableLen + 2 * 4;
+        int multiTableMetaLen = database.length + table.length + 2 * 4;
 
         // use committable serializer (of the same version) to serialize committable
         byte[] serializedCommittable = serializeCommittable(committable);
 
         return ByteBuffer.allocate(multiTableMetaLen + serializedCommittable.length)
-                .putInt(databaseLen)
-                .put(database.getBytes())
-                .putInt(tableLen)
-                .put(table.getBytes())
+                .putInt(database.length)
+                .put(database)
+                .putInt(table.length)
+                .put(table)
                 .put(serializedCommittable)
                 .array();
     }
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/MultiTableCommittableSerializerTest.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/MultiTableCommittableSerializerTest.java
index 904294cf7..16c1a7d04 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/MultiTableCommittableSerializerTest.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/MultiTableCommittableSerializerTest.java
@@ -26,96 +26,87 @@ import org.apache.paimon.table.sink.CommitMessageImpl;
 import org.apache.paimon.table.sink.CommitMessageSerializer;
 
 import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.shaded.guava31.com.google.common.collect.Lists;
 import org.junit.jupiter.api.Test;
 
 import java.io.IOException;
 import java.nio.BufferOverflowException;
-import java.util.function.Consumer;
+import java.util.Arrays;
 
 import static org.apache.paimon.manifest.ManifestCommittableSerializerTest.randomCompactIncrement;
 import static org.apache.paimon.manifest.ManifestCommittableSerializerTest.randomNewFilesIncrement;
 import static org.apache.paimon.mergetree.compact.MergeTreeCompactManagerTest.row;
 import static org.assertj.core.api.Assertions.assertThat;
-import static org.junit.Assert.assertNotNull;
 
 class MultiTableCommittableSerializerTest {
+
     private final CommitMessageSerializer fileSerializer = new CommitMessageSerializer();
 
     private final MultiTableCommittableSerializer serializer =
             new MultiTableCommittableSerializer(fileSerializer);
 
     @Test
-    public void testDeserialize() throws IOException {
+    public void testDeserialize() {
         DataIncrement dataIncrement = randomNewFilesIncrement();
         CompactIncrement compactIncrement = randomCompactIncrement();
         CommitMessage commitMessage =
                 new CommitMessageImpl(row(0), 1, dataIncrement, compactIncrement);
         Committable committable = new Committable(9, Committable.Kind.FILE, commitMessage);
 
-        Lists.newArrayList(Tuple2.of("测试数据库", "用户信息表"), Tuple2.of("database", "table")).stream()
+        Arrays.asList(Tuple2.of("测试数据库", "用户信息表"), Tuple2.of("database", "table"))
                 .forEach(
-                        new Consumer<Tuple2<String, String>>() {
-                            @Override
-                            public void accept(Tuple2<String, String> stringStringTuple2) {
-                                String database = stringStringTuple2.f0;
-                                String table = stringStringTuple2.f1;
-                                MultiTableCommittable multiTableCommittable =
-                                        MultiTableCommittable.fromCommittable(
-                                                Identifier.create(database, table), committable);
-                                MultiTableCommittable deserializeCommittable = null;
-                                try {
-                                    deserializeCommittable =
-                                            serializer.deserialize(
-                                                    2, serializer.serialize(multiTableCommittable));
-                                } catch (IOException e) {
-                                    throw new RuntimeException(e);
-                                }
-
-                                assertThat(deserializeCommittable)
-                                        .isInstanceOf(MultiTableCommittable.class);
-
-                                assertThat(deserializeCommittable.getDatabase())
-                                        .isEqualTo(database);
-                                assertThat(deserializeCommittable.getTable()).isEqualTo(table);
+                        tuple2 -> {
+                            String database = tuple2.f0;
+                            String table = tuple2.f1;
+                            MultiTableCommittable multiTableCommittable =
+                                    MultiTableCommittable.fromCommittable(
+                                            Identifier.create(database, table), committable);
+                            MultiTableCommittable deserializeCommittable;
+                            try {
+                                deserializeCommittable =
+                                        serializer.deserialize(
+                                                2, serializer.serialize(multiTableCommittable));
+                            } catch (IOException e) {
+                                throw new RuntimeException(e);
                             }
+
+                            assertThat(deserializeCommittable)
+                                    .isInstanceOf(MultiTableCommittable.class);
+
+                            assertThat(deserializeCommittable.getDatabase()).isEqualTo(database);
+                            assertThat(deserializeCommittable.getTable()).isEqualTo(table);
                         });
     }
 
     @Test
-    public void testSerialize() throws IOException {
+    public void testSerialize() {
         DataIncrement newFilesIncrement = randomNewFilesIncrement();
         CompactIncrement compactIncrement = randomCompactIncrement();
         CommitMessage commitMessage =
                 new CommitMessageImpl(row(0), 1, newFilesIncrement, compactIncrement);
         Committable committable = new Committable(9, Committable.Kind.FILE, commitMessage);
 
-        Lists.newArrayList(Tuple2.of("测试数据库", "用户信息表"), Tuple2.of("database", "table")).stream()
+        Arrays.asList(Tuple2.of("测试数据库", "用户信息表"), Tuple2.of("database", "table"))
                 .forEach(
-                        new Consumer<Tuple2<String, String>>() {
-                            @Override
-                            public void accept(Tuple2<String, String> stringStringTuple2) {
-                                String database = stringStringTuple2.f0;
-                                String table = stringStringTuple2.f1;
-
-                                MultiTableCommittable multiTableCommittable =
-                                        MultiTableCommittable.fromCommittable(
-                                                Identifier.create(database, table), committable);
-
-                                byte[] serializedData = null;
-                                try {
-                                    serializedData = serializer.serialize(multiTableCommittable);
-                                } catch (BufferOverflowException e) {
-                                    e.printStackTrace();
-                                    assert false : "Should not throw BufferOverflowException";
-                                } catch (IOException e) {
-                                    e.printStackTrace();
-                                    assert false : "IOException occurred";
-                                }
-
-                                assertNotNull(
-                                        "The serialized data should not be null.", serializedData);
+                        tuple2 -> {
+                            String database = tuple2.f0;
+                            String table = tuple2.f1;
+
+                            MultiTableCommittable multiTableCommittable =
+                                    MultiTableCommittable.fromCommittable(
+                                            Identifier.create(database, table), committable);
+
+                            byte[] serializedData = null;
+                            try {
+                                serializedData = serializer.serialize(multiTableCommittable);
+                            } catch (BufferOverflowException e) {
+                                e.printStackTrace();
+                                assert false : "Should not throw BufferOverflowException";
+                            } catch (IOException e) {
+                                e.printStackTrace();
+                                assert false : "IOException occurred";
                             }
+
+                            assertThat(serializedData).isNotNull();
                         });
     }
 }

Reply via email to