This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 3d9aba3a0 [kafka] support to explicitly create the kafka topic. (#910)
3d9aba3a0 is described below
commit 3d9aba3a044acaab09e147a02a8029a4817c016b
Author: calvin <[email protected]>
AuthorDate: Fri Apr 14 16:59:46 2023 +0800
[kafka] support to explicitly create the kafka topic. (#910)
---
.../paimon/flink/kafka/KafkaLogSinkProvider.java | 38 +++++++++++++++++++++-
.../paimon/flink/kafka/KafkaLogStoreFactory.java | 4 ++-
.../paimon/flink/FileSystemCatalogITCase.java | 6 ++--
3 files changed, 43 insertions(+), 5 deletions(-)
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/kafka/KafkaLogSinkProvider.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/kafka/KafkaLogSinkProvider.java
index 09c9c823f..2d055f6a4 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/kafka/KafkaLogSinkProvider.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/kafka/KafkaLogSinkProvider.java
@@ -26,10 +26,15 @@ import org.apache.paimon.flink.sink.LogSinkFunction;
import org.apache.flink.api.common.serialization.SerializationSchema;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.Semantic;
+import org.apache.flink.table.api.TableException;
import org.apache.flink.table.data.RowData;
+import org.apache.kafka.clients.admin.AdminClient;
+import org.apache.kafka.clients.admin.NewTopic;
+import org.apache.kafka.common.errors.TopicExistsException;
import javax.annotation.Nullable;
+import java.util.Collections;
import java.util.Properties;
+import java.util.concurrent.ExecutionException;
/** A Kafka {@link LogSinkProvider}. */
@@ -49,19 +54,25 @@ public class KafkaLogSinkProvider implements
LogSinkProvider {
private final LogChangelogMode changelogMode;
+ /**
+ * Number of partitions given to the topic when {@code createTopicIfNotExists()} has to create
+ * it. KafkaLogStoreFactory passes the table's BUCKET option here.
+ */
+ private final Integer numBuckets;
+
+ /** Preferred replication factor for a newly created topic; capped at the broker count. */
+ public static final int DEFAULT_REPLICATION_FACTOR = 2;
+
+ /**
+ * Creates a Kafka log sink provider.
+ *
+ * @param topic Kafka topic the sink writes to; created on demand when the sink function is built
+ * @param properties Kafka client properties, also used to create the admin client for topic
+ *     creation
+ * @param primaryKeySerializer serializer for the primary key, or {@code null}
+ * @param valueSerializer serializer for the row value
+ * @param consistency log consistency, mapped to the Kafka producer {@link Semantic}
+ * @param changelogMode changelog mode handed to the serialization schema
+ * @param numBuckets partition count used if the topic has to be created
+ */
public KafkaLogSinkProvider(
String topic,
Properties properties,
@Nullable SerializationSchema<RowData> primaryKeySerializer,
SerializationSchema<RowData> valueSerializer,
LogConsistency consistency,
- LogChangelogMode changelogMode) {
+ LogChangelogMode changelogMode,
+ Integer numBuckets) {
this.topic = topic;
this.properties = properties;
this.primaryKeySerializer = primaryKeySerializer;
this.valueSerializer = valueSerializer;
this.consistency = consistency;
this.changelogMode = changelogMode;
+ this.numBuckets = numBuckets;
}
@Override
@@ -77,6 +88,7 @@ public class KafkaLogSinkProvider implements LogSinkProvider {
default:
throw new IllegalArgumentException("Unsupported: " +
consistency);
}
+ createTopicIfNotExists();
return new KafkaSinkFunction(topic, createSerializationSchema(),
properties, semantic);
}
@@ -85,4 +97,28 @@ public class KafkaLogSinkProvider implements LogSinkProvider
{
return new KafkaLogSerializationSchema(
topic, primaryKeySerializer, valueSerializer, changelogMode);
}
+
+    /**
+     * Creates {@link #topic} if it does not exist yet.
+     *
+     * <p>The new topic gets {@link #numBuckets} partitions and a replication factor of {@link
+     * #DEFAULT_REPLICATION_FACTOR}, capped at the number of brokers reported by the cluster. If
+     * another client creates the topic between the existence check and the create request, the
+     * resulting {@link TopicExistsException} is ignored, because the topic then exists as required.
+     *
+     * @throws TableException if the topic cannot be listed, described or created, or if the
+     *     calling thread is interrupted while waiting on the admin client (the interrupt flag is
+     *     restored before throwing)
+     */
+    private void createTopicIfNotExists() {
+        try (AdminClient adminClient = AdminClient.create(properties)) {
+            if (adminClient.listTopics().names().get().contains(topic)) {
+                return;
+            }
+            int numBrokers = adminClient.describeCluster().nodes().get().size();
+            // Cap at the broker count: a larger replication factor cannot be satisfied.
+            int replicationFactor = Math.min(DEFAULT_REPLICATION_FACTOR, numBrokers);
+            NewTopic newTopic = new NewTopic(topic, numBuckets, (short) replicationFactor);
+            try {
+                adminClient.createTopics(Collections.singleton(newTopic)).all().get();
+            } catch (ExecutionException e) {
+                // A concurrent writer won the creation race after our existence check. The topic
+                // now exists, which is exactly what this method guarantees, so do not fail.
+                if (!(e.getCause() instanceof TopicExistsException)) {
+                    throw e;
+                }
+            }
+        } catch (InterruptedException e) {
+            // Preserve the interrupt for callers further up the stack.
+            Thread.currentThread().interrupt();
+            throw new TableException("Interrupted in createTopicIfNotExists", e);
+        } catch (Exception e) {
+            throw new TableException("Error in createTopicIfNotExists", e);
+        }
+    }
}
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/kafka/KafkaLogStoreFactory.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/kafka/KafkaLogStoreFactory.java
index 9bb9e0f3a..8ea9e6c47 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/kafka/KafkaLogStoreFactory.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/kafka/KafkaLogStoreFactory.java
@@ -43,6 +43,7 @@ import java.util.Map;
import java.util.Properties;
import static
org.apache.kafka.clients.consumer.ConsumerConfig.ISOLATION_LEVEL_CONFIG;
+import static org.apache.paimon.CoreOptions.BUCKET;
import static org.apache.paimon.CoreOptions.LOG_CHANGELOG_MODE;
import static org.apache.paimon.CoreOptions.LOG_CONSISTENCY;
import static org.apache.paimon.CoreOptions.LogConsistency;
@@ -124,7 +125,8 @@ public class KafkaLogStoreFactory implements
LogStoreTableFactory {
primaryKeySerializer,
valueSerializer,
options.get(LOG_CONSISTENCY),
- options.get(LOG_CHANGELOG_MODE));
+ options.get(LOG_CHANGELOG_MODE),
+ options.get(BUCKET));
}
private int[] getPrimaryKeyIndexes(ResolvedSchema schema) {
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/FileSystemCatalogITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/FileSystemCatalogITCase.java
index 202faea9f..2d7add076 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/FileSystemCatalogITCase.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/FileSystemCatalogITCase.java
@@ -97,7 +97,6 @@ public class FileSystemCatalogITCase extends
KafkaTableTestBase {
@Test
public void testLogWriteRead() throws Exception {
String topic = UUID.randomUUID().toString();
- createTopicIfNotExists(topic, 1);
try {
tEnv.useCatalog("fs");
@@ -135,9 +134,10 @@ public class FileSystemCatalogITCase extends
KafkaTableTestBase {
+ "'kafka.topic'='%s'"
+ ")",
getBootstrapServers(), topic));
+
+ tEnv.executeSql("INSERT INTO T VALUES ('1', '2', '3'), ('4', '5',
'6')").await();
BlockingIterator<Row, Row> iterator =
BlockingIterator.of(tEnv.from("T").execute().collect());
- tEnv.executeSql("INSERT INTO T VALUES ('1', '2', '3'), ('4', '5',
'6')").await();
List<Row> result = iterator.collectAndClose(2);
assertThat(result)
.containsExactlyInAnyOrder(Row.of("1", "2", "3", 4),
Row.of("4", "5", "6", 7));
@@ -191,9 +191,9 @@ public class FileSystemCatalogITCase extends
KafkaTableTestBase {
}
private void innerTestWriteRead() throws Exception {
+ // NOTE(review): the INSERT now runs before the reader is started, presumably so the sink has
+ // created the Kafka topic (KafkaLogSinkProvider#createTopicIfNotExists) before the streaming
+ // read subscribes to it; confirm this ordering is required.
+ tEnv.executeSql("INSERT INTO T VALUES ('1', '2', '3'), ('4', '5',
'6')").await();
BlockingIterator<Row, Row> iterator =
BlockingIterator.of(tEnv.from("T").execute().collect());
- tEnv.executeSql("INSERT INTO T VALUES ('1', '2', '3'), ('4', '5',
'6')").await();
List<Row> result = iterator.collectAndClose(2);
assertThat(result).containsExactlyInAnyOrder(Row.of("1", "2", "3"),
Row.of("4", "5", "6"));
}