This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch release-0.6
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git

commit 9a196e0fa34a99fcca30464dc51daa637cc60025
Author: liming.1018 <[email protected]>
AuthorDate: Wed Dec 6 23:00:17 2023 +0800

    [flink] supports writing to multiple partitions of kafka in unaware bucket mode. (#2459)
---
 .../apache/paimon/table/sink/TableWriteImpl.java   |  9 +++-
 .../flink/kafka/KafkaLogSerializationSchema.java   |  3 +-
 .../flink/kafka/KafkaLogSerializationTest.java     | 15 ++++++-
 .../apache/paimon/flink/kafka/LogSystemITCase.java | 52 ++++++++++++++++++++++
 .../paimon/flink/sink/StoreSinkWriteImpl.java      |  3 +-
 5 files changed, 77 insertions(+), 5 deletions(-)

diff --git a/paimon-core/src/main/java/org/apache/paimon/table/sink/TableWriteImpl.java b/paimon-core/src/main/java/org/apache/paimon/table/sink/TableWriteImpl.java
index 2bea3c224..25c29be9f 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/sink/TableWriteImpl.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/sink/TableWriteImpl.java
@@ -29,6 +29,7 @@ import org.apache.paimon.memory.MemorySegmentPool;
 import org.apache.paimon.metrics.MetricRegistry;
 import org.apache.paimon.operation.FileStoreWrite;
 import org.apache.paimon.operation.FileStoreWrite.State;
+import org.apache.paimon.table.BucketMode;
 import org.apache.paimon.utils.Restorable;
 
 import java.util.List;
@@ -48,6 +49,7 @@ public class TableWriteImpl<T> implements InnerTableWrite, Restorable<List<State
     private final RecordExtractor<T> recordExtractor;
 
     private boolean batchCommitted = false;
+    private BucketMode bucketMode;
 
     public TableWriteImpl(
             FileStoreWrite<T> write,
@@ -92,6 +94,11 @@ public class TableWriteImpl<T> implements InnerTableWrite, Restorable<List<State
         return this;
     }
 
+    public TableWriteImpl<T> withBucketMode(BucketMode bucketMode) {
+        this.bucketMode = bucketMode;
+        return this;
+    }
+
     @Override
     public BinaryRow getPartition(InternalRow row) {
         keyAndBucketExtractor.setRecord(row);
@@ -136,7 +143,7 @@ public class TableWriteImpl<T> implements InnerTableWrite, Restorable<List<State
         keyAndBucketExtractor.setRecord(record.row());
         return new SinkRecord(
                 record.partition(),
-                record.bucket(),
+                bucketMode == BucketMode.UNAWARE ? -1 : record.bucket(),
                 keyAndBucketExtractor.logPrimaryKey(),
                 record.row());
     }
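
For readers following the change: in unaware bucket mode the physical bucket carries no meaning for the log system, so toLogRecord above rewrites it to -1 before building the SinkRecord. A minimal standalone sketch of that mapping, assuming only the BucketMode enum the patch imports; the helper and class name are illustrative, not part of the patch:

    import org.apache.paimon.table.BucketMode;

    class LogBucketSketch {
        // Illustrative helper: UNAWARE tables report -1 so the log system
        // may pick any partition; other modes keep the physical bucket.
        static int logBucket(BucketMode bucketMode, int physicalBucket) {
            return bucketMode == BucketMode.UNAWARE ? -1 : physicalBucket;
        }
    }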
diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/kafka/KafkaLogSerializationSchema.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/kafka/KafkaLogSerializationSchema.java
index bb4c8645f..1fd80d356 100644
--- a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/kafka/KafkaLogSerializationSchema.java
+++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/kafka/KafkaLogSerializationSchema.java
@@ -81,6 +81,7 @@ public class KafkaLogSerializationSchema implements KafkaSerializationSchema<Sin
         } else {
             valueBytes = valueSerializer.serialize(new FlinkRowData(element.row()));
         }
-        return new ProducerRecord<>(topic, element.bucket(), primaryKeyBytes, valueBytes);
+        Integer partition = element.bucket() < 0 ? null : element.bucket();
+        return new ProducerRecord<>(topic, partition, primaryKeyBytes, valueBytes);
     }
 }
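
A note on the Kafka contract this change relies on: ProducerRecord accepts a null partition, in which case the producer's configured partitioner picks one (for records with a null key, the default partitioner since Kafka 2.4 uses sticky partitioning, rotating batches across partitions). That is what spreads unaware-bucket writes, which carry no log primary key, across every partition of the topic. A minimal sketch under those assumptions; the class name, topic name, and payloads are placeholders:

    import org.apache.kafka.clients.producer.ProducerRecord;

    class NullPartitionSketch {
        // bucket < 0 (unaware mode) => null partition => the partitioner decides.
        static ProducerRecord<byte[], byte[]> toRecord(int bucket, byte[] key, byte[] value) {
            Integer partition = bucket < 0 ? null : bucket;
            return new ProducerRecord<>("some-topic", partition, key, value);
        }
    }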
diff --git a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/kafka/KafkaLogSerializationTest.java b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/kafka/KafkaLogSerializationTest.java
index 1e64f4e68..9d8e2295a 100644
--- a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/kafka/KafkaLogSerializationTest.java
+++ b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/kafka/KafkaLogSerializationTest.java
@@ -63,6 +63,12 @@ public class KafkaLogSerializationTest {
         checkNonKeyed(LogChangelogMode.ALL, 2, 5, 3);
     }
 
+    @Test
+    public void testUnawareBucket() throws Exception {
+        checkNonKeyed(LogChangelogMode.AUTO, -1, 3, 5);
+        checkNonKeyed(LogChangelogMode.ALL, -1, 5, 3);
+    }
+
     private void checkKeyed(LogChangelogMode mode, int bucket, int key, int value)
             throws Exception {
         check(mode, true, bucket, key, value, RowKind.INSERT);
@@ -92,7 +98,11 @@ public class KafkaLogSerializationTest {
         SinkRecord input = testRecord(keyed, bucket, key, value, rowKind);
         ProducerRecord<byte[], byte[]> record = serializer.serialize(input, null);
 
-        assertThat(record.partition().intValue()).isEqualTo(bucket);
+        if (bucket >= 0) {
+            assertThat(record.partition().intValue()).isEqualTo(bucket);
+        } else {
+            assertThat(record.partition()).isNull();
+        }
 
         AtomicReference<RowData> rowReference = new AtomicReference<>();
         deserializer.deserialize(
@@ -129,7 +139,8 @@ public class KafkaLogSerializationTest {
     }
 
     private ConsumerRecord<byte[], byte[]> toConsumerRecord(ProducerRecord<byte[], byte[]> record) {
-        return new ConsumerRecord<>(TOPIC, record.partition(), 0, record.key(), record.value());
+        int partition = record.partition() == null ? -1 : record.partition();
+        return new ConsumerRecord<>(TOPIC, partition, 0, record.key(), record.value());
     }
 
     private static KafkaLogSerializationSchema createTestSerializationSchema(
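
The null-to--1 mapping in toConsumerRecord above exists because ProducerRecord allows a null partition while ConsumerRecord requires an int. A standalone sketch of that round-trip convention; the class and topic names are placeholders:

    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.producer.ProducerRecord;

    class RoundTripSketch {
        // ConsumerRecord has no notion of "unassigned", so null maps back to -1.
        static ConsumerRecord<byte[], byte[]> toConsumer(ProducerRecord<byte[], byte[]> r) {
            int partition = r.partition() == null ? -1 : r.partition();
            return new ConsumerRecord<>("some-topic", partition, 0L, r.key(), r.value());
        }
    }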
diff --git a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/kafka/LogSystemITCase.java b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/kafka/LogSystemITCase.java
index a32e7b54c..d4a64ea54 100644
--- a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/kafka/LogSystemITCase.java
+++ b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/kafka/LogSystemITCase.java
@@ -26,12 +26,18 @@ import org.apache.flink.table.catalog.CatalogBaseTable;
 import org.apache.flink.table.catalog.ObjectPath;
 import org.apache.flink.table.catalog.exceptions.TableNotExistException;
 import org.apache.flink.types.Row;
+import org.apache.kafka.clients.admin.AdminClient;
+import org.apache.kafka.clients.admin.ListOffsetsResult;
+import org.apache.kafka.clients.admin.OffsetSpec;
+import org.apache.kafka.common.TopicPartition;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.Timeout;
 
 import java.io.IOException;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 import java.util.UUID;
 
 import static org.assertj.core.api.Assertions.assertThat;
@@ -370,4 +376,50 @@ public class LogSystemITCase extends KafkaTableTestBase {
             deleteTopicIfExists(topic);
         }
     }
+
+    @Test
+    @Timeout(120)
+    public void testAppendOnlyWithUnawareBucket() throws Exception {
+        String topic = UUID.randomUUID().toString();
+        createTopicIfNotExists(topic, 2);
+
+        try {
+            // disable checkpointing to test eventual
+            env.getCheckpointConfig().disableCheckpointing();
+            env.setParallelism(1);
+            tEnv.executeSql(
+                    String.format(
+                            "CREATE TABLE T (i INT, j INT) WITH ("
+                                    + "'log.system'='kafka', "
+                                    + "'log.consistency'='eventual', "
+                                    + "'bucket'='-1', "
+                                    + "'kafka.bootstrap.servers'='%s', "
+                                    + "'kafka.topic'='%s',"
+                                    + "'kafka.batch.size'='20')",
+                            getBootstrapServers(), topic));
+            tEnv.executeSql(
+                    "CREATE TEMPORARY TABLE gen (i INT, j INT) WITH ('connector'='datagen', 'rows-per-second'='2')");
+            TableResult write = tEnv.executeSql("INSERT INTO T SELECT * FROM gen");
+            BlockingIterator<Row, Row> read =
+                    BlockingIterator.of(tEnv.executeSql("SELECT * FROM T").collect());
+            List<Row> collect = read.collect(10);
+            assertThat(collect).hasSize(10);
+            write.getJobClient().get().cancel();
+            read.close();
+
+            // check offsets
+            try (final AdminClient adminClient = AdminClient.create(getStandardProps())) {
+                Map<TopicPartition, OffsetSpec> topicPartitionOffsets = new HashMap<>(4);
+                for (int i = 0; i < 2; i++) {
+                    topicPartitionOffsets.put(new TopicPartition(topic, i), OffsetSpec.latest());
+                }
+                Map<TopicPartition, ListOffsetsResult.ListOffsetsResultInfo> result =
+                        adminClient.listOffsets(topicPartitionOffsets).all().get();
+                assertThat(result.values())
+                        .allMatch(partitionOffsetInfo -> partitionOffsetInfo.offset() > 0);
+            }
+        } finally {
+            deleteTopicIfExists(topic);
+        }
+    }
 }
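
The offset check in the test above uses AdminClient.listOffsets with OffsetSpec.latest(), which returns the current end offset of each partition; an end offset greater than zero on every partition confirms that each one actually received records. A condensed standalone sketch of the same check, where the class name, connection properties, topic name, and partition count are assumptions:

    import java.util.HashMap;
    import java.util.Map;
    import java.util.Properties;
    import org.apache.kafka.clients.admin.AdminClient;
    import org.apache.kafka.clients.admin.OffsetSpec;
    import org.apache.kafka.common.TopicPartition;

    class OffsetCheckSketch {
        // True if every one of the first `partitions` partitions has end offset > 0.
        static boolean allPartitionsWritten(Properties props, String topic, int partitions)
                throws Exception {
            try (AdminClient admin = AdminClient.create(props)) {
                Map<TopicPartition, OffsetSpec> specs = new HashMap<>();
                for (int i = 0; i < partitions; i++) {
                    specs.put(new TopicPartition(topic, i), OffsetSpec.latest());
                }
                return admin.listOffsets(specs).all().get().values().stream()
                        .allMatch(info -> info.offset() > 0);
            }
        }
    }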
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreSinkWriteImpl.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreSinkWriteImpl.java
index ceffa7648..968ace78a 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreSinkWriteImpl.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreSinkWriteImpl.java
@@ -146,7 +146,8 @@ public class StoreSinkWriteImpl implements StoreSinkWrite {
                                         state.stateValueFilter().filter(table.name(), part, bucket))
                         .withIOManager(paimonIOManager)
                         .withIgnorePreviousFiles(ignorePreviousFiles)
-                        .withExecutionMode(isStreamingMode);
+                        .withExecutionMode(isStreamingMode)
+                        .withBucketMode(table.bucketMode());
 
         if (metricGroup != null) {
             tableWrite.withMetricRegistry(new FlinkMetricRegistry(metricGroup));
