This is an automated email from the ASF dual-hosted git repository.

zjureel pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 63927fa10 [flink][logsystem] Introduce log store register interfaces 
(#1548)
63927fa10 is described below

commit 63927fa1051a9f4e4aade79e921489e7dd280ede
Author: Shammon FY <[email protected]>
AuthorDate: Tue Jul 25 11:47:51 2023 +0800

    [flink][logsystem] Introduce log store register interfaces (#1548)
    
    * [flink][logsystem] Introduce log store register interfaces
    
    * [flink][logsystem] Lazy check table exists
    
    * [flink][logsystem] Add auto-register option and merge log store register 
in LogStoreTableFactory
    
    * [flink][logsystem] fix docs
    
    * [flink][logsystem] Update register interface for options
    
    * [flink][logsystem] Add auto register config in flink catalog options
    
    * [flink][logsystem] Rebase from master
    
    * [flink][logsystem] Add flink catalog options in configurations.md
---
 docs/content/maintenance/configurations.md         |  6 ++
 .../generated/flink_catalog_configuration.html     | 42 ++++++++++
 .../java/org/apache/paimon/flink/FlinkCatalog.java | 44 +++++++++--
 .../apache/paimon/flink/FlinkCatalogFactory.java   | 23 +++---
 .../apache/paimon/flink/FlinkCatalogOptions.java   | 41 ++++++++++
 .../org/apache/paimon/flink/action/ActionBase.java |  2 +-
 .../paimon/flink/kafka/KafkaLogStoreFactory.java   |  6 ++
 .../apache/paimon/flink/log/LogStoreRegister.java  | 85 ++++++++++++++++++++
 .../paimon/flink/log/LogStoreTableFactory.java     | 17 ++++
 .../org/apache/paimon/flink/FlinkCatalogTest.java  | 90 ++++++++++++++++++++++
 .../services/org.apache.paimon.factories.Factory   |  2 +
 11 files changed, 339 insertions(+), 19 deletions(-)

diff --git a/docs/content/maintenance/configurations.md 
b/docs/content/maintenance/configurations.md
index e34a33260..5d46dbfaa 100644
--- a/docs/content/maintenance/configurations.md
+++ b/docs/content/maintenance/configurations.md
@@ -44,6 +44,12 @@ Options for Hive catalog.
 
 {{< generated/hive_catalog_configuration >}}
 
+### FlinkCatalogOptions
+
+Flink catalog options for paimon.
+
+{{< generated/flink_catalog_configuration >}}
+
 ### FlinkConnectorOptions
 
 Flink connector options for paimon.
diff --git a/docs/layouts/shortcodes/generated/flink_catalog_configuration.html 
b/docs/layouts/shortcodes/generated/flink_catalog_configuration.html
new file mode 100644
index 000000000..c1ed6dbb3
--- /dev/null
+++ b/docs/layouts/shortcodes/generated/flink_catalog_configuration.html
@@ -0,0 +1,42 @@
+{{/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/}}
+<table class="configuration table table-bordered">
+    <thead>
+        <tr>
+            <th class="text-left" style="width: 20%">Key</th>
+            <th class="text-left" style="width: 15%">Default</th>
+            <th class="text-left" style="width: 10%">Type</th>
+            <th class="text-left" style="width: 55%">Description</th>
+        </tr>
+    </thead>
+    <tbody>
+        <tr>
+            <td><h5>default-database</h5></td>
+            <td style="word-wrap: break-word;">"default"</td>
+            <td>String</td>
+            <td></td>
+        </tr>
+        <tr>
+            <td><h5>log.system.auto-register</h5></td>
+            <td style="word-wrap: break-word;">false</td>
+            <td>Boolean</td>
+            <td>If true, the register will automatically create and delete a 
topic in log system for Paimon table. Default kafka log store register is 
supported, users can implement customized register for log system, for example, 
create a new class which extends KafkaLogStoreFactory and return a customized 
LogStoreRegister for their kafka cluster to create/delete topics.</td>
+        </tr>
+    </tbody>
+</table>
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkCatalog.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkCatalog.java
index a629112c1..8a2224da0 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkCatalog.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkCatalog.java
@@ -25,7 +25,6 @@ import org.apache.paimon.schema.SchemaChange;
 import org.apache.paimon.schema.SchemaManager;
 import org.apache.paimon.table.FileStoreTable;
 import org.apache.paimon.table.Table;
-import org.apache.paimon.utils.Preconditions;
 
 import org.apache.flink.table.api.TableColumn;
 import org.apache.flink.table.api.TableSchema;
@@ -91,21 +90,33 @@ import static 
org.apache.flink.table.types.utils.TypeConversions.fromLogicalToDa
 import static org.apache.paimon.CoreOptions.PATH;
 import static org.apache.paimon.flink.LogicalTypeConversion.toDataType;
 import static org.apache.paimon.flink.LogicalTypeConversion.toLogicalType;
+import static org.apache.paimon.flink.log.LogStoreRegister.registerLogSystem;
+import static org.apache.paimon.flink.log.LogStoreRegister.unRegisterLogSystem;
 import static 
org.apache.paimon.flink.utils.FlinkCatalogPropertiesUtil.compoundKey;
 import static 
org.apache.paimon.flink.utils.FlinkCatalogPropertiesUtil.deserializeNonPhysicalColumn;
 import static 
org.apache.paimon.flink.utils.FlinkCatalogPropertiesUtil.deserializeWatermarkSpec;
 import static 
org.apache.paimon.flink.utils.FlinkCatalogPropertiesUtil.nonPhysicalColumnsCount;
 import static 
org.apache.paimon.flink.utils.FlinkCatalogPropertiesUtil.serializeNonPhysicalColumns;
 import static 
org.apache.paimon.flink.utils.FlinkCatalogPropertiesUtil.serializeWatermarkSpec;
+import static org.apache.paimon.utils.Preconditions.checkArgument;
 
 /** Catalog for paimon. */
 public class FlinkCatalog extends AbstractCatalog {
+    private final ClassLoader classLoader;
 
     private final Catalog catalog;
-
-    public FlinkCatalog(Catalog catalog, String name, String defaultDatabase) {
+    private final boolean logStoreAutoRegister;
+
+    public FlinkCatalog(
+            Catalog catalog,
+            String name,
+            String defaultDatabase,
+            ClassLoader classLoader,
+            boolean logStoreAutoRegister) {
         super(name, defaultDatabase);
         this.catalog = catalog;
+        this.classLoader = classLoader;
+        this.logStoreAutoRegister = logStoreAutoRegister;
         try {
             this.catalog.createDatabase(defaultDatabase, true);
         } catch (Catalog.DatabaseAlreadyExistException ignore) {
@@ -210,8 +221,16 @@ public class FlinkCatalog extends AbstractCatalog {
     @Override
     public void dropTable(ObjectPath tablePath, boolean ignoreIfNotExists)
             throws TableNotExistException, CatalogException {
+        Identifier identifier = toIdentifier(tablePath);
+        Table table = null;
         try {
+            if (logStoreAutoRegister && catalog.tableExists(identifier)) {
+                table = catalog.getTable(identifier);
+            }
             catalog.dropTable(toIdentifier(tablePath), ignoreIfNotExists);
+            if (logStoreAutoRegister && table != null) {
+                unRegisterLogSystem(identifier, table.options(), classLoader);
+            }
         } catch (Catalog.TableNotExistException e) {
             throw new TableNotExistException(getName(), tablePath);
         }
@@ -235,21 +254,30 @@ public class FlinkCatalog extends AbstractCatalog {
                             + " You can create TEMPORARY table instead if you 
want to create the table of other connector.");
         }
 
+        Identifier identifier = toIdentifier(tablePath);
+        if (logStoreAutoRegister) {
+            registerLogSystem(catalog, identifier, options, classLoader);
+        }
         // remove table path
         String specific = options.remove(PATH.key());
-        if (specific != null) {
+        if (specific != null || logStoreAutoRegister) {
             catalogTable = catalogTable.copy(options);
         }
 
+        boolean unRegisterLogSystem = false;
         try {
             catalog.createTable(
-                    toIdentifier(tablePath),
-                    FlinkCatalog.fromCatalogTable(catalogTable),
-                    ignoreIfExists);
+                    identifier, FlinkCatalog.fromCatalogTable(catalogTable), 
ignoreIfExists);
         } catch (Catalog.TableAlreadyExistException e) {
+            unRegisterLogSystem = true;
             throw new TableAlreadyExistException(getName(), tablePath);
         } catch (Catalog.DatabaseNotExistException e) {
+            unRegisterLogSystem = true;
             throw new DatabaseNotExistException(getName(), e.database());
+        } finally {
+            if (logStoreAutoRegister && unRegisterLogSystem) {
+                unRegisterLogSystem(identifier, options, classLoader);
+            }
         }
     }
 
@@ -611,7 +639,7 @@ public class FlinkCatalog extends AbstractCatalog {
         // watermark
         List<WatermarkSpec> watermarkSpecs = schema.getWatermarkSpecs();
         if (!watermarkSpecs.isEmpty()) {
-            Preconditions.checkArgument(watermarkSpecs.size() == 1);
+            checkArgument(watermarkSpecs.size() == 1);
             
columnOptions.putAll(serializeWatermarkSpec(watermarkSpecs.get(0)));
         }
 
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkCatalogFactory.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkCatalogFactory.java
index abc8b10fe..ea42075c2 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkCatalogFactory.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkCatalogFactory.java
@@ -21,23 +21,19 @@ package org.apache.paimon.flink;
 import org.apache.paimon.catalog.Catalog;
 import org.apache.paimon.catalog.CatalogContext;
 import org.apache.paimon.catalog.CatalogFactory;
-import org.apache.paimon.options.ConfigOption;
-import org.apache.paimon.options.ConfigOptions;
 import org.apache.paimon.options.Options;
 
 import java.util.Collections;
 import java.util.Set;
 
+import static org.apache.paimon.flink.FlinkCatalogOptions.DEFAULT_DATABASE;
+import static 
org.apache.paimon.flink.FlinkCatalogOptions.LOG_SYSTEM_AUTO_REGISTER;
+
 /** Factory for {@link FlinkCatalog}. */
 public class FlinkCatalogFactory implements 
org.apache.flink.table.factories.CatalogFactory {
 
     public static final String IDENTIFIER = "paimon";
 
-    public static final ConfigOption<String> DEFAULT_DATABASE =
-            ConfigOptions.key("default-database")
-                    .stringType()
-                    .defaultValue(Catalog.DEFAULT_DATABASE);
-
     @Override
     public String factoryIdentifier() {
         return IDENTIFIER;
@@ -67,11 +63,18 @@ public class FlinkCatalogFactory implements 
org.apache.flink.table.factories.Cat
         return new FlinkCatalog(
                 CatalogFactory.createCatalog(context, classLoader),
                 catalogName,
-                context.options().get(DEFAULT_DATABASE));
+                context.options().get(DEFAULT_DATABASE),
+                classLoader,
+                context.options().get(LOG_SYSTEM_AUTO_REGISTER));
     }
 
-    public static FlinkCatalog createCatalog(String catalogName, Catalog 
catalog) {
-        return new FlinkCatalog(catalog, catalogName, 
Catalog.DEFAULT_DATABASE);
+    public static FlinkCatalog createCatalog(String catalogName, Catalog 
catalog, Options options) {
+        return new FlinkCatalog(
+                catalog,
+                catalogName,
+                Catalog.DEFAULT_DATABASE,
+                FlinkCatalogFactory.class.getClassLoader(),
+                options.get(LOG_SYSTEM_AUTO_REGISTER));
     }
 
     public static Catalog createPaimonCatalog(Options catalogOptions) {
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkCatalogOptions.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkCatalogOptions.java
new file mode 100644
index 000000000..fb0d1b692
--- /dev/null
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkCatalogOptions.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink;
+
+import org.apache.paimon.catalog.Catalog;
+import org.apache.paimon.options.ConfigOption;
+import org.apache.paimon.options.ConfigOptions;
+
+/** Options for flink catalog. */
+public class FlinkCatalogOptions {
+
+    public static final ConfigOption<String> DEFAULT_DATABASE =
+            ConfigOptions.key("default-database")
+                    .stringType()
+                    .defaultValue(Catalog.DEFAULT_DATABASE);
+
+    public static final ConfigOption<Boolean> LOG_SYSTEM_AUTO_REGISTER =
+            ConfigOptions.key("log.system.auto-register")
+                    .booleanType()
+                    .defaultValue(false)
+                    .withDescription(
+                            "If true, the register will automatically create 
and delete a topic in log system for Paimon table. Default kafka log store 
register "
+                                    + "is supported, users can implement 
customized register for log system, for example, create a new class which 
extends "
+                                    + "KafkaLogStoreFactory and return a 
customized LogStoreRegister for their kafka cluster to create/delete topics.");
+}
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/ActionBase.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/ActionBase.java
index 0e4eaa654..6242bec27 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/ActionBase.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/ActionBase.java
@@ -46,7 +46,7 @@ public abstract class ActionBase implements Action {
         catalogOptions.set(CatalogOptions.WAREHOUSE, warehouse);
 
         catalog = FlinkCatalogFactory.createPaimonCatalog(catalogOptions);
-        flinkCatalog = FlinkCatalogFactory.createCatalog(catalogName, catalog);
+        flinkCatalog = FlinkCatalogFactory.createCatalog(catalogName, catalog, 
catalogOptions);
     }
 
     protected void execute(StreamExecutionEnvironment env, String defaultName) 
throws Exception {
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/kafka/KafkaLogStoreFactory.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/kafka/KafkaLogStoreFactory.java
index 9e2ca648d..2a5038f5c 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/kafka/KafkaLogStoreFactory.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/kafka/KafkaLogStoreFactory.java
@@ -20,6 +20,7 @@ package org.apache.paimon.flink.kafka;
 
 import org.apache.paimon.CoreOptions;
 import 
org.apache.paimon.flink.factories.FlinkFactoryUtil.FlinkTableFactoryHelper;
+import org.apache.paimon.flink.log.LogStoreRegister;
 import org.apache.paimon.flink.log.LogStoreTableFactory;
 import org.apache.paimon.options.Options;
 
@@ -129,6 +130,11 @@ public class KafkaLogStoreFactory implements 
LogStoreTableFactory {
                 options.get(BUCKET));
     }
 
+    @Override
+    public LogStoreRegister createRegister(RegisterContext context) {
+        throw new UnsupportedOperationException();
+    }
+
     private int[] getPrimaryKeyIndexes(ResolvedSchema schema) {
         final List<String> columns = schema.getColumnNames();
         return schema.getPrimaryKey()
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/log/LogStoreRegister.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/log/LogStoreRegister.java
new file mode 100644
index 000000000..b730d289b
--- /dev/null
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/log/LogStoreRegister.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.log;
+
+import org.apache.paimon.catalog.Catalog;
+import org.apache.paimon.catalog.Identifier;
+import org.apache.paimon.factories.FactoryUtil;
+import org.apache.paimon.options.Options;
+
+import java.util.Map;
+
+import static org.apache.paimon.flink.FlinkConnectorOptions.LOG_SYSTEM;
+import static org.apache.paimon.flink.FlinkConnectorOptions.NONE;
+
+/**
+ * {@link LogStoreRegister} will register and unregister topic for a Paimon 
table, you can implement
+ * it for customized log system management.
+ */
+public interface LogStoreRegister {
+    /** Register topic in log system for the table. */
+    Map<String, String> registerTopic();
+
+    /** Unregister topic in log system for the table. */
+    void unRegisterTopic();
+
+    static void registerLogSystem(
+            Catalog catalog,
+            Identifier identifier,
+            Map<String, String> options,
+            ClassLoader classLoader) {
+        Options tableOptions = Options.fromMap(options);
+        String logStore = tableOptions.get(LOG_SYSTEM);
+        if (!tableOptions.get(LOG_SYSTEM).equalsIgnoreCase(NONE)
+                && !catalog.tableExists(identifier)) {
+            LogStoreRegister logStoreRegister =
+                    getLogStoreRegister(identifier, classLoader, tableOptions, 
logStore);
+            options.putAll(logStoreRegister.registerTopic());
+        }
+    }
+
+    static void unRegisterLogSystem(
+            Identifier identifier, Map<String, String> options, ClassLoader 
classLoader) {
+        Options tableOptions = Options.fromMap(options);
+        String logStore = tableOptions.get(LOG_SYSTEM);
+        if (!tableOptions.get(LOG_SYSTEM).equalsIgnoreCase(NONE)) {
+            LogStoreRegister logStoreRegister =
+                    getLogStoreRegister(identifier, classLoader, tableOptions, 
logStore);
+            logStoreRegister.unRegisterTopic();
+        }
+    }
+
+    static LogStoreRegister getLogStoreRegister(
+            Identifier identifier, ClassLoader classLoader, Options 
tableOptions, String logStore) {
+        LogStoreTableFactory registerFactory =
+                FactoryUtil.discoverFactory(classLoader, 
LogStoreTableFactory.class, logStore);
+        return registerFactory.createRegister(
+                new LogStoreTableFactory.RegisterContext() {
+                    @Override
+                    public Options getOptions() {
+                        return tableOptions;
+                    }
+
+                    @Override
+                    public Identifier getIdentifier() {
+                        return identifier;
+                    }
+                });
+    }
+}
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/log/LogStoreTableFactory.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/log/LogStoreTableFactory.java
index c07eda125..324b6de2f 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/log/LogStoreTableFactory.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/log/LogStoreTableFactory.java
@@ -18,9 +18,11 @@
 
 package org.apache.paimon.flink.log;
 
+import org.apache.paimon.catalog.Identifier;
 import org.apache.paimon.factories.Factory;
 import org.apache.paimon.factories.FactoryUtil;
 import 
org.apache.paimon.flink.factories.FlinkFactoryUtil.FlinkTableFactoryHelper;
+import org.apache.paimon.options.Options;
 
 import org.apache.flink.api.common.serialization.DeserializationSchema;
 import org.apache.flink.api.common.serialization.SerializationSchema;
@@ -69,6 +71,21 @@ public interface LogStoreTableFactory extends Factory {
      */
     LogSinkProvider createSinkProvider(Context context, 
DynamicTableSink.Context sinkContext);
 
+    /**
+     * Creates a {@link LogStoreRegister} instance for table ddl, it will 
register table to log
+     * store when a table is created or dropped.
+     */
+    LogStoreRegister createRegister(RegisterContext context);
+
+    /** Context to create log store register. */
+    interface RegisterContext {
+        /** Options for the table. */
+        Options getOptions();
+
+        /** Identifier for the table. */
+        Identifier getIdentifier();
+    }
+
     // 
--------------------------------------------------------------------------------------------
 
     static ConfigOption<String> logKeyFormat() {
diff --git 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/FlinkCatalogTest.java
 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/FlinkCatalogTest.java
index c182bc5d8..b5dda56ae 100644
--- 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/FlinkCatalogTest.java
+++ 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/FlinkCatalogTest.java
@@ -21,6 +21,11 @@ package org.apache.paimon.flink;
 import org.apache.paimon.CoreOptions;
 import org.apache.paimon.catalog.AbstractCatalog;
 import org.apache.paimon.catalog.CatalogContext;
+import org.apache.paimon.catalog.Identifier;
+import org.apache.paimon.flink.log.LogSinkProvider;
+import org.apache.paimon.flink.log.LogSourceProvider;
+import org.apache.paimon.flink.log.LogStoreRegister;
+import org.apache.paimon.flink.log.LogStoreTableFactory;
 import org.apache.paimon.fs.Path;
 import org.apache.paimon.options.Options;
 
@@ -43,6 +48,10 @@ import 
org.apache.flink.table.catalog.exceptions.DatabaseNotEmptyException;
 import org.apache.flink.table.catalog.exceptions.DatabaseNotExistException;
 import org.apache.flink.table.catalog.exceptions.TableAlreadyExistException;
 import org.apache.flink.table.catalog.exceptions.TableNotExistException;
+import org.apache.flink.table.connector.sink.DynamicTableSink;
+import org.apache.flink.table.connector.source.DynamicTableSource;
+import org.apache.flink.table.factories.DynamicTableFactory;
+import org.jetbrains.annotations.Nullable;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.io.TempDir;
@@ -61,6 +70,8 @@ import java.util.Map;
 import java.util.UUID;
 import java.util.stream.Stream;
 
+import static 
org.apache.paimon.flink.FlinkCatalogOptions.LOG_SYSTEM_AUTO_REGISTER;
+import static org.apache.paimon.flink.FlinkConnectorOptions.LOG_SYSTEM;
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.assertj.core.api.Assertions.assertThatThrownBy;
 import static org.junit.jupiter.api.Assertions.assertEquals;
@@ -70,6 +81,7 @@ import static org.junit.jupiter.api.Assertions.assertTrue;
 
 /** Test for {@link FlinkCatalog}. */
 public class FlinkCatalogTest {
+    private static final String TESTING_LOG_STORE = "testing";
 
     private final ObjectPath path1 = new ObjectPath("db1", "t1");
     private final ObjectPath path3 = new ObjectPath("db1", "t2");
@@ -84,6 +96,7 @@ public class FlinkCatalogTest {
         String path = new File(temporaryFolder.toFile(), 
UUID.randomUUID().toString()).toString();
         Options conf = new Options();
         conf.setString("warehouse", path);
+        conf.set(LOG_SYSTEM_AUTO_REGISTER, true);
         catalog =
                 FlinkCatalogFactory.createCatalog(
                         "test-catalog",
@@ -468,6 +481,35 @@ public class FlinkCatalogTest {
         assertThat(catalogTable.getOptions()).isEqualTo(expected);
     }
 
+    @Test
+    public void testCreateTableWithLogSystemRegister() throws Exception {
+        catalog.createDatabase(path1.getDatabaseName(), null, false);
+
+        TableSchema schema =
+                TableSchema.builder()
+                        .field("pk", DataTypes.INT().notNull())
+                        .field("test", DataTypes.INT())
+                        .field("comp", DataTypes.INT(), "test + 1")
+                        .primaryKey("pk")
+                        .build();
+        Map<String, String> options = new HashMap<>();
+        CatalogTable catalogTable1 = new CatalogTableImpl(schema, options, "");
+        catalog.createTable(path1, catalogTable1, false);
+        CatalogBaseTable storedTable1 = catalog.getTable(path1);
+        
assertFalse(storedTable1.getOptions().containsKey("testing.log.store.topic"));
+
+        options.put(LOG_SYSTEM.key(), TESTING_LOG_STORE);
+        CatalogTable catalogTable2 = new CatalogTableImpl(schema, options, "");
+        catalog.createTable(path3, catalogTable2, false);
+
+        CatalogBaseTable storedTable2 = catalog.getTable(path3);
+        assertEquals(
+                String.format("%s-topic", path3.getObjectName()),
+                storedTable2.getOptions().get("testing.log.store.topic"));
+        assertThatThrownBy(() -> catalog.dropTable(path3, true))
+                .hasMessage("Check unregister log store topic here.");
+    }
+
     private void checkEquals(ObjectPath path, CatalogTable t1, CatalogTable 
t2) {
         Path tablePath =
                 ((AbstractCatalog) ((FlinkCatalog) catalog).catalog())
@@ -517,4 +559,52 @@ public class FlinkCatalogTest {
         }
         return allOptions.stream();
     }
+
+    /** Testing log store register factory to create {@link 
TestingLogStoreRegister}. */
+    public static class TestingLogSoreRegisterFactory implements 
LogStoreTableFactory {
+
+        @Override
+        public String identifier() {
+            return TESTING_LOG_STORE;
+        }
+
+        @Override
+        public LogSourceProvider createSourceProvider(
+                DynamicTableFactory.Context context,
+                DynamicTableSource.Context sourceContext,
+                @Nullable int[][] projectFields) {
+            throw new UnsupportedOperationException();
+        }
+
+        @Override
+        public LogSinkProvider createSinkProvider(
+                DynamicTableFactory.Context context, DynamicTableSink.Context 
sinkContext) {
+            throw new UnsupportedOperationException();
+        }
+
+        @Override
+        public LogStoreRegister createRegister(RegisterContext context) {
+            return new TestingLogStoreRegister(context.getIdentifier());
+        }
+    }
+
+    /** Testing log store register. */
+    private static class TestingLogStoreRegister implements LogStoreRegister {
+        private final Identifier table;
+
+        private TestingLogStoreRegister(Identifier table) {
+            this.table = table;
+        }
+
+        @Override
+        public Map<String, String> registerTopic() {
+            return Collections.singletonMap(
+                    "testing.log.store.topic", String.format("%s-topic", 
table.getObjectName()));
+        }
+
+        @Override
+        public void unRegisterTopic() {
+            throw new UnsupportedOperationException("Check unregister log 
store topic here.");
+        }
+    }
 }
diff --git 
a/paimon-flink/paimon-flink-common/src/test/resources/META-INF/services/org.apache.paimon.factories.Factory
 
b/paimon-flink/paimon-flink-common/src/test/resources/META-INF/services/org.apache.paimon.factories.Factory
index faa8cf673..6b51f027e 100644
--- 
a/paimon-flink/paimon-flink-common/src/test/resources/META-INF/services/org.apache.paimon.factories.Factory
+++ 
b/paimon-flink/paimon-flink-common/src/test/resources/META-INF/services/org.apache.paimon.factories.Factory
@@ -15,3 +15,5 @@
 
 org.apache.paimon.flink.action.cdc.mysql.TestCaseInsensitiveCatalogFactory
 org.apache.paimon.flink.action.cdc.mysql.TestAlterTableCatalogFactory
+
+org.apache.paimon.flink.FlinkCatalogTest$TestingLogSoreRegisterFactory
\ No newline at end of file

Reply via email to