This is an automated email from the ASF dual-hosted git repository.
zjureel pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 224a17c0b [flink] supports kafka log system registration (#1693)
224a17c0b is described below
commit 224a17c0b3775b8584dbcc08d180abf6b61c1a8f
Author: legendtkl <[email protected]>
AuthorDate: Fri Aug 11 10:31:31 2023 +0800
[flink] supports kafka log system registration (#1693)
* [flink] supports kafka log system registration
* fix unstable test case
---
.../generated/flink_catalog_configuration.html | 6 +
.../generated/flink_connector_configuration.html | 12 ++
.../org/apache/paimon/catalog/AbstractCatalog.java | 2 +-
.../java/org/apache/paimon/flink/FlinkCatalog.java | 22 ++-
.../apache/paimon/flink/FlinkCatalogFactory.java | 5 +-
.../apache/paimon/flink/FlinkCatalogOptions.java | 9 +
.../apache/paimon/flink/FlinkConnectorOptions.java | 14 ++
.../paimon/flink/kafka/KafkaLogStoreFactory.java | 2 +-
.../paimon/flink/kafka/KafkaLogStoreRegister.java | 161 +++++++++++++++++
.../org/apache/paimon/flink/FlinkCatalogTest.java | 2 +-
.../org/apache/paimon/flink/LogSystemITCase.java | 135 +++++++++++++++
.../flink/kafka/KafkaLogStoreRegisterTest.java | 191 +++++++++++++++++++++
.../paimon/flink/kafka/KafkaTableTestBase.java | 38 +++-
13 files changed, 590 insertions(+), 9 deletions(-)
diff --git a/docs/layouts/shortcodes/generated/flink_catalog_configuration.html
b/docs/layouts/shortcodes/generated/flink_catalog_configuration.html
index c1ed6dbb3..15d80eac7 100644
--- a/docs/layouts/shortcodes/generated/flink_catalog_configuration.html
+++ b/docs/layouts/shortcodes/generated/flink_catalog_configuration.html
@@ -38,5 +38,11 @@ under the License.
<td>Boolean</td>
<td>If true, the register will automatically create and delete a
topic in log system for Paimon table. Default kafka log store register is
supported, users can implement customized register for log system, for example,
create a new class which extends KafkaLogStoreFactory and return a customized
LogStoreRegister for their kafka cluster to create/delete topics.</td>
</tr>
+ <tr>
+ <td><h5>log.system.auto-register-timeout</h5></td>
+ <td style="word-wrap: break-word;">1 min</td>
+ <td>Duration</td>
+ <td>The timeout for register to create or delete topic in log
system.</td>
+ </tr>
</tbody>
</table>
diff --git
a/docs/layouts/shortcodes/generated/flink_connector_configuration.html
b/docs/layouts/shortcodes/generated/flink_connector_configuration.html
index d72eee6f6..7c887dab6 100644
--- a/docs/layouts/shortcodes/generated/flink_connector_configuration.html
+++ b/docs/layouts/shortcodes/generated/flink_connector_configuration.html
@@ -50,6 +50,18 @@ under the License.
<td>Integer</td>
<td>The thread number for lookup async.</td>
</tr>
+ <tr>
+ <td><h5>log.system.partitions</h5></td>
+ <td style="word-wrap: break-word;">1</td>
+ <td>Integer</td>
+ <td>The number of partitions of the log system. If log system is
kafka, this is kafka partitions.</td>
+ </tr>
+ <tr>
+ <td><h5>log.system.replication</h5></td>
+ <td style="word-wrap: break-word;">1</td>
+ <td>Integer</td>
+ <td>The number of replication of the log system. If log system is
kafka, this is kafka replicationFactor.</td>
+ </tr>
<tr>
<td><h5>scan.infer-parallelism</h5></td>
<td style="word-wrap: break-word;">true</td>
diff --git
a/paimon-core/src/main/java/org/apache/paimon/catalog/AbstractCatalog.java
b/paimon-core/src/main/java/org/apache/paimon/catalog/AbstractCatalog.java
index f958adda0..102ef5a5d 100644
--- a/paimon-core/src/main/java/org/apache/paimon/catalog/AbstractCatalog.java
+++ b/paimon-core/src/main/java/org/apache/paimon/catalog/AbstractCatalog.java
@@ -173,7 +173,7 @@ public abstract class AbstractCatalog implements Catalog {
}
}
- protected void copyTableDefaultOptions(Map<String, String> options) {
+ /**
+ * Copies this catalog's table default options into {@code options}, leaving keys that are
+ * already present untouched (putIfAbsent). Public so that callers such as FlinkCatalog can
+ * apply the defaults before the table is actually created.
+ */
+ public void copyTableDefaultOptions(Map<String, String> options) {
tableDefaultOptions.forEach(options::putIfAbsent);
}
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkCatalog.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkCatalog.java
index 51499abae..6554fddeb 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkCatalog.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkCatalog.java
@@ -20,6 +20,7 @@ package org.apache.paimon.flink;
import org.apache.paimon.catalog.Catalog;
import org.apache.paimon.catalog.Identifier;
+import org.apache.paimon.options.Options;
import org.apache.paimon.schema.Schema;
import org.apache.paimon.schema.SchemaChange;
import org.apache.paimon.schema.SchemaManager;
@@ -70,6 +71,7 @@ import org.apache.flink.table.factories.Factory;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.RowType;
+import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
@@ -88,6 +90,8 @@ import static
org.apache.flink.table.descriptors.Schema.SCHEMA;
import static org.apache.flink.table.factories.FactoryUtil.CONNECTOR;
import static
org.apache.flink.table.types.utils.TypeConversions.fromLogicalToDataType;
import static org.apache.paimon.CoreOptions.PATH;
+import static
org.apache.paimon.flink.FlinkCatalogOptions.LOG_SYSTEM_AUTO_REGISTER;
+import static org.apache.paimon.flink.FlinkCatalogOptions.REGISTER_TIMEOUT;
import static org.apache.paimon.flink.LogicalTypeConversion.toDataType;
import static org.apache.paimon.flink.LogicalTypeConversion.toLogicalType;
import static org.apache.paimon.flink.log.LogStoreRegister.registerLogSystem;
@@ -107,16 +111,22 @@ public class FlinkCatalog extends AbstractCatalog {
private final Catalog catalog;
private final boolean logStoreAutoRegister;
+ private final Duration logStoreAutoRegisterTimeout;
+
+ private final Options options;
+
public FlinkCatalog(
Catalog catalog,
String name,
String defaultDatabase,
ClassLoader classLoader,
- boolean logStoreAutoRegister) {
+ Options options) {
super(name, defaultDatabase);
this.catalog = catalog;
this.classLoader = classLoader;
- this.logStoreAutoRegister = logStoreAutoRegister;
+ this.options = options;
+ this.logStoreAutoRegister = options.get(LOG_SYSTEM_AUTO_REGISTER);
+ this.logStoreAutoRegisterTimeout = options.get(REGISTER_TIMEOUT);
try {
this.catalog.createDatabase(defaultDatabase, true);
} catch (Catalog.DatabaseAlreadyExistException ignore) {
@@ -256,6 +266,14 @@ public class FlinkCatalog extends AbstractCatalog {
Identifier identifier = toIdentifier(tablePath);
if (logStoreAutoRegister) {
+ // Although catalog.createTable will copy the default options, but
we need this info
+ // here before create table, such as
table-default.kafka.bootstrap.servers defined in
+ // catalog options. Temporarily, we copy the default options here.
+ if (catalog instanceof org.apache.paimon.catalog.AbstractCatalog) {
+ ((org.apache.paimon.catalog.AbstractCatalog) catalog)
+ .copyTableDefaultOptions(options);
+ }
+ options.put(REGISTER_TIMEOUT.key(),
logStoreAutoRegisterTimeout.toString());
registerLogSystem(catalog, identifier, options, classLoader);
}
// remove table path
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkCatalogFactory.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkCatalogFactory.java
index ea42075c2..fe4f55cbe 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkCatalogFactory.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkCatalogFactory.java
@@ -27,7 +27,6 @@ import java.util.Collections;
import java.util.Set;
import static org.apache.paimon.flink.FlinkCatalogOptions.DEFAULT_DATABASE;
-import static
org.apache.paimon.flink.FlinkCatalogOptions.LOG_SYSTEM_AUTO_REGISTER;
/** Factory for {@link FlinkCatalog}. */
public class FlinkCatalogFactory implements
org.apache.flink.table.factories.CatalogFactory {
@@ -65,7 +64,7 @@ public class FlinkCatalogFactory implements
org.apache.flink.table.factories.Cat
catalogName,
context.options().get(DEFAULT_DATABASE),
classLoader,
- context.options().get(LOG_SYSTEM_AUTO_REGISTER));
+ context.options());
}
public static FlinkCatalog createCatalog(String catalogName, Catalog
catalog, Options options) {
@@ -74,7 +73,7 @@ public class FlinkCatalogFactory implements
org.apache.flink.table.factories.Cat
catalogName,
Catalog.DEFAULT_DATABASE,
FlinkCatalogFactory.class.getClassLoader(),
- options.get(LOG_SYSTEM_AUTO_REGISTER));
+ options);
}
public static Catalog createPaimonCatalog(Options catalogOptions) {
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkCatalogOptions.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkCatalogOptions.java
index fb0d1b692..97c1d5424 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkCatalogOptions.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkCatalogOptions.java
@@ -22,6 +22,8 @@ import org.apache.paimon.catalog.Catalog;
import org.apache.paimon.options.ConfigOption;
import org.apache.paimon.options.ConfigOptions;
+import java.time.Duration;
+
/** Options for flink catalog. */
public class FlinkCatalogOptions {
@@ -38,4 +40,11 @@ public class FlinkCatalogOptions {
"If true, the register will automatically create
and delete a topic in log system for Paimon table. Default kafka log store
register "
+ "is supported, users can implement
customized register for log system, for example, create a new class which
extends "
+ "KafkaLogStoreFactory and return a
customized LogStoreRegister for their kafka cluster to create/delete topics.");
+
+ /** Timeout for the log-store register to create or delete a topic; defaults to one minute. */
+ public static final ConfigOption<Duration> REGISTER_TIMEOUT =
+ ConfigOptions.key("log.system.auto-register-timeout")
+ .durationType()
+ .defaultValue(Duration.ofMinutes(1))
+ .withDescription(
+ "The timeout for register to create or delete
topic in log system.");
}
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkConnectorOptions.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkConnectorOptions.java
index 682053602..de8485423 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkConnectorOptions.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkConnectorOptions.java
@@ -68,6 +68,20 @@ public class FlinkConnectorOptions {
+ "."))
.build());
+ /**
+ * Number of partitions of the log-system topic. When not set explicitly, the Kafka register
+ * falls back to the table's bucket count instead of this option's default.
+ */
+ public static final ConfigOption<Integer> LOG_SYSTEM_PARTITIONS =
+ ConfigOptions.key("log.system.partitions")
+ .intType()
+ .defaultValue(1)
+ .withDescription(
+ "The number of partitions of the log system. If
log system is kafka, this is kafka partitions.");
+
+ /** Replication factor of the log-system topic (Kafka {@code replicationFactor}); defaults to 1. */
+ public static final ConfigOption<Integer> LOG_SYSTEM_REPLICATION =
+ ConfigOptions.key("log.system.replication")
+ .intType()
+ .defaultValue(1)
+ .withDescription(
+ "The number of replication of the log system. If
log system is kafka, this is kafka replicationFactor.");
+
public static final ConfigOption<Integer> SINK_PARALLELISM =
ConfigOptions.key("sink.parallelism")
.intType()
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/kafka/KafkaLogStoreFactory.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/kafka/KafkaLogStoreFactory.java
index 2a5038f5c..0966d166e 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/kafka/KafkaLogStoreFactory.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/kafka/KafkaLogStoreFactory.java
@@ -132,7 +132,7 @@ public class KafkaLogStoreFactory implements
LogStoreTableFactory {
@Override
public LogStoreRegister createRegister(RegisterContext context) {
- throw new UnsupportedOperationException();
+ // Topic creation/deletion is delegated to KafkaLogStoreRegister, which reads the table
+ // identifier and Kafka options from the given context.
+ return new KafkaLogStoreRegister(context);
}
private int[] getPrimaryKeyIndexes(ResolvedSchema schema) {
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/kafka/KafkaLogStoreRegister.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/kafka/KafkaLogStoreRegister.java
new file mode 100644
index 000000000..af0740f1a
--- /dev/null
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/kafka/KafkaLogStoreRegister.java
@@ -0,0 +1,161 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.kafka;
+
+import org.apache.paimon.catalog.Identifier;
+import org.apache.paimon.flink.log.LogStoreRegister;
+import org.apache.paimon.flink.log.LogStoreTableFactory;
+import org.apache.paimon.utils.Preconditions;
+
+import org.apache.paimon.shade.guava30.com.google.common.collect.ImmutableMap;
+
+import org.apache.kafka.clients.admin.AdminClient;
+import org.apache.kafka.clients.admin.NewTopic;
+
+import java.time.Duration;
+import java.util.Collections;
+import java.util.Map;
+import java.util.Properties;
+import java.util.UUID;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+import static org.apache.paimon.CoreOptions.BUCKET;
+import static org.apache.paimon.flink.FlinkCatalogOptions.REGISTER_TIMEOUT;
+import static
org.apache.paimon.flink.FlinkConnectorOptions.LOG_SYSTEM_PARTITIONS;
+import static
org.apache.paimon.flink.FlinkConnectorOptions.LOG_SYSTEM_REPLICATION;
+import static org.apache.paimon.flink.kafka.KafkaLogOptions.BOOTSTRAP_SERVERS;
+import static org.apache.paimon.flink.kafka.KafkaLogOptions.TOPIC;
+
+/** KafkaLogStoreRegister is used to register/unregister topics in Kafka for
paimon table. */
+public class KafkaLogStoreRegister implements LogStoreRegister {
+
+ private final String bootstrapServers;
+
+ private final String topic;
+
+ private final int partition;
+
+ private final int replicationFactor;
+
+ private final Duration timeout;
+
+ private final Properties properties;
+
+ private final Identifier identifier;
+
+ public KafkaLogStoreRegister(LogStoreTableFactory.RegisterContext context)
{
+ this.bootstrapServers = context.getOptions().get(BOOTSTRAP_SERVERS);
+ this.identifier = context.getIdentifier();
+ this.topic =
+ context.getOptions().getOptional(TOPIC).isPresent()
+ ? context.getOptions().get(TOPIC)
+ : String.format(
+ "%s_%s_%s",
+ this.identifier.getDatabaseName(),
+ this.identifier.getObjectName(),
+ UUID.randomUUID().toString().replace("-", ""));
+
+ Preconditions.checkArgument(this.bootstrapServers != null);
+ Preconditions.checkArgument(this.topic != null);
+ Preconditions.checkArgument(this.identifier != null);
+
+ // handle the type information missing when Map is converted to Options
+ if (context.getOptions().get(REGISTER_TIMEOUT.key()) == null) {
+ this.timeout = REGISTER_TIMEOUT.defaultValue();
+ } else {
+ this.timeout =
Duration.parse(context.getOptions().get(REGISTER_TIMEOUT.key()));
+ }
+
+ // handle bucket=-1
+ int bucketNum =
+ context.getOptions().get(BUCKET) == -1 ? 1 :
context.getOptions().get(BUCKET);
+ this.partition =
+
context.getOptions().getOptional(LOG_SYSTEM_PARTITIONS).isPresent()
+ ? context.getOptions().get(LOG_SYSTEM_PARTITIONS)
+ : bucketNum;
+
+ this.replicationFactor =
context.getOptions().get(LOG_SYSTEM_REPLICATION);
+
+ this.properties = new Properties();
+ this.properties.put("bootstrap.servers", this.bootstrapServers);
+ }
+
+ @Override
+ public Map<String, String> registerTopic() {
+ try (AdminClient admin = AdminClient.create(properties)) {
+ NewTopic newTopic =
+ new NewTopic(this.topic, this.partition, (short)
this.replicationFactor);
+
+ // Since the call is Async, let's wait for it to complete.
+ admin.createTopics(Collections.singleton(newTopic))
+ .all()
+ .get(this.timeout.getSeconds(), TimeUnit.SECONDS);
+ } catch (TimeoutException e) {
+ throw new IllegalStateException(
+ String.format(
+ "Register topic for table %s timeout %s",
+ this.identifier.getFullName(), e.getMessage()));
+ } catch (Exception e) {
+ throw new IllegalStateException(
+ String.format(
+ "Register topic for table %s exception %s",
+ this.identifier.getFullName(), e.getMessage()));
+ }
+
+ return ImmutableMap.of(
+ TOPIC.key(),
+ this.topic,
+ LOG_SYSTEM_PARTITIONS.key(),
+ String.valueOf(this.partition),
+ LOG_SYSTEM_REPLICATION.key(),
+ String.valueOf(this.replicationFactor));
+ }
+
+ @Override
+ public void unRegisterTopic() {
+ try (AdminClient admin = AdminClient.create(properties)) {
+ admin.deleteTopics(Collections.singleton(this.topic))
+ .all()
+ .get(this.timeout.getSeconds(), TimeUnit.SECONDS);
+ } catch (ExecutionException e) {
+ if (e.getCause()
+ instanceof
org.apache.kafka.common.errors.UnknownTopicOrPartitionException) {
+ // ignore
+ return;
+ } else {
+ throw new IllegalStateException(
+ String.format(
+ "Unregister topic for table %s exception %s",
+ this.identifier.getFullName(),
e.getMessage()));
+ }
+ } catch (TimeoutException e) {
+ throw new RuntimeException(
+ String.format(
+ "Unregister topic for table %s timeout %s",
+ this.identifier.getFullName(), e.getMessage()));
+ } catch (Exception e) {
+ throw new RuntimeException(
+ String.format(
+ "Unregister topic for table %s exception %s",
+ this.identifier.getFullName(), e.getMessage()));
+ }
+ }
+}
diff --git
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/FlinkCatalogTest.java
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/FlinkCatalogTest.java
index cae2f75f6..16b2b5f17 100644
---
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/FlinkCatalogTest.java
+++
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/FlinkCatalogTest.java
@@ -464,7 +464,7 @@ public class FlinkCatalogTest {
.field("comp", DataTypes.INT(), "test + 1")
.primaryKey("pk")
.build();
- CatalogTable catalogTable = new CatalogTableImpl(schema,
Collections.emptyMap(), "");
+ CatalogTable catalogTable = new CatalogTableImpl(schema, new
HashMap<>(), "");
catalog.createDatabase(path1.getDatabaseName(), null, false);
catalog.createTable(path1, catalogTable, false);
diff --git
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/LogSystemITCase.java
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/LogSystemITCase.java
index 829d6744d..201f3db5f 100644
---
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/LogSystemITCase.java
+++
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/LogSystemITCase.java
@@ -23,9 +23,13 @@ import org.apache.paimon.utils.BlockingIterator;
import org.apache.flink.table.api.TableResult;
import org.apache.flink.table.api.ValidationException;
+import org.apache.flink.table.catalog.CatalogBaseTable;
+import org.apache.flink.table.catalog.ObjectPath;
+import org.apache.flink.table.catalog.exceptions.TableNotExistException;
import org.apache.flink.types.Row;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.Timeout;
import java.io.IOException;
import java.util.List;
@@ -186,4 +190,135 @@ public class LogSystemITCase extends KafkaTableTestBase {
.hasMessage(
"File store continuous reading does not support the
log streaming read mode.");
}
+
+    @Test
+    @Timeout(60)
+    public void testLogSystemAutoRegister() throws TableNotExistException {
+        // enable log system auto registration
+        tEnv.executeSql(
+                String.format(
+                        "CREATE CATALOG PAIMON_REGISTER WITH ("
+                                + "'type'='paimon', 'warehouse'='%s', 'log.system.auto-register'='true')",
+                        getTempDirPath()));
+        tEnv.useCatalog("PAIMON_REGISTER");
+
+        env.getCheckpointConfig().disableCheckpointing();
+        env.setParallelism(1);
+
+        // check register table with specified bootstrap server and partition num.
+        tEnv.executeSql(
+                String.format(
+                        "CREATE TABLE T (i INT, j INT) WITH ("
+                                + "'log.system'='kafka', "
+                                + "'log.system.partitions'='2', "
+                                + "'write-mode'='append-only', "
+                                + "'kafka.bootstrap.servers'='%s', "
+                                + "'kafka.topic'='Tt')",
+                        getBootstrapServers()));
+
+        checkTopicExists("Tt", 2, 1);
+
+        // without log.system.partitions, the partition number follows the bucket number
+        tEnv.executeSql(
+                String.format(
+                        "CREATE TABLE T2 (i INT, j INT) WITH ("
+                                + "'log.system'='kafka', "
+                                + "'bucket'='2', "
+                                + "'write-mode'='append-only', "
+                                + "'kafka.bootstrap.servers'='%s', "
+                                + "'kafka.topic'='T2')",
+                        getBootstrapServers()));
+
+        checkTopicExists("T2", 2, 1);
+
+        // check register a random kafka topic
+        tEnv.executeSql(
+                String.format(
+                        "CREATE TABLE T1 (i INT, j INT) WITH ("
+                                + "'log.system'='kafka', "
+                                + "'log.system.partitions'='2', "
+                                + "'write-mode'='append-only', "
+                                + "'kafka.bootstrap.servers'='%s')",
+                        getBootstrapServers()));
+
+        // T1 sets no kafka.topic, so read the generated topic name back from T1's options
+        // (reading T here would only re-check the explicit 'Tt' topic).
+        CatalogBaseTable table =
+                tEnv.getCatalog("PAIMON_REGISTER")
+                        .get()
+                        .getTable(ObjectPath.fromString("default.T1"));
+        checkTopicExists(table.getOptions().get("kafka.topic"), 2, 1);
+
+        // check unregister topic when creating table fail
+        assertThatThrownBy(
+                        () ->
+                                tEnv.executeSql(
+                                        String.format(
+                                                "CREATE TABLE T (i INT, j INT) WITH ("
+                                                        + "'log.system'='kafka', "
+                                                        + "'log.system.partitions'='2', "
+                                                        + "'write-mode'='append-only', "
+                                                        + "'kafka.bootstrap.servers'='%s', "
+                                                        + "'kafka.topic'='T1')",
+                                                getBootstrapServers())))
+                .isInstanceOf(ValidationException.class)
+                .hasMessage(
+                        "Could not execute CreateTable in path `PAIMON_REGISTER`.`default`.`T`")
+                .cause()
+                .isInstanceOf(
+                        org.apache.flink.table.catalog.exceptions.TableAlreadyExistException.class)
+                .hasMessage("Table (or view) default.T already exists in Catalog PAIMON_REGISTER.");
+
+        checkTopicNotExist("T1");
+
+        // check unregister topic when the target database does not exist
+        assertThatThrownBy(
+                        () ->
+                                tEnv.executeSql(
+                                        String.format(
+                                                "CREATE TABLE NOT_EXIST.T (i INT, j INT) WITH ("
+                                                        + "'log.system'='kafka', "
+                                                        + "'log.system.partitions'='2', "
+                                                        + "'write-mode'='append-only', "
+                                                        + "'kafka.bootstrap.servers'='%s', "
+                                                        + "'kafka.topic'='T1')",
+                                                getBootstrapServers())))
+                .isInstanceOf(ValidationException.class)
+                .hasMessage(
+                        "Could not execute CreateTable in path `PAIMON_REGISTER`.`NOT_EXIST`.`T`")
+                .cause()
+                .isInstanceOf(
+                        org.apache.flink.table.catalog.exceptions.DatabaseNotExistException.class)
+                .hasMessage("Database NOT_EXIST does not exist in Catalog PAIMON_REGISTER.");
+
+        checkTopicNotExist("T1");
+
+        // check unregister topic when drop table: table T was registered with topic 'Tt'
+        tEnv.executeSql("DROP TABLE T");
+        checkTopicNotExist("Tt");
+    }
+
+    @Test
+    @Timeout(60)
+    public void testLogSystemAutoRegisterWithDefaultOption() {
+        // enable log system auto registration; bootstrap servers and partitions come from the
+        // catalog's table-default options instead of the table DDL
+        tEnv.executeSql(
+                String.format(
+                        "CREATE CATALOG PAIMON_DEFAULT WITH ("
+                                + "'type'='paimon', 'warehouse'='%s', "
+                                + "'log.system.auto-register'='true', "
+                                + "'table-default.kafka.bootstrap.servers'='%s',"
+                                + "'table-default.log.system.partitions'='2')",
+                        getTempDirPath(),
+                        getBootstrapServers()));
+        tEnv.useCatalog("PAIMON_DEFAULT");
+
+        env.getCheckpointConfig().disableCheckpointing();
+        env.setParallelism(1);
+
+        tEnv.executeSql(
+                "CREATE TABLE T (i INT, j INT) WITH ("
+                        + "'log.system'='kafka', "
+                        + "'write-mode'='append-only', "
+                        + "'kafka.topic'='T')");
+
+        checkTopicExists("T", 2, 1);
+    }
}
diff --git
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/kafka/KafkaLogStoreRegisterTest.java
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/kafka/KafkaLogStoreRegisterTest.java
new file mode 100644
index 000000000..4db0d4111
--- /dev/null
+++
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/kafka/KafkaLogStoreRegisterTest.java
@@ -0,0 +1,191 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.kafka;
+
+import org.apache.paimon.catalog.Identifier;
+import org.apache.paimon.flink.log.LogStoreTableFactory;
+import org.apache.paimon.options.Options;
+
+import org.apache.kafka.clients.admin.AdminClient;
+import org.apache.kafka.clients.admin.NewTopic;
+import org.junit.After;
+import org.junit.jupiter.api.Test;
+
+import java.time.Duration;
+import java.util.Collections;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+
+import static org.apache.paimon.flink.FlinkCatalogOptions.REGISTER_TIMEOUT;
+import static
org.apache.paimon.flink.FlinkConnectorOptions.LOG_SYSTEM_PARTITIONS;
+import static org.apache.paimon.flink.kafka.KafkaLogOptions.BOOTSTRAP_SERVERS;
+import static org.apache.paimon.flink.kafka.KafkaLogOptions.TOPIC;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatCode;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+import static org.assertj.core.api.Fail.fail;
+
+/** Tests for {@link KafkaLogStoreRegister}. */
+public class KafkaLogStoreRegisterTest extends KafkaTableTestBase {
+
+    private static final String DATABASE = "mock_db";
+
+    private static final String TABLE = "mock_table";
+
+    /**
+     * Deletes all topics after each test so that one test's topics cannot leak into the next.
+     *
+     * <p>Annotated with JUnit 5's {@code AfterEach}: the tests here are JUnit 5 ({@code
+     * org.junit.jupiter.api.Test}) and the Jupiter engine does not run JUnit 4 {@code @After}
+     * methods.
+     */
+    @org.junit.jupiter.api.AfterEach
+    public void tearDown() {
+        try (AdminClient admin = createAdminClient()) {
+            Set<String> topics = admin.listTopics().names().get(10, TimeUnit.SECONDS);
+            admin.deleteTopics(topics).all().get(10, TimeUnit.SECONDS);
+        } catch (Exception ignored) {
+            // Best-effort cleanup: a failure here must not hide the outcome of the test itself.
+        }
+    }
+
+    @Test
+    public void testRegisterTopic() {
+        String topic = "register-topic";
+
+        Map<String, String> result =
+                createKafkaLogStoreRegister(getBootstrapServers(), topic, 2).registerTopic();
+        checkTopicExists(topic, 2, 1);
+        assertThat(result.get(TOPIC.key())).isEqualTo(topic);
+    }
+
+    @Test
+    public void testRegisterTopicAuto() {
+        Map<String, String> result =
+                createKafkaLogStoreRegister(getBootstrapServers()).registerTopic();
+
+        String topicName = result.get(TOPIC.key());
+        String prefix = String.format("%s_%s_", DATABASE, TABLE);
+        assertThat(topicName).startsWith(prefix);
+        // the generated suffix is a UUID with its dashes removed
+        assertThat(topicName.substring(prefix.length())).matches("[0-9a-fA-F]{32}");
+
+        // Check membership rather than "exactly one topic", so topics left over by other
+        // tests cannot make this test fail.
+        try (AdminClient admin = createAdminClient()) {
+            Set<String> topics = admin.listTopics().names().get(5, TimeUnit.SECONDS);
+            assertThat(topics).contains(topicName);
+        } catch (Exception e) {
+            fail(e.getMessage(), e);
+        }
+
+        // without log.system.partitions the bucket count is used, which resolves to 1 here
+        assertThat(result.get(LOG_SYSTEM_PARTITIONS.key())).isEqualTo("1");
+    }
+
+    @Test
+    public void testRegisterTopicException() {
+        String topic = "register-topic";
+        String invalidBootstrapServers = "invalid-bootstrap-servers:9092";
+
+        KafkaLogStoreRegister kafkaLogStoreRegister =
+                createKafkaLogStoreRegister(invalidBootstrapServers, topic);
+
+        assertThatThrownBy(kafkaLogStoreRegister::registerTopic)
+                .isInstanceOf(IllegalStateException.class)
+                .hasMessageContaining(
+                        "Register topic for table mock_db.mock_table exception Failed to create new KafkaAdminClient");
+    }
+
+    @Test
+    public void testRegisterTopicExist() {
+        String topic = "topic-exist";
+        createTopic(topic, 1, 1);
+
+        assertThatThrownBy(
+                        () ->
+                                createKafkaLogStoreRegister(getBootstrapServers(), topic)
+                                        .registerTopic())
+                .isInstanceOf(IllegalStateException.class)
+                .hasMessage(
+                        "Register topic for table mock_db.mock_table exception org.apache.kafka.common.errors.TopicExistsException: Topic 'topic-exist' already exists.");
+    }
+
+    @Test
+    public void testUnregisterTopic() {
+        String topic = "unregister-topic";
+        createTopic(topic, 2, 1);
+
+        createKafkaLogStoreRegister(getBootstrapServers(), topic, 2).unRegisterTopic();
+        checkTopicNotExist(topic);
+    }
+
+    @Test
+    public void testUnregisterTopicException() {
+        String topic = "not_exist_topic";
+
+        // unregistering a topic that does not exist must be a no-op
+        assertThatCode(
+                        () ->
+                                createKafkaLogStoreRegister(getBootstrapServers(), topic)
+                                        .unRegisterTopic())
+                .doesNotThrowAnyException();
+    }
+
+    private KafkaLogStoreRegister createKafkaLogStoreRegister(String bootstrapServers) {
+        return createKafkaLogStoreRegister(bootstrapServers, null, null);
+    }
+
+    private KafkaLogStoreRegister createKafkaLogStoreRegister(
+            String bootstrapServers, String topic) {
+        return createKafkaLogStoreRegister(bootstrapServers, topic, null);
+    }
+
+    /**
+     * Builds a register for {@code mock_db.mock_table}; a null topic or partition leaves that
+     * option unset so the register's defaults apply. The timeout is fixed at 20 seconds.
+     */
+    private KafkaLogStoreRegister createKafkaLogStoreRegister(
+            String bootstrapServers, String topic, Integer partition) {
+        Options tableOptions = new Options();
+        tableOptions.set(BOOTSTRAP_SERVERS, bootstrapServers);
+
+        if (topic != null) {
+            tableOptions.set(TOPIC, topic);
+        }
+
+        if (partition != null) {
+            tableOptions.set(LOG_SYSTEM_PARTITIONS, partition);
+        }
+        // set as a raw string, the same way FlinkCatalog passes it through table options
+        tableOptions.set(REGISTER_TIMEOUT.key(), Duration.ofSeconds(20).toString());
+
+        return new KafkaLogStoreRegister(
+                new LogStoreTableFactory.RegisterContext() {
+                    @Override
+                    public Options getOptions() {
+                        return tableOptions;
+                    }
+
+                    @Override
+                    public Identifier getIdentifier() {
+                        return Identifier.create(DATABASE, TABLE);
+                    }
+                });
+    }
+
+    /** Creates a topic and waits for Kafka to confirm it, so later assertions are deterministic. */
+    private void createTopic(String topic, int partition, int replicationFactor) {
+        try (AdminClient admin = createAdminClient()) {
+            admin.createTopics(
+                            Collections.singletonList(
+                                    new NewTopic(topic, partition, (short) replicationFactor)))
+                    .all()
+                    .get(10, TimeUnit.SECONDS);
+        } catch (Exception e) {
+            fail(e.getMessage(), e);
+        }
+    }
+}
diff --git
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/kafka/KafkaTableTestBase.java
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/kafka/KafkaTableTestBase.java
index 2c50dc902..fdcb99f33 100644
---
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/kafka/KafkaTableTestBase.java
+++
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/kafka/KafkaTableTestBase.java
@@ -26,6 +26,7 @@ import
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.util.DockerImageVersions;
import org.apache.kafka.clients.admin.AdminClient;
+import org.apache.kafka.clients.admin.DescribeTopicsResult;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.admin.TopicDescription;
import org.apache.kafka.clients.admin.TopicListing;
@@ -58,8 +59,12 @@ import java.util.Properties;
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Fail.fail;
+
/** Base class for Kafka Table IT Cases. */
public abstract class KafkaTableTestBase extends AbstractTestBase {
@@ -253,7 +258,7 @@ public abstract class KafkaTableTestBase extends
AbstractTestBase {
}
/** Kafka container extension for junit5. */
- private static class KafkaContainerExtension extends KafkaContainer
+ protected static class KafkaContainerExtension extends KafkaContainer
implements BeforeAllCallback, AfterAllCallback {
private KafkaContainerExtension(DockerImageName dockerImageName) {
super(dockerImageName);
@@ -269,4 +274,35 @@ public abstract class KafkaTableTestBase extends
AbstractTestBase {
this.close();
}
}
+
+ // ------------------------ For Kafka Test Purpose
----------------------------------
+    /**
+     * Asserts that {@code topic} exists with {@code partition} partitions and that its first
+     * partition has {@code replicationFactor} replicas. Any admin-client failure fails the test,
+     * with the original exception attached as the cause.
+     */
+    protected void checkTopicExists(String topic, int partition, int replicationFactor) {
+        try (AdminClient admin = createAdminClient()) {
+            DescribeTopicsResult topicDesc = admin.describeTopics(Collections.singleton(topic));
+            TopicDescription description =
+                    topicDesc.allTopicNames().get(10, TimeUnit.SECONDS).get(topic);
+
+            assertThat(description.partitions().size()).isEqualTo(partition);
+            assertThat(description.partitions().get(0).replicas().size())
+                    .isEqualTo(replicationFactor);
+        } catch (Exception e) {
+            fail(e.getMessage(), e);
+        }
+    }
+
+    /**
+     * Asserts that no topic named {@code topic} currently exists in the Kafka cluster. Any
+     * admin-client failure fails the test, with the original exception attached as the cause.
+     */
+    protected void checkTopicNotExist(String topic) {
+        try (AdminClient admin = createAdminClient()) {
+            // listTopics covers the whole cluster; describeTopics only reports the topics it is
+            // asked about (an empty request yields an empty map, making the check vacuous).
+            assertThat(admin.listTopics().names().get(10, TimeUnit.SECONDS))
+                    .doesNotContain(topic);
+        } catch (Exception e) {
+            fail(e.getMessage(), e);
+        }
+    }
+
+ protected AdminClient createAdminClient() {
+ Properties properties = new Properties();
+ properties.put("bootstrap.servers", getBootstrapServers());
+
+ return AdminClient.create(properties);
+ }
}