This is an automated email from the ASF dual-hosted git repository.

zjureel pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 224a17c0b [flink] supports kafka log system registration (#1693)
224a17c0b is described below

commit 224a17c0b3775b8584dbcc08d180abf6b61c1a8f
Author: legendtkl <[email protected]>
AuthorDate: Fri Aug 11 10:31:31 2023 +0800

    [flink] supports kafka log system registration (#1693)
    
    * [flink] supports kafka log system registration
    
    * fix unstable test case
---
 .../generated/flink_catalog_configuration.html     |   6 +
 .../generated/flink_connector_configuration.html   |  12 ++
 .../org/apache/paimon/catalog/AbstractCatalog.java |   2 +-
 .../java/org/apache/paimon/flink/FlinkCatalog.java |  22 ++-
 .../apache/paimon/flink/FlinkCatalogFactory.java   |   5 +-
 .../apache/paimon/flink/FlinkCatalogOptions.java   |   9 +
 .../apache/paimon/flink/FlinkConnectorOptions.java |  14 ++
 .../paimon/flink/kafka/KafkaLogStoreFactory.java   |   2 +-
 .../paimon/flink/kafka/KafkaLogStoreRegister.java  | 161 +++++++++++++++++
 .../org/apache/paimon/flink/FlinkCatalogTest.java  |   2 +-
 .../org/apache/paimon/flink/LogSystemITCase.java   | 135 +++++++++++++++
 .../flink/kafka/KafkaLogStoreRegisterTest.java     | 191 +++++++++++++++++++++
 .../paimon/flink/kafka/KafkaTableTestBase.java     |  38 +++-
 13 files changed, 590 insertions(+), 9 deletions(-)

diff --git a/docs/layouts/shortcodes/generated/flink_catalog_configuration.html 
b/docs/layouts/shortcodes/generated/flink_catalog_configuration.html
index c1ed6dbb3..15d80eac7 100644
--- a/docs/layouts/shortcodes/generated/flink_catalog_configuration.html
+++ b/docs/layouts/shortcodes/generated/flink_catalog_configuration.html
@@ -38,5 +38,11 @@ under the License.
             <td>Boolean</td>
             <td>If true, the register will automatically create and delete a 
topic in log system for Paimon table. Default kafka log store register is 
supported, users can implement customized register for log system, for example, 
create a new class which extends KafkaLogStoreFactory and return a customized 
LogStoreRegister for their kafka cluster to create/delete topics.</td>
         </tr>
+        <tr>
+            <td><h5>log.system.auto-register-timeout</h5></td>
+            <td style="word-wrap: break-word;">1 min</td>
+            <td>Duration</td>
+            <td>The timeout for register to create or delete topic in log 
system.</td>
+        </tr>
     </tbody>
 </table>
diff --git 
a/docs/layouts/shortcodes/generated/flink_connector_configuration.html 
b/docs/layouts/shortcodes/generated/flink_connector_configuration.html
index d72eee6f6..7c887dab6 100644
--- a/docs/layouts/shortcodes/generated/flink_connector_configuration.html
+++ b/docs/layouts/shortcodes/generated/flink_connector_configuration.html
@@ -50,6 +50,18 @@ under the License.
             <td>Integer</td>
             <td>The thread number for lookup async.</td>
         </tr>
+        <tr>
+            <td><h5>log.system.partitions</h5></td>
+            <td style="word-wrap: break-word;">1</td>
+            <td>Integer</td>
+            <td>The number of partitions of the log system. If log system is 
kafka, this is kafka partitions.</td>
+        </tr>
+        <tr>
+            <td><h5>log.system.replication</h5></td>
+            <td style="word-wrap: break-word;">1</td>
+            <td>Integer</td>
+            <td>The number of replication of the log system. If log system is 
kafka, this is kafka replicationFactor.</td>
+        </tr>
         <tr>
             <td><h5>scan.infer-parallelism</h5></td>
             <td style="word-wrap: break-word;">true</td>
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/catalog/AbstractCatalog.java 
b/paimon-core/src/main/java/org/apache/paimon/catalog/AbstractCatalog.java
index f958adda0..102ef5a5d 100644
--- a/paimon-core/src/main/java/org/apache/paimon/catalog/AbstractCatalog.java
+++ b/paimon-core/src/main/java/org/apache/paimon/catalog/AbstractCatalog.java
@@ -173,7 +173,7 @@ public abstract class AbstractCatalog implements Catalog {
         }
     }
 
-    protected void copyTableDefaultOptions(Map<String, String> options) {
+    public void copyTableDefaultOptions(Map<String, String> options) {
         tableDefaultOptions.forEach(options::putIfAbsent);
     }
 
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkCatalog.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkCatalog.java
index 51499abae..6554fddeb 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkCatalog.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkCatalog.java
@@ -20,6 +20,7 @@ package org.apache.paimon.flink;
 
 import org.apache.paimon.catalog.Catalog;
 import org.apache.paimon.catalog.Identifier;
+import org.apache.paimon.options.Options;
 import org.apache.paimon.schema.Schema;
 import org.apache.paimon.schema.SchemaChange;
 import org.apache.paimon.schema.SchemaManager;
@@ -70,6 +71,7 @@ import org.apache.flink.table.factories.Factory;
 import org.apache.flink.table.types.logical.LogicalType;
 import org.apache.flink.table.types.logical.RowType;
 
+import java.time.Duration;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
@@ -88,6 +90,8 @@ import static 
org.apache.flink.table.descriptors.Schema.SCHEMA;
 import static org.apache.flink.table.factories.FactoryUtil.CONNECTOR;
 import static 
org.apache.flink.table.types.utils.TypeConversions.fromLogicalToDataType;
 import static org.apache.paimon.CoreOptions.PATH;
+import static 
org.apache.paimon.flink.FlinkCatalogOptions.LOG_SYSTEM_AUTO_REGISTER;
+import static org.apache.paimon.flink.FlinkCatalogOptions.REGISTER_TIMEOUT;
 import static org.apache.paimon.flink.LogicalTypeConversion.toDataType;
 import static org.apache.paimon.flink.LogicalTypeConversion.toLogicalType;
 import static org.apache.paimon.flink.log.LogStoreRegister.registerLogSystem;
@@ -107,16 +111,22 @@ public class FlinkCatalog extends AbstractCatalog {
     private final Catalog catalog;
     private final boolean logStoreAutoRegister;
 
+    private final Duration logStoreAutoRegisterTimeout;
+
+    private final Options options;
+
     public FlinkCatalog(
             Catalog catalog,
             String name,
             String defaultDatabase,
             ClassLoader classLoader,
-            boolean logStoreAutoRegister) {
+            Options options) {
         super(name, defaultDatabase);
         this.catalog = catalog;
         this.classLoader = classLoader;
-        this.logStoreAutoRegister = logStoreAutoRegister;
+        this.options = options;
+        this.logStoreAutoRegister = options.get(LOG_SYSTEM_AUTO_REGISTER);
+        this.logStoreAutoRegisterTimeout = options.get(REGISTER_TIMEOUT);
         try {
             this.catalog.createDatabase(defaultDatabase, true);
         } catch (Catalog.DatabaseAlreadyExistException ignore) {
@@ -256,6 +266,14 @@ public class FlinkCatalog extends AbstractCatalog {
 
         Identifier identifier = toIdentifier(tablePath);
         if (logStoreAutoRegister) {
+            // catalog.createTable will copy the default options, but we need 
this info
+            // here before creating the table, such as 
table-default.kafka.bootstrap.servers defined in
+            // catalog options. Temporarily, we copy the default options here.
+            if (catalog instanceof org.apache.paimon.catalog.AbstractCatalog) {
+                ((org.apache.paimon.catalog.AbstractCatalog) catalog)
+                        .copyTableDefaultOptions(options);
+            }
+            options.put(REGISTER_TIMEOUT.key(), 
logStoreAutoRegisterTimeout.toString());
             registerLogSystem(catalog, identifier, options, classLoader);
         }
         // remove table path
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkCatalogFactory.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkCatalogFactory.java
index ea42075c2..fe4f55cbe 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkCatalogFactory.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkCatalogFactory.java
@@ -27,7 +27,6 @@ import java.util.Collections;
 import java.util.Set;
 
 import static org.apache.paimon.flink.FlinkCatalogOptions.DEFAULT_DATABASE;
-import static 
org.apache.paimon.flink.FlinkCatalogOptions.LOG_SYSTEM_AUTO_REGISTER;
 
 /** Factory for {@link FlinkCatalog}. */
 public class FlinkCatalogFactory implements 
org.apache.flink.table.factories.CatalogFactory {
@@ -65,7 +64,7 @@ public class FlinkCatalogFactory implements 
org.apache.flink.table.factories.Cat
                 catalogName,
                 context.options().get(DEFAULT_DATABASE),
                 classLoader,
-                context.options().get(LOG_SYSTEM_AUTO_REGISTER));
+                context.options());
     }
 
     public static FlinkCatalog createCatalog(String catalogName, Catalog 
catalog, Options options) {
@@ -74,7 +73,7 @@ public class FlinkCatalogFactory implements 
org.apache.flink.table.factories.Cat
                 catalogName,
                 Catalog.DEFAULT_DATABASE,
                 FlinkCatalogFactory.class.getClassLoader(),
-                options.get(LOG_SYSTEM_AUTO_REGISTER));
+                options);
     }
 
     public static Catalog createPaimonCatalog(Options catalogOptions) {
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkCatalogOptions.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkCatalogOptions.java
index fb0d1b692..97c1d5424 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkCatalogOptions.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkCatalogOptions.java
@@ -22,6 +22,8 @@ import org.apache.paimon.catalog.Catalog;
 import org.apache.paimon.options.ConfigOption;
 import org.apache.paimon.options.ConfigOptions;
 
+import java.time.Duration;
+
 /** Options for flink catalog. */
 public class FlinkCatalogOptions {
 
@@ -38,4 +40,11 @@ public class FlinkCatalogOptions {
                             "If true, the register will automatically create 
and delete a topic in log system for Paimon table. Default kafka log store 
register "
                                     + "is supported, users can implement 
customized register for log system, for example, create a new class which 
extends "
                                     + "KafkaLogStoreFactory and return a 
customized LogStoreRegister for their kafka cluster to create/delete topics.");
+
+    public static final ConfigOption<Duration> REGISTER_TIMEOUT =
+            ConfigOptions.key("log.system.auto-register-timeout")
+                    .durationType()
+                    .defaultValue(Duration.ofMinutes(1))
+                    .withDescription(
+                            "The timeout for register to create or delete 
topic in log system.");
 }
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkConnectorOptions.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkConnectorOptions.java
index 682053602..de8485423 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkConnectorOptions.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkConnectorOptions.java
@@ -68,6 +68,20 @@ public class FlinkConnectorOptions {
                                                             + "."))
                                     .build());
 
+    public static final ConfigOption<Integer> LOG_SYSTEM_PARTITIONS =
+            ConfigOptions.key("log.system.partitions")
+                    .intType()
+                    .defaultValue(1)
+                    .withDescription(
+                            "The number of partitions of the log system. If 
log system is kafka, this is kafka partitions.");
+
+    public static final ConfigOption<Integer> LOG_SYSTEM_REPLICATION =
+            ConfigOptions.key("log.system.replication")
+                    .intType()
+                    .defaultValue(1)
+                    .withDescription(
+                            "The number of replication of the log system. If 
log system is kafka, this is kafka replicationFactor.");
+
     public static final ConfigOption<Integer> SINK_PARALLELISM =
             ConfigOptions.key("sink.parallelism")
                     .intType()
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/kafka/KafkaLogStoreFactory.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/kafka/KafkaLogStoreFactory.java
index 2a5038f5c..0966d166e 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/kafka/KafkaLogStoreFactory.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/kafka/KafkaLogStoreFactory.java
@@ -132,7 +132,7 @@ public class KafkaLogStoreFactory implements 
LogStoreTableFactory {
 
     @Override
     public LogStoreRegister createRegister(RegisterContext context) {
-        throw new UnsupportedOperationException();
+        return new KafkaLogStoreRegister(context);
     }
 
     private int[] getPrimaryKeyIndexes(ResolvedSchema schema) {
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/kafka/KafkaLogStoreRegister.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/kafka/KafkaLogStoreRegister.java
new file mode 100644
index 000000000..af0740f1a
--- /dev/null
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/kafka/KafkaLogStoreRegister.java
@@ -0,0 +1,161 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.kafka;
+
+import org.apache.paimon.catalog.Identifier;
+import org.apache.paimon.flink.log.LogStoreRegister;
+import org.apache.paimon.flink.log.LogStoreTableFactory;
+import org.apache.paimon.utils.Preconditions;
+
+import org.apache.paimon.shade.guava30.com.google.common.collect.ImmutableMap;
+
+import org.apache.kafka.clients.admin.AdminClient;
+import org.apache.kafka.clients.admin.NewTopic;
+
+import java.time.Duration;
+import java.util.Collections;
+import java.util.Map;
+import java.util.Properties;
+import java.util.UUID;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+import static org.apache.paimon.CoreOptions.BUCKET;
+import static org.apache.paimon.flink.FlinkCatalogOptions.REGISTER_TIMEOUT;
+import static 
org.apache.paimon.flink.FlinkConnectorOptions.LOG_SYSTEM_PARTITIONS;
+import static 
org.apache.paimon.flink.FlinkConnectorOptions.LOG_SYSTEM_REPLICATION;
+import static org.apache.paimon.flink.kafka.KafkaLogOptions.BOOTSTRAP_SERVERS;
+import static org.apache.paimon.flink.kafka.KafkaLogOptions.TOPIC;
+
+/** KafkaLogStoreRegister is used to register/unregister topics in Kafka for 
paimon table. */
+public class KafkaLogStoreRegister implements LogStoreRegister {
+
+    private final String bootstrapServers;
+
+    private final String topic;
+
+    private final int partition;
+
+    private final int replicationFactor;
+
+    private final Duration timeout;
+
+    private final Properties properties;
+
+    private final Identifier identifier;
+
+    public KafkaLogStoreRegister(LogStoreTableFactory.RegisterContext context) 
{
+        this.bootstrapServers = context.getOptions().get(BOOTSTRAP_SERVERS);
+        this.identifier = context.getIdentifier();
+        this.topic =
+                context.getOptions().getOptional(TOPIC).isPresent()
+                        ? context.getOptions().get(TOPIC)
+                        : String.format(
+                                "%s_%s_%s",
+                                this.identifier.getDatabaseName(),
+                                this.identifier.getObjectName(),
+                                UUID.randomUUID().toString().replace("-", ""));
+
+        Preconditions.checkArgument(this.bootstrapServers != null);
+        Preconditions.checkArgument(this.topic != null);
+        Preconditions.checkArgument(this.identifier != null);
+
+        // handle the type information that is lost when a Map is converted to Options
+        if (context.getOptions().get(REGISTER_TIMEOUT.key()) == null) {
+            this.timeout = REGISTER_TIMEOUT.defaultValue();
+        } else {
+            this.timeout = 
Duration.parse(context.getOptions().get(REGISTER_TIMEOUT.key()));
+        }
+
+        // handle bucket=-1
+        int bucketNum =
+                context.getOptions().get(BUCKET) == -1 ? 1 : 
context.getOptions().get(BUCKET);
+        this.partition =
+                
context.getOptions().getOptional(LOG_SYSTEM_PARTITIONS).isPresent()
+                        ? context.getOptions().get(LOG_SYSTEM_PARTITIONS)
+                        : bucketNum;
+
+        this.replicationFactor = 
context.getOptions().get(LOG_SYSTEM_REPLICATION);
+
+        this.properties = new Properties();
+        this.properties.put("bootstrap.servers", this.bootstrapServers);
+    }
+
+    @Override
+    public Map<String, String> registerTopic() {
+        try (AdminClient admin = AdminClient.create(properties)) {
+            NewTopic newTopic =
+                    new NewTopic(this.topic, this.partition, (short) 
this.replicationFactor);
+
+            // Since the call is Async, let's wait for it to complete.
+            admin.createTopics(Collections.singleton(newTopic))
+                    .all()
+                    .get(this.timeout.getSeconds(), TimeUnit.SECONDS);
+        } catch (TimeoutException e) {
+            throw new IllegalStateException(
+                    String.format(
+                            "Register topic for table %s timeout %s",
+                            this.identifier.getFullName(), e.getMessage()));
+        } catch (Exception e) {
+            throw new IllegalStateException(
+                    String.format(
+                            "Register topic for table %s exception %s",
+                            this.identifier.getFullName(), e.getMessage()));
+        }
+
+        return ImmutableMap.of(
+                TOPIC.key(),
+                this.topic,
+                LOG_SYSTEM_PARTITIONS.key(),
+                String.valueOf(this.partition),
+                LOG_SYSTEM_REPLICATION.key(),
+                String.valueOf(this.replicationFactor));
+    }
+
+    @Override
+    public void unRegisterTopic() {
+        try (AdminClient admin = AdminClient.create(properties)) {
+            admin.deleteTopics(Collections.singleton(this.topic))
+                    .all()
+                    .get(this.timeout.getSeconds(), TimeUnit.SECONDS);
+        } catch (ExecutionException e) {
+            if (e.getCause()
+                    instanceof 
org.apache.kafka.common.errors.UnknownTopicOrPartitionException) {
+                // ignore
+                return;
+            } else {
+                throw new IllegalStateException(
+                        String.format(
+                                "Unregister topic for table %s exception %s",
+                                this.identifier.getFullName(), 
e.getMessage()));
+            }
+        } catch (TimeoutException e) {
+            throw new RuntimeException(
+                    String.format(
+                            "Unregister topic for table %s timeout %s",
+                            this.identifier.getFullName(), e.getMessage()));
+        } catch (Exception e) {
+            throw new RuntimeException(
+                    String.format(
+                            "Unregister topic for table %s exception %s",
+                            this.identifier.getFullName(), e.getMessage()));
+        }
+    }
+}
diff --git 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/FlinkCatalogTest.java
 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/FlinkCatalogTest.java
index cae2f75f6..16b2b5f17 100644
--- 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/FlinkCatalogTest.java
+++ 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/FlinkCatalogTest.java
@@ -464,7 +464,7 @@ public class FlinkCatalogTest {
                         .field("comp", DataTypes.INT(), "test + 1")
                         .primaryKey("pk")
                         .build();
-        CatalogTable catalogTable = new CatalogTableImpl(schema, 
Collections.emptyMap(), "");
+        CatalogTable catalogTable = new CatalogTableImpl(schema, new 
HashMap<>(), "");
 
         catalog.createDatabase(path1.getDatabaseName(), null, false);
         catalog.createTable(path1, catalogTable, false);
diff --git 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/LogSystemITCase.java
 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/LogSystemITCase.java
index 829d6744d..201f3db5f 100644
--- 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/LogSystemITCase.java
+++ 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/LogSystemITCase.java
@@ -23,9 +23,13 @@ import org.apache.paimon.utils.BlockingIterator;
 
 import org.apache.flink.table.api.TableResult;
 import org.apache.flink.table.api.ValidationException;
+import org.apache.flink.table.catalog.CatalogBaseTable;
+import org.apache.flink.table.catalog.ObjectPath;
+import org.apache.flink.table.catalog.exceptions.TableNotExistException;
 import org.apache.flink.types.Row;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.Timeout;
 
 import java.io.IOException;
 import java.util.List;
@@ -186,4 +190,135 @@ public class LogSystemITCase extends KafkaTableTestBase {
                 .hasMessage(
                         "File store continuous reading does not support the 
log streaming read mode.");
     }
+
+    @Test
+    @Timeout(60)
+    public void testLogSystemAutoRegister() throws TableNotExistException {
+        // enable log system auto registration
+        tEnv.executeSql(
+                String.format(
+                        "CREATE CATALOG PAIMON_REGISTER WITH ("
+                                + "'type'='paimon', 'warehouse'='%s', 
'log.system.auto-register'='true')",
+                        getTempDirPath()));
+        tEnv.useCatalog("PAIMON_REGISTER");
+
+        env.getCheckpointConfig().disableCheckpointing();
+        env.setParallelism(1);
+
+        // check register table with specified bootstrap server and partition 
num.
+        tEnv.executeSql(
+                String.format(
+                        "CREATE TABLE T (i INT, j INT) WITH ("
+                                + "'log.system'='kafka', "
+                                + "'log.system.partitions'='2', "
+                                + "'write-mode'='append-only', "
+                                + "'kafka.bootstrap.servers'='%s', "
+                                + "'kafka.topic'='Tt')",
+                        getBootstrapServers()));
+
+        checkTopicExists("Tt", 2, 1);
+
+        tEnv.executeSql(
+                String.format(
+                        "CREATE TABLE T2 (i INT, j INT) WITH ("
+                                + "'log.system'='kafka', "
+                                + "'bucket'='2', "
+                                + "'write-mode'='append-only', "
+                                + "'kafka.bootstrap.servers'='%s', "
+                                + "'kafka.topic'='T2')",
+                        getBootstrapServers()));
+
+        checkTopicExists("T2", 2, 1);
+
+        // check register a random kafka topic
+        tEnv.executeSql(
+                String.format(
+                        "CREATE TABLE T1 (i INT, j INT) WITH ("
+                                + "'log.system'='kafka', "
+                                + "'log.system.partitions'='2', "
+                                + "'write-mode'='append-only', "
+                                + "'kafka.bootstrap.servers'='%s')",
+                        getBootstrapServers()));
+
+        CatalogBaseTable table =
+                tEnv.getCatalog("PAIMON_REGISTER")
+                        .get()
+                        .getTable(ObjectPath.fromString("default.T"));
+        checkTopicExists(table.getOptions().get("kafka.topic"), 2, 1);
+
+        // check unregister topic when creating table fail
+        assertThatThrownBy(
+                        () ->
+                                tEnv.executeSql(
+                                        String.format(
+                                                "CREATE TABLE T (i INT, j INT) 
WITH ("
+                                                        + 
"'log.system'='kafka', "
+                                                        + 
"'log.system.partitions'='2', "
+                                                        + 
"'write-mode'='append-only', "
+                                                        + 
"'kafka.bootstrap.servers'='%s', "
+                                                        + 
"'kafka.topic'='T1')",
+                                                getBootstrapServers())))
+                
.isInstanceOf(org.apache.flink.table.api.ValidationException.class)
+                .hasMessage("Could not execute CreateTable in path 
`PAIMON_REGISTER`.`default`.`T`")
+                .cause()
+                .isInstanceOf(
+                        
org.apache.flink.table.catalog.exceptions.TableAlreadyExistException.class)
+                .hasMessage("Table (or view) default.T already exists in 
Catalog PAIMON_REGISTER.");
+
+        checkTopicNotExist("T1");
+
+        // tEnv.useDatabase("NOT_EXIST");
+        assertThatThrownBy(
+                        () ->
+                                tEnv.executeSql(
+                                        String.format(
+                                                "CREATE TABLE NOT_EXIST.T (i 
INT, j INT) WITH ("
+                                                        + 
"'log.system'='kafka', "
+                                                        + 
"'log.system.partitions'='2', "
+                                                        + 
"'write-mode'='append-only', "
+                                                        + 
"'kafka.bootstrap.servers'='%s', "
+                                                        + 
"'kafka.topic'='T1')",
+                                                getBootstrapServers())))
+                
.isInstanceOf(org.apache.flink.table.api.ValidationException.class)
+                .hasMessage(
+                        "Could not execute CreateTable in path 
`PAIMON_REGISTER`.`NOT_EXIST`.`T`")
+                .cause()
+                .isInstanceOf(
+                        
org.apache.flink.table.catalog.exceptions.DatabaseNotExistException.class)
+                .hasMessage("Database NOT_EXIST does not exist in Catalog 
PAIMON_REGISTER.");
+
+        checkTopicNotExist("T1");
+
+        // check unregister topic when drop table
+        tEnv.executeSql("DROP TABLE T");
+        checkTopicNotExist("T");
+    }
+
+    @Test
+    @Timeout(60)
+    public void testLogSystemAutoRegisterWithDefaultOption() {
+        // enable log system auto registration
+        tEnv.executeSql(
+                String.format(
+                        "CREATE CATALOG PAIMON_DEFAULT WITH ("
+                                + "'type'='paimon', 'warehouse'='%s', "
+                                + "'log.system.auto-register'='true', "
+                                + 
"'table-default.kafka.bootstrap.servers'='%s',"
+                                + "'table-default.log.system.partitions'='2')",
+                        getTempDirPath(), getBootstrapServers()));
+        tEnv.useCatalog("PAIMON_DEFAULT");
+
+        env.getCheckpointConfig().disableCheckpointing();
+        env.setParallelism(1);
+
+        tEnv.executeSql(
+                String.format(
+                        "CREATE TABLE T (i INT, j INT) WITH ("
+                                + "'log.system'='kafka', "
+                                + "'write-mode'='append-only', "
+                                + "'kafka.topic'='T')",
+                        getBootstrapServers()));
+
+        checkTopicExists("T", 2, 1);
+    }
 }
diff --git 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/kafka/KafkaLogStoreRegisterTest.java
 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/kafka/KafkaLogStoreRegisterTest.java
new file mode 100644
index 000000000..4db0d4111
--- /dev/null
+++ 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/kafka/KafkaLogStoreRegisterTest.java
@@ -0,0 +1,191 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.kafka;
+
+import org.apache.paimon.catalog.Identifier;
+import org.apache.paimon.flink.log.LogStoreTableFactory;
+import org.apache.paimon.options.Options;
+
+import org.apache.kafka.clients.admin.AdminClient;
+import org.apache.kafka.clients.admin.NewTopic;
+import org.junit.After;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.Test;
+
+import java.time.Duration;
+import java.util.Collections;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+
+import static org.apache.paimon.flink.FlinkCatalogOptions.REGISTER_TIMEOUT;
+import static org.apache.paimon.flink.FlinkConnectorOptions.LOG_SYSTEM_PARTITIONS;
+import static org.apache.paimon.flink.kafka.KafkaLogOptions.BOOTSTRAP_SERVERS;
+import static org.apache.paimon.flink.kafka.KafkaLogOptions.TOPIC;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatCode;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+import static org.assertj.core.api.Fail.fail;
+
+/**
+ * Tests for {@link KafkaLogStoreRegister}.
+ *
+ * <p>Every test builds a register from a small set of table options (see {@link
+ * #createKafkaLogStoreRegister(String, String, Integer)}) and then inspects the topics on the
+ * broker returned by {@code getBootstrapServers()}.
+ */
+public class KafkaLogStoreRegisterTest extends KafkaTableTestBase {
+    private static final String DATABASE = "mock_db";
+
+    private static final String TABLE = "mock_table";
+
+    /**
+     * Deletes all topics after each test so that tests which count topics are not affected by
+     * topics left behind by other tests.
+     *
+     * <p>The tests in this class are JUnit Jupiter tests, so the hook has to be a Jupiter {@link
+     * AfterEach}; a JUnit 4 {@code @After} method is not invoked by the Jupiter engine.
+     */
+    @AfterEach
+    public void tearDown() {
+        try (AdminClient admin = createAdminClient()) {
+            Set<String> topics = admin.listTopics().names().get();
+            admin.deleteTopics(topics).all().get();
+        } catch (Exception ignored) {
+            // Best-effort cleanup: a failure here must not mask the outcome of the test itself.
+        }
+    }
+
+    /** Registers an explicitly named topic with 2 partitions and checks topic and result. */
+    @Test
+    public void testRegisterTopic() {
+        String topic = "register-topic";
+
+        Map<String, String> result =
+                createKafkaLogStoreRegister(getBootstrapServers(), topic, 2).registerTopic();
+        checkTopicExists(topic, 2, 1);
+        assertThat(result.get(TOPIC.key())).isEqualTo(topic);
+    }
+
+    /**
+     * Registers without a topic name or partition count and checks the generated values: the topic
+     * is expected to be named {@code <database>_<table>_<32 hex characters>} and the returned
+     * options are expected to report one partition.
+     */
+    @Test
+    public void testRegisterTopicAuto() {
+        Map<String, String> result =
+                createKafkaLogStoreRegister(getBootstrapServers()).registerTopic();
+
+        try (AdminClient admin = createAdminClient()) {
+            Set<String> topics = admin.listTopics().names().get(5, TimeUnit.SECONDS);
+            assertThat(topics).hasSize(1);
+
+            String topicName = topics.iterator().next();
+            assertThat(result.get(TOPIC.key())).isEqualTo(topicName);
+
+            String prefix = String.format("%s_%s_", DATABASE, TABLE);
+            assertThat(topicName).startsWith(prefix);
+
+            String uuid = topicName.substring(prefix.length());
+            assertThat(uuid).matches("[0-9a-fA-F]{32}");
+
+            // assert that the bucket count is used when log.system.partitions is missing.
+            assertThat(result.get(LOG_SYSTEM_PARTITIONS.key())).isEqualTo("1");
+        } catch (Exception e) {
+            // Pass the cause along so the original stack trace shows up in the test report.
+            fail(e.getMessage(), e);
+        }
+    }
+
+    /** Registering against unresolvable bootstrap servers must fail with a descriptive error. */
+    @Test
+    public void testRegisterTopicException() {
+        String topic = "register-topic";
+        String invalidBootstrapServers = "invalid-bootstrap-servers:9092";
+
+        KafkaLogStoreRegister kafkaLogStoreRegister =
+                createKafkaLogStoreRegister(invalidBootstrapServers, topic);
+
+        assertThatThrownBy(kafkaLogStoreRegister::registerTopic)
+                .isInstanceOf(IllegalStateException.class)
+                .hasMessageContaining(
+                        "Register topic for table mock_db.mock_table exception "
+                                + "Failed to create new KafkaAdminClient");
+    }
+
+    /** Registering a topic that already exists must fail instead of reusing the topic. */
+    @Test
+    public void testRegisterTopicExist() {
+        String topic = "topic-exist";
+        createTopic(topic, 1, 1);
+
+        assertThatThrownBy(
+                        () ->
+                                createKafkaLogStoreRegister(getBootstrapServers(), topic)
+                                        .registerTopic())
+                .isInstanceOf(IllegalStateException.class)
+                .hasMessage(
+                        "Register topic for table mock_db.mock_table exception "
+                                + "org.apache.kafka.common.errors.TopicExistsException: "
+                                + "Topic 'topic-exist' already exists.");
+    }
+
+    /** Unregistering removes a previously created topic. */
+    @Test
+    public void testUnregisterTopic() {
+        String topic = "unregister-topic";
+        createTopic(topic, 2, 1);
+
+        createKafkaLogStoreRegister(getBootstrapServers(), topic, 2).unRegisterTopic();
+        checkTopicNotExist(topic);
+    }
+
+    /** Unregistering a topic that was never created must not throw. */
+    @Test
+    public void testUnregisterTopicException() {
+        String topic = "not_exist_topic";
+
+        assertThatCode(
+                        () ->
+                                createKafkaLogStoreRegister(getBootstrapServers(), topic)
+                                        .unRegisterTopic())
+                .doesNotThrowAnyException();
+    }
+
+    /** Creates a register with neither topic name nor partition count configured. */
+    private KafkaLogStoreRegister createKafkaLogStoreRegister(String bootstrapServers) {
+        return createKafkaLogStoreRegister(bootstrapServers, null, null);
+    }
+
+    /** Creates a register with a topic name but no partition count configured. */
+    private KafkaLogStoreRegister createKafkaLogStoreRegister(
+            String bootstrapServers, String topic) {
+        return createKafkaLogStoreRegister(bootstrapServers, topic, null);
+    }
+
+    /**
+     * Creates a register for table {@code mock_db.mock_table}.
+     *
+     * @param bootstrapServers value for the Kafka bootstrap servers option
+     * @param topic topic name to configure, or {@code null} to leave the option unset
+     * @param partition partition count to configure, or {@code null} to leave the option unset
+     * @return a register whose context exposes the assembled options and the mock identifier
+     */
+    private KafkaLogStoreRegister createKafkaLogStoreRegister(
+            String bootstrapServers, String topic, Integer partition) {
+        Options tableOptions = new Options();
+        tableOptions.set(BOOTSTRAP_SERVERS, bootstrapServers);
+
+        if (topic != null) {
+            tableOptions.set(TOPIC, topic);
+        }
+
+        if (partition != null) {
+            tableOptions.set(LOG_SYSTEM_PARTITIONS, partition);
+        }
+        tableOptions.set(REGISTER_TIMEOUT.key(), Duration.ofSeconds(20).toString());
+
+        return new KafkaLogStoreRegister(
+                new LogStoreTableFactory.RegisterContext() {
+                    @Override
+                    public Options getOptions() {
+                        return tableOptions;
+                    }
+
+                    @Override
+                    public Identifier getIdentifier() {
+                        return Identifier.create(DATABASE, TABLE);
+                    }
+                });
+    }
+
+    /**
+     * Creates a topic directly through the admin client and waits until the request has completed,
+     * so that the calling test can rely on the topic being present.
+     *
+     * @param topic name of the topic to create
+     * @param partition number of partitions
+     * @param replicationFactor replication factor, narrowed to {@code short} for the Kafka API
+     */
+    private void createTopic(String topic, int partition, int replicationFactor) {
+        try (AdminClient admin = createAdminClient()) {
+            admin.createTopics(
+                            Collections.singletonList(
+                                    new NewTopic(topic, partition, (short) replicationFactor)))
+                    .all()
+                    .get();
+        } catch (Exception e) {
+            fail(e.getMessage(), e);
+        }
+    }
+}
diff --git 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/kafka/KafkaTableTestBase.java
 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/kafka/KafkaTableTestBase.java
index 2c50dc902..fdcb99f33 100644
--- 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/kafka/KafkaTableTestBase.java
+++ 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/kafka/KafkaTableTestBase.java
@@ -26,6 +26,7 @@ import 
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
 import org.apache.flink.util.DockerImageVersions;
 import org.apache.kafka.clients.admin.AdminClient;
+import org.apache.kafka.clients.admin.DescribeTopicsResult;
 import org.apache.kafka.clients.admin.NewTopic;
 import org.apache.kafka.clients.admin.TopicDescription;
 import org.apache.kafka.clients.admin.TopicListing;
@@ -58,8 +59,12 @@ import java.util.Properties;
 import java.util.Timer;
 import java.util.TimerTask;
 import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
 import java.util.stream.Collectors;
 
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Fail.fail;
+
 /** Base class for Kafka Table IT Cases. */
 public abstract class KafkaTableTestBase extends AbstractTestBase {
 
@@ -253,7 +258,7 @@ public abstract class KafkaTableTestBase extends 
AbstractTestBase {
     }
 
     /** Kafka container extension for junit5. */
-    private static class KafkaContainerExtension extends KafkaContainer
+    protected static class KafkaContainerExtension extends KafkaContainer
             implements BeforeAllCallback, AfterAllCallback {
         private KafkaContainerExtension(DockerImageName dockerImageName) {
             super(dockerImageName);
@@ -269,4 +274,35 @@ public abstract class KafkaTableTestBase extends 
AbstractTestBase {
             this.close();
         }
     }
+
+    // ------------------------ For Kafka Test Purpose 
----------------------------------
+    /**
+     * Asserts that {@code topic} can be described and has the expected layout.
+     *
+     * @param topic name of the topic to describe
+     * @param partition expected number of partitions
+     * @param replicationFactor expected number of replicas of the first partition
+     */
+    protected void checkTopicExists(String topic, int partition, int replicationFactor) {
+        try (AdminClient admin = createAdminClient()) {
+            DescribeTopicsResult topicDesc = admin.describeTopics(Collections.singleton(topic));
+            TopicDescription description =
+                    topicDesc.allTopicNames().get(10, TimeUnit.SECONDS).get(topic);
+
+            assertThat(description.partitions()).hasSize(partition);
+            assertThat(description.partitions().get(0).replicas()).hasSize(replicationFactor);
+        } catch (Exception e) {
+            // Name the topic and keep the cause: the exception message alone may be null (for
+            // example on a timeout), which would leave the failure without any diagnostics.
+            fail("Failed to describe topic " + topic + ": " + e.getMessage(), e);
+        }
+    }
+
+    /**
+     * Asserts that {@code topic} is not among the topics currently listed by the broker.
+     *
+     * <p>The check uses the topic listing rather than describing an empty collection of topics:
+     * describing nothing yields an empty result, so an assertion on it could never fail.
+     *
+     * @param topic name of the topic that must be absent
+     */
+    protected void checkTopicNotExist(String topic) {
+        try (AdminClient admin = createAdminClient()) {
+            assertThat(admin.listTopics().names().get(10, TimeUnit.SECONDS)).doesNotContain(topic);
+        } catch (Exception e) {
+            // Keep the cause so the original stack trace is visible in the test report.
+            fail("Failed to list topics while checking " + topic + ": " + e.getMessage(), e);
+        }
+    }
+
+    /**
+     * Builds a Kafka admin client that connects to the servers returned by {@code
+     * getBootstrapServers()}. The caller is responsible for closing the returned client.
+     */
+    protected AdminClient createAdminClient() {
+        Properties adminProps = new Properties();
+        adminProps.setProperty("bootstrap.servers", getBootstrapServers());
+        return AdminClient.create(adminProps);
+    }
 }


Reply via email to