This is an automated email from the ASF dual-hosted git repository.
zjureel pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 63927fa10 [flink][logsystem] Introduce log store register interfaces
(#1548)
63927fa10 is described below
commit 63927fa1051a9f4e4aade79e921489e7dd280ede
Author: Shammon FY <[email protected]>
AuthorDate: Tue Jul 25 11:47:51 2023 +0800
[flink][logsystem] Introduce log store register interfaces (#1548)
* [flink][logsystem] Introduce log store register interfaces
* [flink][logsystem] Lazy check table exists
* [flink][logsystem] Add auto-register option and merge log store register
in LogStoreTableFactory
* [flink][logsystem] fix docs
* [flink][logsystem] Update register interface for options
* [flink][logsystem] Add auto register config in flink catalog options
* [flink][logsystem] Rebase from master
* [flink][logsystem] Add flink catalog options in configurations.md
---
docs/content/maintenance/configurations.md | 6 ++
.../generated/flink_catalog_configuration.html | 42 ++++++++++
.../java/org/apache/paimon/flink/FlinkCatalog.java | 44 +++++++++--
.../apache/paimon/flink/FlinkCatalogFactory.java | 23 +++---
.../apache/paimon/flink/FlinkCatalogOptions.java | 41 ++++++++++
.../org/apache/paimon/flink/action/ActionBase.java | 2 +-
.../paimon/flink/kafka/KafkaLogStoreFactory.java | 6 ++
.../apache/paimon/flink/log/LogStoreRegister.java | 85 ++++++++++++++++++++
.../paimon/flink/log/LogStoreTableFactory.java | 17 ++++
.../org/apache/paimon/flink/FlinkCatalogTest.java | 90 ++++++++++++++++++++++
.../services/org.apache.paimon.factories.Factory | 2 +
11 files changed, 339 insertions(+), 19 deletions(-)
diff --git a/docs/content/maintenance/configurations.md
b/docs/content/maintenance/configurations.md
index e34a33260..5d46dbfaa 100644
--- a/docs/content/maintenance/configurations.md
+++ b/docs/content/maintenance/configurations.md
@@ -44,6 +44,12 @@ Options for Hive catalog.
{{< generated/hive_catalog_configuration >}}
+### FlinkCatalogOptions
+
+Flink catalog options for paimon.
+
+{{< generated/flink_catalog_configuration >}}
+
### FlinkConnectorOptions
Flink connector options for paimon.
diff --git a/docs/layouts/shortcodes/generated/flink_catalog_configuration.html
b/docs/layouts/shortcodes/generated/flink_catalog_configuration.html
new file mode 100644
index 000000000..c1ed6dbb3
--- /dev/null
+++ b/docs/layouts/shortcodes/generated/flink_catalog_configuration.html
@@ -0,0 +1,42 @@
+{{/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+*/}}
+<table class="configuration table table-bordered">
+ <thead>
+ <tr>
+ <th class="text-left" style="width: 20%">Key</th>
+ <th class="text-left" style="width: 15%">Default</th>
+ <th class="text-left" style="width: 10%">Type</th>
+ <th class="text-left" style="width: 55%">Description</th>
+ </tr>
+ </thead>
+ <tbody>
+ <tr>
+ <td><h5>default-database</h5></td>
+ <td style="word-wrap: break-word;">"default"</td>
+ <td>String</td>
+ <td></td>
+ </tr>
+ <tr>
+ <td><h5>log.system.auto-register</h5></td>
+ <td style="word-wrap: break-word;">false</td>
+ <td>Boolean</td>
+            <td>If true, the register will automatically create and delete a topic in the log system for the Paimon table. The default Kafka log store register is supported; users can implement a customized register for their log system, for example, by creating a new class which extends KafkaLogStoreFactory and returning a customized LogStoreRegister for their Kafka cluster to create/delete topics.</td>
+ </tr>
+ </tbody>
+</table>
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkCatalog.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkCatalog.java
index a629112c1..8a2224da0 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkCatalog.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkCatalog.java
@@ -25,7 +25,6 @@ import org.apache.paimon.schema.SchemaChange;
import org.apache.paimon.schema.SchemaManager;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.Table;
-import org.apache.paimon.utils.Preconditions;
import org.apache.flink.table.api.TableColumn;
import org.apache.flink.table.api.TableSchema;
@@ -91,21 +90,33 @@ import static
org.apache.flink.table.types.utils.TypeConversions.fromLogicalToDa
import static org.apache.paimon.CoreOptions.PATH;
import static org.apache.paimon.flink.LogicalTypeConversion.toDataType;
import static org.apache.paimon.flink.LogicalTypeConversion.toLogicalType;
+import static org.apache.paimon.flink.log.LogStoreRegister.registerLogSystem;
+import static org.apache.paimon.flink.log.LogStoreRegister.unRegisterLogSystem;
import static
org.apache.paimon.flink.utils.FlinkCatalogPropertiesUtil.compoundKey;
import static
org.apache.paimon.flink.utils.FlinkCatalogPropertiesUtil.deserializeNonPhysicalColumn;
import static
org.apache.paimon.flink.utils.FlinkCatalogPropertiesUtil.deserializeWatermarkSpec;
import static
org.apache.paimon.flink.utils.FlinkCatalogPropertiesUtil.nonPhysicalColumnsCount;
import static
org.apache.paimon.flink.utils.FlinkCatalogPropertiesUtil.serializeNonPhysicalColumns;
import static
org.apache.paimon.flink.utils.FlinkCatalogPropertiesUtil.serializeWatermarkSpec;
+import static org.apache.paimon.utils.Preconditions.checkArgument;
/** Catalog for paimon. */
public class FlinkCatalog extends AbstractCatalog {
+ private final ClassLoader classLoader;
private final Catalog catalog;
-
- public FlinkCatalog(Catalog catalog, String name, String defaultDatabase) {
+ private final boolean logStoreAutoRegister;
+
+ public FlinkCatalog(
+ Catalog catalog,
+ String name,
+ String defaultDatabase,
+ ClassLoader classLoader,
+ boolean logStoreAutoRegister) {
super(name, defaultDatabase);
this.catalog = catalog;
+ this.classLoader = classLoader;
+ this.logStoreAutoRegister = logStoreAutoRegister;
try {
this.catalog.createDatabase(defaultDatabase, true);
} catch (Catalog.DatabaseAlreadyExistException ignore) {
@@ -210,8 +221,16 @@ public class FlinkCatalog extends AbstractCatalog {
@Override
public void dropTable(ObjectPath tablePath, boolean ignoreIfNotExists)
throws TableNotExistException, CatalogException {
+ Identifier identifier = toIdentifier(tablePath);
+ Table table = null;
try {
+ if (logStoreAutoRegister && catalog.tableExists(identifier)) {
+ table = catalog.getTable(identifier);
+ }
catalog.dropTable(toIdentifier(tablePath), ignoreIfNotExists);
+ if (logStoreAutoRegister && table != null) {
+ unRegisterLogSystem(identifier, table.options(), classLoader);
+ }
} catch (Catalog.TableNotExistException e) {
throw new TableNotExistException(getName(), tablePath);
}
@@ -235,21 +254,30 @@ public class FlinkCatalog extends AbstractCatalog {
+ " You can create TEMPORARY table instead if you
want to create the table of other connector.");
}
+ Identifier identifier = toIdentifier(tablePath);
+ if (logStoreAutoRegister) {
+ registerLogSystem(catalog, identifier, options, classLoader);
+ }
// remove table path
String specific = options.remove(PATH.key());
- if (specific != null) {
+ if (specific != null || logStoreAutoRegister) {
catalogTable = catalogTable.copy(options);
}
+ boolean unRegisterLogSystem = false;
try {
catalog.createTable(
- toIdentifier(tablePath),
- FlinkCatalog.fromCatalogTable(catalogTable),
- ignoreIfExists);
+ identifier, FlinkCatalog.fromCatalogTable(catalogTable),
ignoreIfExists);
} catch (Catalog.TableAlreadyExistException e) {
+ unRegisterLogSystem = true;
throw new TableAlreadyExistException(getName(), tablePath);
} catch (Catalog.DatabaseNotExistException e) {
+ unRegisterLogSystem = true;
throw new DatabaseNotExistException(getName(), e.database());
+ } finally {
+ if (logStoreAutoRegister && unRegisterLogSystem) {
+ unRegisterLogSystem(identifier, options, classLoader);
+ }
}
}
@@ -611,7 +639,7 @@ public class FlinkCatalog extends AbstractCatalog {
// watermark
List<WatermarkSpec> watermarkSpecs = schema.getWatermarkSpecs();
if (!watermarkSpecs.isEmpty()) {
- Preconditions.checkArgument(watermarkSpecs.size() == 1);
+ checkArgument(watermarkSpecs.size() == 1);
columnOptions.putAll(serializeWatermarkSpec(watermarkSpecs.get(0)));
}
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkCatalogFactory.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkCatalogFactory.java
index abc8b10fe..ea42075c2 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkCatalogFactory.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkCatalogFactory.java
@@ -21,23 +21,19 @@ package org.apache.paimon.flink;
import org.apache.paimon.catalog.Catalog;
import org.apache.paimon.catalog.CatalogContext;
import org.apache.paimon.catalog.CatalogFactory;
-import org.apache.paimon.options.ConfigOption;
-import org.apache.paimon.options.ConfigOptions;
import org.apache.paimon.options.Options;
import java.util.Collections;
import java.util.Set;
+import static org.apache.paimon.flink.FlinkCatalogOptions.DEFAULT_DATABASE;
+import static
org.apache.paimon.flink.FlinkCatalogOptions.LOG_SYSTEM_AUTO_REGISTER;
+
/** Factory for {@link FlinkCatalog}. */
public class FlinkCatalogFactory implements
org.apache.flink.table.factories.CatalogFactory {
public static final String IDENTIFIER = "paimon";
- public static final ConfigOption<String> DEFAULT_DATABASE =
- ConfigOptions.key("default-database")
- .stringType()
- .defaultValue(Catalog.DEFAULT_DATABASE);
-
@Override
public String factoryIdentifier() {
return IDENTIFIER;
@@ -67,11 +63,18 @@ public class FlinkCatalogFactory implements
org.apache.flink.table.factories.Cat
return new FlinkCatalog(
CatalogFactory.createCatalog(context, classLoader),
catalogName,
- context.options().get(DEFAULT_DATABASE));
+ context.options().get(DEFAULT_DATABASE),
+ classLoader,
+ context.options().get(LOG_SYSTEM_AUTO_REGISTER));
}
- public static FlinkCatalog createCatalog(String catalogName, Catalog
catalog) {
- return new FlinkCatalog(catalog, catalogName,
Catalog.DEFAULT_DATABASE);
+ public static FlinkCatalog createCatalog(String catalogName, Catalog
catalog, Options options) {
+ return new FlinkCatalog(
+ catalog,
+ catalogName,
+ Catalog.DEFAULT_DATABASE,
+ FlinkCatalogFactory.class.getClassLoader(),
+ options.get(LOG_SYSTEM_AUTO_REGISTER));
}
public static Catalog createPaimonCatalog(Options catalogOptions) {
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkCatalogOptions.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkCatalogOptions.java
new file mode 100644
index 000000000..fb0d1b692
--- /dev/null
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkCatalogOptions.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink;
+
+import org.apache.paimon.catalog.Catalog;
+import org.apache.paimon.options.ConfigOption;
+import org.apache.paimon.options.ConfigOptions;
+
+/** Options for flink catalog. */
+public class FlinkCatalogOptions {
+
+ public static final ConfigOption<String> DEFAULT_DATABASE =
+ ConfigOptions.key("default-database")
+ .stringType()
+ .defaultValue(Catalog.DEFAULT_DATABASE);
+
+ public static final ConfigOption<Boolean> LOG_SYSTEM_AUTO_REGISTER =
+ ConfigOptions.key("log.system.auto-register")
+ .booleanType()
+ .defaultValue(false)
+ .withDescription(
+                                "If true, the register will automatically create and delete a topic in the log system for the Paimon table. The default Kafka log store register "
+                                        + "is supported; users can implement a customized register for their log system, for example, by creating a new class which extends "
+                                        + "KafkaLogStoreFactory and returning a customized LogStoreRegister for their Kafka cluster to create/delete topics.");
+}
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/ActionBase.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/ActionBase.java
index 0e4eaa654..6242bec27 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/ActionBase.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/ActionBase.java
@@ -46,7 +46,7 @@ public abstract class ActionBase implements Action {
catalogOptions.set(CatalogOptions.WAREHOUSE, warehouse);
catalog = FlinkCatalogFactory.createPaimonCatalog(catalogOptions);
- flinkCatalog = FlinkCatalogFactory.createCatalog(catalogName, catalog);
+ flinkCatalog = FlinkCatalogFactory.createCatalog(catalogName, catalog,
catalogOptions);
}
protected void execute(StreamExecutionEnvironment env, String defaultName)
throws Exception {
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/kafka/KafkaLogStoreFactory.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/kafka/KafkaLogStoreFactory.java
index 9e2ca648d..2a5038f5c 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/kafka/KafkaLogStoreFactory.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/kafka/KafkaLogStoreFactory.java
@@ -20,6 +20,7 @@ package org.apache.paimon.flink.kafka;
import org.apache.paimon.CoreOptions;
import
org.apache.paimon.flink.factories.FlinkFactoryUtil.FlinkTableFactoryHelper;
+import org.apache.paimon.flink.log.LogStoreRegister;
import org.apache.paimon.flink.log.LogStoreTableFactory;
import org.apache.paimon.options.Options;
@@ -129,6 +130,11 @@ public class KafkaLogStoreFactory implements
LogStoreTableFactory {
options.get(BUCKET));
}
+ @Override
+ public LogStoreRegister createRegister(RegisterContext context) {
+ throw new UnsupportedOperationException();
+ }
+
private int[] getPrimaryKeyIndexes(ResolvedSchema schema) {
final List<String> columns = schema.getColumnNames();
return schema.getPrimaryKey()
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/log/LogStoreRegister.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/log/LogStoreRegister.java
new file mode 100644
index 000000000..b730d289b
--- /dev/null
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/log/LogStoreRegister.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.log;
+
+import org.apache.paimon.catalog.Catalog;
+import org.apache.paimon.catalog.Identifier;
+import org.apache.paimon.factories.FactoryUtil;
+import org.apache.paimon.options.Options;
+
+import java.util.Map;
+
+import static org.apache.paimon.flink.FlinkConnectorOptions.LOG_SYSTEM;
+import static org.apache.paimon.flink.FlinkConnectorOptions.NONE;
+
+/**
+ * {@link LogStoreRegister} will register and unregister topics for a Paimon table; you can
+ * implement it for customized log system management.
+ */
+public interface LogStoreRegister {
+ /** Register topic in log system for the table. */
+ Map<String, String> registerTopic();
+
+ /** Unregister topic in log system for the table. */
+ void unRegisterTopic();
+
+ static void registerLogSystem(
+ Catalog catalog,
+ Identifier identifier,
+ Map<String, String> options,
+ ClassLoader classLoader) {
+ Options tableOptions = Options.fromMap(options);
+ String logStore = tableOptions.get(LOG_SYSTEM);
+ if (!tableOptions.get(LOG_SYSTEM).equalsIgnoreCase(NONE)
+ && !catalog.tableExists(identifier)) {
+ LogStoreRegister logStoreRegister =
+ getLogStoreRegister(identifier, classLoader, tableOptions,
logStore);
+ options.putAll(logStoreRegister.registerTopic());
+ }
+ }
+
+ static void unRegisterLogSystem(
+ Identifier identifier, Map<String, String> options, ClassLoader
classLoader) {
+ Options tableOptions = Options.fromMap(options);
+ String logStore = tableOptions.get(LOG_SYSTEM);
+ if (!tableOptions.get(LOG_SYSTEM).equalsIgnoreCase(NONE)) {
+ LogStoreRegister logStoreRegister =
+ getLogStoreRegister(identifier, classLoader, tableOptions,
logStore);
+ logStoreRegister.unRegisterTopic();
+ }
+ }
+
+ static LogStoreRegister getLogStoreRegister(
+ Identifier identifier, ClassLoader classLoader, Options
tableOptions, String logStore) {
+ LogStoreTableFactory registerFactory =
+ FactoryUtil.discoverFactory(classLoader,
LogStoreTableFactory.class, logStore);
+ return registerFactory.createRegister(
+ new LogStoreTableFactory.RegisterContext() {
+ @Override
+ public Options getOptions() {
+ return tableOptions;
+ }
+
+ @Override
+ public Identifier getIdentifier() {
+ return identifier;
+ }
+ });
+ }
+}
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/log/LogStoreTableFactory.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/log/LogStoreTableFactory.java
index c07eda125..324b6de2f 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/log/LogStoreTableFactory.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/log/LogStoreTableFactory.java
@@ -18,9 +18,11 @@
package org.apache.paimon.flink.log;
+import org.apache.paimon.catalog.Identifier;
import org.apache.paimon.factories.Factory;
import org.apache.paimon.factories.FactoryUtil;
import
org.apache.paimon.flink.factories.FlinkFactoryUtil.FlinkTableFactoryHelper;
+import org.apache.paimon.options.Options;
import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.serialization.SerializationSchema;
@@ -69,6 +71,21 @@ public interface LogStoreTableFactory extends Factory {
*/
LogSinkProvider createSinkProvider(Context context,
DynamicTableSink.Context sinkContext);
+    /**
+     * Creates a {@link LogStoreRegister} instance for table DDL; it will register or unregister
+     * the table in the log store when the table is created or dropped.
+     */
+ LogStoreRegister createRegister(RegisterContext context);
+
+ /** Context to create log store register. */
+ interface RegisterContext {
+ /** Options for the table. */
+ Options getOptions();
+
+ /** Identifier for the table. */
+ Identifier getIdentifier();
+ }
+
//
--------------------------------------------------------------------------------------------
static ConfigOption<String> logKeyFormat() {
diff --git
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/FlinkCatalogTest.java
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/FlinkCatalogTest.java
index c182bc5d8..b5dda56ae 100644
---
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/FlinkCatalogTest.java
+++
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/FlinkCatalogTest.java
@@ -21,6 +21,11 @@ package org.apache.paimon.flink;
import org.apache.paimon.CoreOptions;
import org.apache.paimon.catalog.AbstractCatalog;
import org.apache.paimon.catalog.CatalogContext;
+import org.apache.paimon.catalog.Identifier;
+import org.apache.paimon.flink.log.LogSinkProvider;
+import org.apache.paimon.flink.log.LogSourceProvider;
+import org.apache.paimon.flink.log.LogStoreRegister;
+import org.apache.paimon.flink.log.LogStoreTableFactory;
import org.apache.paimon.fs.Path;
import org.apache.paimon.options.Options;
@@ -43,6 +48,10 @@ import
org.apache.flink.table.catalog.exceptions.DatabaseNotEmptyException;
import org.apache.flink.table.catalog.exceptions.DatabaseNotExistException;
import org.apache.flink.table.catalog.exceptions.TableAlreadyExistException;
import org.apache.flink.table.catalog.exceptions.TableNotExistException;
+import org.apache.flink.table.connector.sink.DynamicTableSink;
+import org.apache.flink.table.connector.source.DynamicTableSource;
+import org.apache.flink.table.factories.DynamicTableFactory;
+import org.jetbrains.annotations.Nullable;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;
@@ -61,6 +70,8 @@ import java.util.Map;
import java.util.UUID;
import java.util.stream.Stream;
+import static
org.apache.paimon.flink.FlinkCatalogOptions.LOG_SYSTEM_AUTO_REGISTER;
+import static org.apache.paimon.flink.FlinkConnectorOptions.LOG_SYSTEM;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
import static org.junit.jupiter.api.Assertions.assertEquals;
@@ -70,6 +81,7 @@ import static org.junit.jupiter.api.Assertions.assertTrue;
/** Test for {@link FlinkCatalog}. */
public class FlinkCatalogTest {
+ private static final String TESTING_LOG_STORE = "testing";
private final ObjectPath path1 = new ObjectPath("db1", "t1");
private final ObjectPath path3 = new ObjectPath("db1", "t2");
@@ -84,6 +96,7 @@ public class FlinkCatalogTest {
String path = new File(temporaryFolder.toFile(),
UUID.randomUUID().toString()).toString();
Options conf = new Options();
conf.setString("warehouse", path);
+ conf.set(LOG_SYSTEM_AUTO_REGISTER, true);
catalog =
FlinkCatalogFactory.createCatalog(
"test-catalog",
@@ -468,6 +481,35 @@ public class FlinkCatalogTest {
assertThat(catalogTable.getOptions()).isEqualTo(expected);
}
+ @Test
+ public void testCreateTableWithLogSystemRegister() throws Exception {
+ catalog.createDatabase(path1.getDatabaseName(), null, false);
+
+ TableSchema schema =
+ TableSchema.builder()
+ .field("pk", DataTypes.INT().notNull())
+ .field("test", DataTypes.INT())
+ .field("comp", DataTypes.INT(), "test + 1")
+ .primaryKey("pk")
+ .build();
+ Map<String, String> options = new HashMap<>();
+ CatalogTable catalogTable1 = new CatalogTableImpl(schema, options, "");
+ catalog.createTable(path1, catalogTable1, false);
+ CatalogBaseTable storedTable1 = catalog.getTable(path1);
+
assertFalse(storedTable1.getOptions().containsKey("testing.log.store.topic"));
+
+ options.put(LOG_SYSTEM.key(), TESTING_LOG_STORE);
+ CatalogTable catalogTable2 = new CatalogTableImpl(schema, options, "");
+ catalog.createTable(path3, catalogTable2, false);
+
+ CatalogBaseTable storedTable2 = catalog.getTable(path3);
+ assertEquals(
+ String.format("%s-topic", path3.getObjectName()),
+ storedTable2.getOptions().get("testing.log.store.topic"));
+ assertThatThrownBy(() -> catalog.dropTable(path3, true))
+ .hasMessage("Check unregister log store topic here.");
+ }
+
private void checkEquals(ObjectPath path, CatalogTable t1, CatalogTable
t2) {
Path tablePath =
((AbstractCatalog) ((FlinkCatalog) catalog).catalog())
@@ -517,4 +559,52 @@ public class FlinkCatalogTest {
}
return allOptions.stream();
}
+
+ /** Testing log store register factory to create {@link
TestingLogStoreRegister}. */
+ public static class TestingLogSoreRegisterFactory implements
LogStoreTableFactory {
+
+ @Override
+ public String identifier() {
+ return TESTING_LOG_STORE;
+ }
+
+ @Override
+ public LogSourceProvider createSourceProvider(
+ DynamicTableFactory.Context context,
+ DynamicTableSource.Context sourceContext,
+ @Nullable int[][] projectFields) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public LogSinkProvider createSinkProvider(
+ DynamicTableFactory.Context context, DynamicTableSink.Context
sinkContext) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public LogStoreRegister createRegister(RegisterContext context) {
+ return new TestingLogStoreRegister(context.getIdentifier());
+ }
+ }
+
+ /** Testing log store register. */
+ private static class TestingLogStoreRegister implements LogStoreRegister {
+ private final Identifier table;
+
+ private TestingLogStoreRegister(Identifier table) {
+ this.table = table;
+ }
+
+ @Override
+ public Map<String, String> registerTopic() {
+ return Collections.singletonMap(
+ "testing.log.store.topic", String.format("%s-topic",
table.getObjectName()));
+ }
+
+ @Override
+ public void unRegisterTopic() {
+ throw new UnsupportedOperationException("Check unregister log
store topic here.");
+ }
+ }
}
diff --git
a/paimon-flink/paimon-flink-common/src/test/resources/META-INF/services/org.apache.paimon.factories.Factory
b/paimon-flink/paimon-flink-common/src/test/resources/META-INF/services/org.apache.paimon.factories.Factory
index faa8cf673..6b51f027e 100644
---
a/paimon-flink/paimon-flink-common/src/test/resources/META-INF/services/org.apache.paimon.factories.Factory
+++
b/paimon-flink/paimon-flink-common/src/test/resources/META-INF/services/org.apache.paimon.factories.Factory
@@ -15,3 +15,5 @@
org.apache.paimon.flink.action.cdc.mysql.TestCaseInsensitiveCatalogFactory
org.apache.paimon.flink.action.cdc.mysql.TestAlterTableCatalogFactory
+
+org.apache.paimon.flink.FlinkCatalogTest$TestingLogSoreRegisterFactory
\ No newline at end of file