This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 2ee67f38a [cdc] Optimize MultiTableCommittableSerializer
2ee67f38a is described below
commit 2ee67f38a5ce4350c94dcfbd64614ebab132f15d
Author: Jingsong <[email protected]>
AuthorDate: Sat May 4 16:11:01 2024 +0800
[cdc] Optimize MultiTableCommittableSerializer
---
.../sink/MultiTableCommittableSerializer.java | 16 ++--
.../sink/MultiTableCommittableSerializerTest.java | 97 ++++++++++------------
2 files changed, 51 insertions(+), 62 deletions(-)
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/MultiTableCommittableSerializer.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/MultiTableCommittableSerializer.java
index 6b1d69c4b..2b427db4a 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/MultiTableCommittableSerializer.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/MultiTableCommittableSerializer.java
@@ -53,21 +53,19 @@ public class MultiTableCommittableSerializer
@Override
public byte[] serialize(MultiTableCommittable committable) throws
IOException {
// first serialize all metadata
- String database = committable.getDatabase();
- int databaseLen = database.getBytes(StandardCharsets.UTF_8).length;
- String table = committable.getTable();
- int tableLen = table.getBytes(StandardCharsets.UTF_8).length;
+ byte[] database =
committable.getDatabase().getBytes(StandardCharsets.UTF_8);
+ byte[] table = committable.getTable().getBytes(StandardCharsets.UTF_8);
- int multiTableMetaLen = databaseLen + tableLen + 2 * 4;
+ int multiTableMetaLen = database.length + table.length + 2 * 4;
// use committable serializer (of the same version) to serialize
committable
byte[] serializedCommittable = serializeCommittable(committable);
return ByteBuffer.allocate(multiTableMetaLen +
serializedCommittable.length)
- .putInt(databaseLen)
- .put(database.getBytes())
- .putInt(tableLen)
- .put(table.getBytes())
+ .putInt(database.length)
+ .put(database)
+ .putInt(table.length)
+ .put(table)
.put(serializedCommittable)
.array();
}
diff --git
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/MultiTableCommittableSerializerTest.java
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/MultiTableCommittableSerializerTest.java
index 904294cf7..16c1a7d04 100644
---
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/MultiTableCommittableSerializerTest.java
+++
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/MultiTableCommittableSerializerTest.java
@@ -26,96 +26,87 @@ import org.apache.paimon.table.sink.CommitMessageImpl;
import org.apache.paimon.table.sink.CommitMessageSerializer;
import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.shaded.guava31.com.google.common.collect.Lists;
import org.junit.jupiter.api.Test;
import java.io.IOException;
import java.nio.BufferOverflowException;
-import java.util.function.Consumer;
+import java.util.Arrays;
import static
org.apache.paimon.manifest.ManifestCommittableSerializerTest.randomCompactIncrement;
import static
org.apache.paimon.manifest.ManifestCommittableSerializerTest.randomNewFilesIncrement;
import static
org.apache.paimon.mergetree.compact.MergeTreeCompactManagerTest.row;
import static org.assertj.core.api.Assertions.assertThat;
-import static org.junit.Assert.assertNotNull;
class MultiTableCommittableSerializerTest {
+
private final CommitMessageSerializer fileSerializer = new
CommitMessageSerializer();
private final MultiTableCommittableSerializer serializer =
new MultiTableCommittableSerializer(fileSerializer);
@Test
- public void testDeserialize() throws IOException {
+ public void testDeserialize() {
DataIncrement dataIncrement = randomNewFilesIncrement();
CompactIncrement compactIncrement = randomCompactIncrement();
CommitMessage commitMessage =
new CommitMessageImpl(row(0), 1, dataIncrement,
compactIncrement);
Committable committable = new Committable(9, Committable.Kind.FILE,
commitMessage);
- Lists.newArrayList(Tuple2.of("测试数据库", "用户信息表"), Tuple2.of("database",
"table")).stream()
+ Arrays.asList(Tuple2.of("测试数据库", "用户信息表"), Tuple2.of("database",
"table"))
.forEach(
- new Consumer<Tuple2<String, String>>() {
- @Override
- public void accept(Tuple2<String, String>
stringStringTuple2) {
- String database = stringStringTuple2.f0;
- String table = stringStringTuple2.f1;
- MultiTableCommittable multiTableCommittable =
- MultiTableCommittable.fromCommittable(
- Identifier.create(database,
table), committable);
- MultiTableCommittable deserializeCommittable =
null;
- try {
- deserializeCommittable =
- serializer.deserialize(
- 2,
serializer.serialize(multiTableCommittable));
- } catch (IOException e) {
- throw new RuntimeException(e);
- }
-
- assertThat(deserializeCommittable)
-
.isInstanceOf(MultiTableCommittable.class);
-
-
assertThat(deserializeCommittable.getDatabase())
- .isEqualTo(database);
-
assertThat(deserializeCommittable.getTable()).isEqualTo(table);
+ tuple2 -> {
+ String database = tuple2.f0;
+ String table = tuple2.f1;
+ MultiTableCommittable multiTableCommittable =
+ MultiTableCommittable.fromCommittable(
+ Identifier.create(database,
table), committable);
+ MultiTableCommittable deserializeCommittable;
+ try {
+ deserializeCommittable =
+ serializer.deserialize(
+ 2,
serializer.serialize(multiTableCommittable));
+ } catch (IOException e) {
+ throw new RuntimeException(e);
}
+
+ assertThat(deserializeCommittable)
+ .isInstanceOf(MultiTableCommittable.class);
+
+
assertThat(deserializeCommittable.getDatabase()).isEqualTo(database);
+
assertThat(deserializeCommittable.getTable()).isEqualTo(table);
});
}
@Test
- public void testSerialize() throws IOException {
+ public void testSerialize() {
DataIncrement newFilesIncrement = randomNewFilesIncrement();
CompactIncrement compactIncrement = randomCompactIncrement();
CommitMessage commitMessage =
new CommitMessageImpl(row(0), 1, newFilesIncrement,
compactIncrement);
Committable committable = new Committable(9, Committable.Kind.FILE,
commitMessage);
- Lists.newArrayList(Tuple2.of("测试数据库", "用户信息表"), Tuple2.of("database",
"table")).stream()
+ Arrays.asList(Tuple2.of("测试数据库", "用户信息表"), Tuple2.of("database",
"table"))
.forEach(
- new Consumer<Tuple2<String, String>>() {
- @Override
- public void accept(Tuple2<String, String>
stringStringTuple2) {
- String database = stringStringTuple2.f0;
- String table = stringStringTuple2.f1;
-
- MultiTableCommittable multiTableCommittable =
- MultiTableCommittable.fromCommittable(
- Identifier.create(database,
table), committable);
-
- byte[] serializedData = null;
- try {
- serializedData =
serializer.serialize(multiTableCommittable);
- } catch (BufferOverflowException e) {
- e.printStackTrace();
- assert false : "Should not throw
BufferOverflowException";
- } catch (IOException e) {
- e.printStackTrace();
- assert false : "IOException occurred";
- }
-
- assertNotNull(
- "The serialized data should not be
null.", serializedData);
+ tuple2 -> {
+ String database = tuple2.f0;
+ String table = tuple2.f1;
+
+ MultiTableCommittable multiTableCommittable =
+ MultiTableCommittable.fromCommittable(
+ Identifier.create(database,
table), committable);
+
+ byte[] serializedData = null;
+ try {
+ serializedData =
serializer.serialize(multiTableCommittable);
+ } catch (BufferOverflowException e) {
+ e.printStackTrace();
+ assert false : "Should not throw
BufferOverflowException";
+ } catch (IOException e) {
+ e.printStackTrace();
+ assert false : "IOException occurred";
}
+
+ assertThat(serializedData).isNotNull();
});
}
}