This is an automated email from the ASF dual-hosted git repository. lzljs3620320 pushed a commit to branch release-0.6 in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git
commit 9a196e0fa34a99fcca30464dc51daa637cc60025 Author: liming.1018 <[email protected]> AuthorDate: Wed Dec 6 23:00:17 2023 +0800 [flink] supports writing to multiple partitions of kafka in unaware bucket mode. (#2459) --- .../apache/paimon/table/sink/TableWriteImpl.java | 9 +++- .../flink/kafka/KafkaLogSerializationSchema.java | 3 +- .../flink/kafka/KafkaLogSerializationTest.java | 15 ++++++- .../apache/paimon/flink/kafka/LogSystemITCase.java | 52 ++++++++++++++++++++++ .../paimon/flink/sink/StoreSinkWriteImpl.java | 3 +- 5 files changed, 77 insertions(+), 5 deletions(-) diff --git a/paimon-core/src/main/java/org/apache/paimon/table/sink/TableWriteImpl.java b/paimon-core/src/main/java/org/apache/paimon/table/sink/TableWriteImpl.java index 2bea3c224..25c29be9f 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/sink/TableWriteImpl.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/sink/TableWriteImpl.java @@ -29,6 +29,7 @@ import org.apache.paimon.memory.MemorySegmentPool; import org.apache.paimon.metrics.MetricRegistry; import org.apache.paimon.operation.FileStoreWrite; import org.apache.paimon.operation.FileStoreWrite.State; +import org.apache.paimon.table.BucketMode; import org.apache.paimon.utils.Restorable; import java.util.List; @@ -48,6 +49,7 @@ public class TableWriteImpl<T> implements InnerTableWrite, Restorable<List<State private final RecordExtractor<T> recordExtractor; private boolean batchCommitted = false; + private BucketMode bucketMode; public TableWriteImpl( FileStoreWrite<T> write, @@ -92,6 +94,11 @@ public class TableWriteImpl<T> implements InnerTableWrite, Restorable<List<State return this; } + public TableWriteImpl<T> withBucketMode(BucketMode bucketMode) { + this.bucketMode = bucketMode; + return this; + } + @Override public BinaryRow getPartition(InternalRow row) { keyAndBucketExtractor.setRecord(row); @@ -136,7 +143,7 @@ public class TableWriteImpl<T> implements InnerTableWrite, Restorable<List<State keyAndBucketExtractor.setRecord(record.row()); return new SinkRecord( record.partition(), - record.bucket(), + bucketMode == BucketMode.UNAWARE ? -1 : record.bucket(), keyAndBucketExtractor.logPrimaryKey(), record.row()); } diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/kafka/KafkaLogSerializationSchema.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/kafka/KafkaLogSerializationSchema.java index bb4c8645f..1fd80d356 100644 --- a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/kafka/KafkaLogSerializationSchema.java +++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/kafka/KafkaLogSerializationSchema.java @@ -81,6 +81,7 @@ public class KafkaLogSerializationSchema implements KafkaSerializationSchema<Sin } else { valueBytes = valueSerializer.serialize(new FlinkRowData(element.row())); } - return new ProducerRecord<>(topic, element.bucket(), primaryKeyBytes, valueBytes); + Integer partition = element.bucket() < 0 ? null : element.bucket(); + return new ProducerRecord<>(topic, partition, primaryKeyBytes, valueBytes); } } diff --git a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/kafka/KafkaLogSerializationTest.java b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/kafka/KafkaLogSerializationTest.java index 1e64f4e68..9d8e2295a 100644 --- a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/kafka/KafkaLogSerializationTest.java +++ b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/kafka/KafkaLogSerializationTest.java @@ -63,6 +63,12 @@ public class KafkaLogSerializationTest { checkNonKeyed(LogChangelogMode.ALL, 2, 5, 3); } + @Test + public void testUnawareBucket() throws Exception { + checkNonKeyed(LogChangelogMode.AUTO, -1, 3, 5); + checkNonKeyed(LogChangelogMode.ALL, -1, 5, 3); + } + private void checkKeyed(LogChangelogMode mode, int bucket, int key, int value) throws Exception { check(mode, true, bucket, key, value, RowKind.INSERT); @@ -92,7 +98,11 @@ public class KafkaLogSerializationTest { SinkRecord input = testRecord(keyed, bucket, key, value, rowKind); ProducerRecord<byte[], byte[]> record = serializer.serialize(input, null); - assertThat(record.partition().intValue()).isEqualTo(bucket); + if (bucket >= 0) { + assertThat(record.partition().intValue()).isEqualTo(bucket); + } else { + assertThat(record.partition()).isNull(); + } AtomicReference<RowData> rowReference = new AtomicReference<>(); deserializer.deserialize( @@ -129,7 +139,8 @@ public class KafkaLogSerializationTest { } private ConsumerRecord<byte[], byte[]> toConsumerRecord(ProducerRecord<byte[], byte[]> record) { - return new ConsumerRecord<>(TOPIC, record.partition(), 0, record.key(), record.value()); + int partition = record.partition() == null ? -1 : record.partition(); + return new ConsumerRecord<>(TOPIC, partition, 0, record.key(), record.value()); } private static KafkaLogSerializationSchema createTestSerializationSchema( diff --git a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/kafka/LogSystemITCase.java b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/kafka/LogSystemITCase.java index a32e7b54c..d4a64ea54 100644 --- a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/kafka/LogSystemITCase.java +++ b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/kafka/LogSystemITCase.java @@ -26,12 +26,18 @@ import org.apache.flink.table.catalog.CatalogBaseTable; import org.apache.flink.table.catalog.ObjectPath; import org.apache.flink.table.catalog.exceptions.TableNotExistException; import org.apache.flink.types.Row; +import org.apache.kafka.clients.admin.AdminClient; +import org.apache.kafka.clients.admin.ListOffsetsResult; +import org.apache.kafka.clients.admin.OffsetSpec; +import org.apache.kafka.common.TopicPartition; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.Timeout; import java.io.IOException; +import java.util.HashMap; import java.util.List; +import java.util.Map; import java.util.UUID; import static org.assertj.core.api.Assertions.assertThat; @@ -370,4 +376,50 @@ public class LogSystemITCase extends KafkaTableTestBase { deleteTopicIfExists(topic); } } + + @Test + @Timeout(120) + public void testAppendOnlyWithUnawareBucket() throws Exception { + String topic = UUID.randomUUID().toString(); + createTopicIfNotExists(topic, 2); + + try { + // disable checkpointing to test eventual + env.getCheckpointConfig().disableCheckpointing(); + env.setParallelism(1); + tEnv.executeSql( + String.format( + "CREATE TABLE T (i INT, j INT) WITH (" + + "'log.system'='kafka', " + + "'log.consistency'='eventual', " + + "'bucket'='-1', " + + "'kafka.bootstrap.servers'='%s', " + + "'kafka.topic'='%s'," + + "'kafka.batch.size'='20')", + getBootstrapServers(), topic)); + tEnv.executeSql( + "CREATE TEMPORARY TABLE gen (i INT, j INT) WITH ('connector'='datagen', 'rows-per-second'='2')"); + TableResult write = tEnv.executeSql("INSERT INTO T SELECT * FROM gen"); + BlockingIterator<Row, Row> read = + BlockingIterator.of(tEnv.executeSql("SELECT * FROM T").collect()); + List<Row> collect = read.collect(10); + assertThat(collect).hasSize(10); + write.getJobClient().get().cancel(); + read.close(); + + // check offsets + try (final AdminClient adminClient = AdminClient.create(getStandardProps())) { + Map<TopicPartition, OffsetSpec> topicPartitionOffsets = new HashMap<>(4); + for (int i = 0; i < 2; i++) { + topicPartitionOffsets.put(new TopicPartition(topic, i), OffsetSpec.latest()); + } + Map<TopicPartition, ListOffsetsResult.ListOffsetsResultInfo> result = + adminClient.listOffsets(topicPartitionOffsets).all().get(); + assertThat(result.values()) + .allMatch(partitionOffsetInfo -> partitionOffsetInfo.offset() > 0); + } + } finally { + deleteTopicIfExists(topic); + } + } } diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreSinkWriteImpl.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreSinkWriteImpl.java index ceffa7648..968ace78a 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreSinkWriteImpl.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreSinkWriteImpl.java @@ -146,7 +146,8 @@ public class StoreSinkWriteImpl implements StoreSinkWrite { state.stateValueFilter().filter(table.name(), part, bucket)) .withIOManager(paimonIOManager) .withIgnorePreviousFiles(ignorePreviousFiles) - .withExecutionMode(isStreamingMode); + .withExecutionMode(isStreamingMode) + .withBucketMode(table.bucketMode()); if (metricGroup != null) { tableWrite.withMetricRegistry(new FlinkMetricRegistry(metricGroup));
