This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-table-store.git
The following commit(s) were added to refs/heads/master by this push:
new af4be76 [FLINK-26208] Introduce TableStoreFactory to support
create/drop managed table via SQL
af4be76 is described below
commit af4be7675833cf000c1f9b1891bba0a106b27d4a
Author: Jane Chan <[email protected]>
AuthorDate: Tue Mar 1 17:47:54 2022 +0800
[FLINK-26208] Introduce TableStoreFactory to support create/drop managed
table via SQL
This closes #25
---
flink-table-store-connector/pom.xml | 72 ++-
.../table/store/connector/TableStoreFactory.java | 191 +++++++
.../store/connector/TableStoreFactoryOptions.java | 39 ++
.../org.apache.flink.table.factories.Factory | 16 +
.../store/connector/TableStoreFactoryTest.java | 282 ++++++++++
.../table/store/connector/TableStoreITCase.java | 576 +++++++++++++++++++++
.../flink/table/store/file/FileStoreOptions.java | 18 +
.../apache/flink/table/store/log/LogOptions.java | 2 +
.../table/store/log/LogStoreTableFactory.java | 4 +-
flink-table-store-kafka/pom.xml | 18 +-
.../table/store/kafka/KafkaLogStoreFactory.java | 40 +-
.../flink/table/store/kafka/KafkaLogITCase.java | 6 +-
.../table/store/kafka/KafkaTableTestBase.java | 43 ++
13 files changed, 1294 insertions(+), 13 deletions(-)
diff --git a/flink-table-store-connector/pom.xml
b/flink-table-store-connector/pom.xml
index f05cd72..0b3e178 100644
--- a/flink-table-store-connector/pom.xml
+++ b/flink-table-store-connector/pom.xml
@@ -96,6 +96,34 @@ under the License.
<dependency>
<groupId>org.apache.flink</groupId>
+ <artifactId>flink-table-store-kafka</artifactId>
+ <version>${project.version}</version>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-table-store-kafka</artifactId>
+ <version>${project.version}</version>
+ <scope>test</scope>
+ <type>test-jar</type>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-test-utils</artifactId>
+ <version>${flink.version}</version>
+ <scope>test</scope>
+ <exclusions>
+ <exclusion>
+ <groupId>junit</groupId>
+ <artifactId>junit</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.flink</groupId>
<artifactId>flink-connector-test-utils</artifactId>
<version>${flink.version}</version>
<scope>test</scope>
@@ -141,6 +169,48 @@ under the License.
</exclusions>
<scope>test</scope>
</dependency>
+ <dependency>
+ <!-- include the Kafka server (version declared below) for tests -->
+ <groupId>org.apache.kafka</groupId>
+ <artifactId>kafka_${scala.binary.version}</artifactId>
+ <version>2.4.1</version>
+ <exclusions>
+ <exclusion>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-api</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>com.fasterxml.jackson.core</groupId>
+ <artifactId>*</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>com.fasterxml.jackson.dataformat</groupId>
+ <artifactId>*</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>com.fasterxml.jackson.module</groupId>
+ <artifactId>*</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>com.fasterxml.jackson.datatype</groupId>
+ <artifactId>*</artifactId>
+ </exclusion>
+ </exclusions>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.testcontainers</groupId>
+ <artifactId>kafka</artifactId>
+ <version>${testcontainers.version}</version>
+ <scope>test</scope>
+ <exclusions>
+ <exclusion>
+ <groupId>junit</groupId>
+ <artifactId>junit</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
</dependencies>
<build>
@@ -158,4 +228,4 @@ under the License.
</plugin>
</plugins>
</build>
-</project>
\ No newline at end of file
+</project>
diff --git
a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/TableStoreFactory.java
b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/TableStoreFactory.java
new file mode 100644
index 0000000..6b7e5a2
--- /dev/null
+++
b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/TableStoreFactory.java
@@ -0,0 +1,191 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.connector;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.table.api.TableException;
+import org.apache.flink.table.catalog.CatalogPartitionSpec;
+import org.apache.flink.table.catalog.ObjectIdentifier;
+import org.apache.flink.table.factories.FactoryUtil;
+import org.apache.flink.table.factories.ManagedTableFactory;
+import org.apache.flink.table.store.file.FileStoreOptions;
+
+import java.io.IOException;
+import java.io.UncheckedIOException;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+
+import static
org.apache.flink.table.store.connector.TableStoreFactoryOptions.CHANGE_TRACKING;
+import static org.apache.flink.table.store.file.FileStoreOptions.BUCKET;
+import static org.apache.flink.table.store.file.FileStoreOptions.FILE_PATH;
+import static
org.apache.flink.table.store.file.FileStoreOptions.TABLE_STORE_PREFIX;
+import static org.apache.flink.table.store.log.LogOptions.LOG_PREFIX;
+import static
org.apache.flink.table.store.log.LogStoreTableFactory.discoverLogStoreFactory;
+
+/** Default implementation of {@link ManagedTableFactory}. */
+public class TableStoreFactory implements ManagedTableFactory {
+
+    /**
+     * Merges session-level defaults into the options declared on the table.
+     *
+     * <p>Each session configuration entry whose key starts with {@code TABLE_STORE_PREFIX} is
+     * copied into the result with that prefix removed, unless the table already defines the
+     * stripped key. Options declared on the table therefore take precedence.
+     *
+     * @param context supplies the catalog table and the session configuration
+     * @return a new map containing the table options plus the inherited session options
+     */
+    @Override
+    public Map<String, String> enrichOptions(Context context) {
+        Map<String, String> merged = new HashMap<>(context.getCatalogTable().getOptions());
+        // NOTE(review): the cast assumes the session configuration is a Configuration instance.
+        Map<String, String> sessionOptions = ((Configuration) context.getConfiguration()).toMap();
+        for (Map.Entry<String, String> entry : sessionOptions.entrySet()) {
+            String key = entry.getKey();
+            if (key.startsWith(TABLE_STORE_PREFIX)) {
+                merged.putIfAbsent(key.substring(TABLE_STORE_PREFIX.length()), entry.getValue());
+            }
+        }
+        return merged;
+    }
+
+ @Override
+ public void onCreateTable(Context context, boolean ignoreIfExists) {
+ Map<String, String> enrichedOptions =
context.getCatalogTable().getOptions();
+ Path path = tablePath(enrichedOptions, context.getObjectIdentifier());
+ try {
+ if (path.getFileSystem().exists(path) && !ignoreIfExists) {
+ throw new TableException(
+ String.format(
+ "Failed to create file store path. "
+ + "Reason: directory %s exists for
table %s. "
+ + "Suggestion: please try `DESCRIBE
TABLE %s` to "
+ + "first check whether table exists in
current catalog. "
+ + "If table exists in catalog, and
data files under current path "
+ + "are valid, please use `CREATE TABLE
IF NOT EXISTS` ddl instead. "
+ + "Otherwise, please choose another
table name "
+ + "or manually delete the current path
and try again.",
+ path,
+
context.getObjectIdentifier().asSerializableString(),
+
context.getObjectIdentifier().asSerializableString()));
+ }
+ path.getFileSystem().mkdirs(path);
+ } catch (IOException e) {
+ throw new UncheckedIOException(e);
+ }
+
+ if (enableChangeTracking(enrichedOptions)) {
+ Context logStoreContext =
+ new FactoryUtil.DefaultDynamicTableContext(
+ context.getObjectIdentifier(),
+
context.getCatalogTable().copy(filterLogStoreOptions(enrichedOptions)),
+
filterLogStoreOptions(context.getEnrichmentOptions()),
+ context.getConfiguration(),
+ context.getClassLoader(),
+ context.isTemporary());
+ discoverLogStoreFactory(
+ Thread.currentThread().getContextClassLoader(),
+ TableStoreFactoryOptions.LOG_SYSTEM.defaultValue())
+ .onCreateTable(
+ logStoreContext,
+ Integer.parseInt(
+ enrichedOptions.getOrDefault(
+ BUCKET.key(),
BUCKET.defaultValue().toString())),
+ ignoreIfExists);
+ }
+ }
+
+    /**
+     * Removes the storage behind a managed table: the file store directory (recursively) and,
+     * when change tracking is enabled, the log store.
+     *
+     * <p>The log store context is built the same way as in {@code onCreateTable}: both the table
+     * options and the enrichment options are reduced to the log-prefixed entries with the prefix
+     * stripped, so the log store sees the same option keys on create and on drop.
+     *
+     * @param context supplies the table identifier, its options and the session settings
+     * @param ignoreIfNotExists when {@code false}, a missing table directory is an error
+     * @throws TableException if the table directory is missing and {@code ignoreIfNotExists} is
+     *     false
+     * @throws UncheckedIOException if the file system access fails
+     */
+    @Override
+    public void onDropTable(Context context, boolean ignoreIfNotExists) {
+        Map<String, String> enrichedOptions = context.getCatalogTable().getOptions();
+        Path path = tablePath(enrichedOptions, context.getObjectIdentifier());
+        try {
+            if (path.getFileSystem().exists(path)) {
+                path.getFileSystem().delete(path, true);
+            } else if (!ignoreIfNotExists) {
+                throw new TableException(
+                        String.format(
+                                "Failed to delete file store path. "
+                                        + "Reason: directory %s doesn't exist for table %s. "
+                                        + "Suggestion: please try `DROP TABLE IF EXISTS` ddl instead.",
+                                path, context.getObjectIdentifier().asSerializableString()));
+            }
+        } catch (IOException e) {
+            throw new UncheckedIOException(e);
+        }
+        if (enableChangeTracking(enrichedOptions)) {
+            Context logStoreContext =
+                    new FactoryUtil.DefaultDynamicTableContext(
+                            context.getObjectIdentifier(),
+                            context.getCatalogTable().copy(filterLogStoreOptions(enrichedOptions)),
+                            // Filtered like the table options so create and drop hand the log
+                            // store identically shaped enrichment options.
+                            filterLogStoreOptions(context.getEnrichmentOptions()),
+                            context.getConfiguration(),
+                            context.getClassLoader(),
+                            context.isTemporary());
+            discoverLogStoreFactory(
+                            Thread.currentThread().getContextClassLoader(),
+                            TableStoreFactoryOptions.LOG_SYSTEM.defaultValue())
+                    .onDropTable(logStoreContext, ignoreIfNotExists);
+        }
+    }
+
+ /**
+ * Compaction is not supported by this factory yet.
+ *
+ * @throws UnsupportedOperationException always
+ */
+ @Override
+ public Map<String, String> onCompactTable(
+ Context context, CatalogPartitionSpec catalogPartitionSpec) {
+ throw new UnsupportedOperationException("Not implement yet");
+ }
+
+ /** Returns an empty set: this factory declares no required options. */
+ @Override
+ public Set<ConfigOption<?>> requiredOptions() {
+ return Collections.emptySet();
+ }
+
+ /**
+ * Returns the file store options together with {@code CHANGE_TRACKING}.
+ *
+ * <p>NOTE(review): {@code CHANGE_TRACKING} is added directly to the set returned by
+ * {@code FileStoreOptions.allOptions()}; this assumes that method hands out a fresh, mutable
+ * set on every call. Confirm against its implementation.
+ */
+ @Override
+ public Set<ConfigOption<?>> optionalOptions() {
+ Set<ConfigOption<?>> options = FileStoreOptions.allOptions();
+ options.add(CHANGE_TRACKING);
+ return options;
+ }
+
+ // ~ Tools
------------------------------------------------------------------
+
+    /**
+     * Extracts the log store options from a table's enriched options.
+     *
+     * <p>Only entries whose key starts with {@code LOG_PREFIX} are kept, and the prefix is
+     * removed from the returned keys. Values are passed through unchanged and unvalidated.
+     *
+     * @param enrichedOptions the full set of enriched table options
+     * @return a new map holding the prefix-stripped log store options
+     */
+    @VisibleForTesting
+    Map<String, String> filterLogStoreOptions(Map<String, String> enrichedOptions) {
+        Map<String, String> stripped = new HashMap<>();
+        for (Map.Entry<String, String> entry : enrichedOptions.entrySet()) {
+            String key = entry.getKey();
+            if (!key.startsWith(LOG_PREFIX)) {
+                continue;
+            }
+            stripped.put(key.substring(LOG_PREFIX.length()), entry.getValue());
+        }
+        return stripped;
+    }
+
+    /**
+     * Resolves the directory that holds a table's files:
+     * {@code <file path>/root/<catalog>.catalog/<database>.db/<table>}.
+     *
+     * @param options table options; must contain the {@code FILE_PATH} option
+     * @param identifier fully qualified identifier of the table
+     * @return the table directory below the configured file path
+     * @throws IllegalArgumentException if the {@code FILE_PATH} option is not set
+     */
+    private static Path tablePath(Map<String, String> options, ObjectIdentifier identifier) {
+        String rootPath = options.get(FILE_PATH.key());
+        if (rootPath == null) {
+            // Fail here with the option name instead of handing null to the Path constructor.
+            throw new IllegalArgumentException(
+                    String.format(
+                            "Failed to resolve file store path for table %s. "
+                                    + "Reason: option '%s' is not set.",
+                            identifier.asSerializableString(), FILE_PATH.key()));
+        }
+        return new Path(
+                new Path(rootPath),
+                String.format(
+                        "root/%s.catalog/%s.db/%s",
+                        identifier.getCatalogName(),
+                        identifier.getDatabaseName(),
+                        identifier.getObjectName()));
+    }
+
+    /**
+     * Tells whether change tracking is enabled for a table.
+     *
+     * @param options table options; the default of {@code CHANGE_TRACKING} applies when the
+     *     key is absent
+     * @return the option value parsed with {@link Boolean#parseBoolean(String)}
+     */
+    private static boolean enableChangeTracking(Map<String, String> options) {
+        String fallback = CHANGE_TRACKING.defaultValue().toString();
+        String configured = options.getOrDefault(CHANGE_TRACKING.key(), fallback);
+        return Boolean.parseBoolean(configured);
+    }
+}
diff --git
a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/TableStoreFactoryOptions.java
b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/TableStoreFactoryOptions.java
new file mode 100644
index 0000000..21393ac
--- /dev/null
+++
b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/TableStoreFactoryOptions.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.connector;
+
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.ConfigOptions;
+
+/** Options for {@link TableStoreFactory}. */
+public class TableStoreFactoryOptions {
+
+ /** Boolean option under key {@code change-tracking}; defaults to {@code true}. */
+ public static final ConfigOption<Boolean> CHANGE_TRACKING =
+ ConfigOptions.key("change-tracking")
+ .booleanType()
+ .defaultValue(true)
+ .withDescription(
+ "Whether to enable change tracking. The default
value is true, which means consuming changes from the table.");
+
+ /**
+ * String option under key {@code log.system}; defaults to {@code "kafka"}.
+ *
+ * <p>NOTE(review): {@link TableStoreFactory} as introduced with this option only reads
+ * {@code defaultValue()} when discovering the log store factory.
+ */
+ public static final ConfigOption<String> LOG_SYSTEM =
+ ConfigOptions.key("log.system")
+ .stringType()
+ .defaultValue("kafka")
+ .withDescription("The log system used to keep changes of
the table.");
+}
diff --git
a/flink-table-store-connector/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory
b/flink-table-store-connector/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory
new file mode 100644
index 0000000..e377345
--- /dev/null
+++
b/flink-table-store-connector/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory
@@ -0,0 +1,16 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+org.apache.flink.table.store.connector.TableStoreFactory
diff --git
a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/TableStoreFactoryTest.java
b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/TableStoreFactoryTest.java
new file mode 100644
index 0000000..e0c9b5b
--- /dev/null
+++
b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/TableStoreFactoryTest.java
@@ -0,0 +1,282 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.connector;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.table.api.Schema;
+import org.apache.flink.table.api.TableException;
+import org.apache.flink.table.catalog.CatalogTable;
+import org.apache.flink.table.catalog.ObjectIdentifier;
+import org.apache.flink.table.catalog.ResolvedCatalogTable;
+import org.apache.flink.table.catalog.ResolvedSchema;
+import org.apache.flink.table.factories.DynamicTableFactory;
+import org.apache.flink.table.factories.FactoryUtil;
+import org.apache.flink.table.factories.ManagedTableFactory;
+import org.apache.flink.table.store.log.LogOptions;
+
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.Arguments;
+import org.junit.jupiter.params.provider.MethodSource;
+
+import java.io.File;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.function.Predicate;
+import java.util.stream.Stream;
+
+import static
org.apache.flink.table.store.connector.TableStoreFactoryOptions.CHANGE_TRACKING;
+import static org.apache.flink.table.store.file.FileStoreOptions.BUCKET;
+import static org.apache.flink.table.store.file.FileStoreOptions.FILE_PATH;
+import static
org.apache.flink.table.store.file.FileStoreOptions.TABLE_STORE_PREFIX;
+import static
org.apache.flink.table.store.kafka.KafkaLogOptions.BOOTSTRAP_SERVERS;
+import static org.apache.flink.table.store.log.LogOptions.CONSISTENCY;
+import static org.apache.flink.table.store.log.LogOptions.LOG_PREFIX;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+/** Test cases for {@link TableStoreFactory}. */
+public class TableStoreFactoryTest {
+
+ private static final ObjectIdentifier TABLE_IDENTIFIER =
+ ObjectIdentifier.of("catalog", "database", "table");
+
+ private final ManagedTableFactory tableStoreFactory = new
TableStoreFactory();
+
+ @TempDir private static java.nio.file.Path sharedTempDir;
+ private DynamicTableFactory.Context context;
+
+ @ParameterizedTest
+ @MethodSource("providingOptions")
+ public void testEnrichOptions(
+ Map<String, String> sessionOptions,
+ Map<String, String> tableOptions,
+ Map<String, String> expectedEnrichedOptions) {
+ context = createTableContext(sessionOptions, tableOptions);
+ Map<String, String> actualEnrichedOptions =
tableStoreFactory.enrichOptions(context);
+ assertThat(actualEnrichedOptions)
+ .containsExactlyInAnyOrderEntriesOf(expectedEnrichedOptions);
+ }
+
+ @ParameterizedTest
+ @MethodSource("providingEnrichedOptionsForCreation")
+ public void testOnCreateTable(Map<String, String> enrichedOptions, boolean
ignoreIfExists) {
+ context = createTableContext(Collections.emptyMap(), enrichedOptions);
+ Path expectedPath =
+ Paths.get(
+ sharedTempDir.toAbsolutePath().toString(),
+ String.format(
+ "root/%s.catalog/%s.db/%s",
+ TABLE_IDENTIFIER.getCatalogName(),
+ TABLE_IDENTIFIER.getDatabaseName(),
+ TABLE_IDENTIFIER.getObjectName()));
+ boolean exist = expectedPath.toFile().exists();
+ if (ignoreIfExists || !exist) {
+ tableStoreFactory.onCreateTable(context, ignoreIfExists);
+ assertThat(expectedPath).exists();
+ } else {
+ assertThatThrownBy(() -> tableStoreFactory.onCreateTable(context,
false))
+ .isInstanceOf(TableException.class)
+ .hasMessageContaining(
+ String.format(
+ "Failed to create file store path. "
+ + "Reason: directory %s exists for
table %s. "
+ + "Suggestion: please try
`DESCRIBE TABLE %s` to "
+ + "first check whether table
exists in current catalog. "
+ + "If table exists in catalog, and
data files under current path "
+ + "are valid, please use `CREATE
TABLE IF NOT EXISTS` ddl instead. "
+ + "Otherwise, please choose
another table name "
+ + "or manually delete the current
path and try again.",
+ expectedPath,
+ TABLE_IDENTIFIER.asSerializableString(),
+ TABLE_IDENTIFIER.asSerializableString()));
+ }
+ }
+
+ @ParameterizedTest
+ @MethodSource("providingEnrichedOptionsForDrop")
+ public void testOnDropTable(Map<String, String> enrichedOptions, boolean
ignoreIfNotExists) {
+ context = createTableContext(Collections.emptyMap(), enrichedOptions);
+ Path expectedPath =
+ Paths.get(
+ sharedTempDir.toAbsolutePath().toString(),
+ String.format(
+ "root/%s.catalog/%s.db/%s",
+ TABLE_IDENTIFIER.getCatalogName(),
+ TABLE_IDENTIFIER.getDatabaseName(),
+ TABLE_IDENTIFIER.getObjectName()));
+ boolean exist = expectedPath.toFile().exists();
+ if (exist || ignoreIfNotExists) {
+ tableStoreFactory.onDropTable(context, ignoreIfNotExists);
+ assertThat(expectedPath).doesNotExist();
+ } else {
+ assertThatThrownBy(() -> tableStoreFactory.onDropTable(context,
false))
+ .isInstanceOf(TableException.class)
+ .hasMessageContaining(
+ String.format(
+ "Failed to delete file store path. "
+ + "Reason: directory %s doesn't
exist for table %s. "
+ + "Suggestion: please try `DROP
TABLE IF EXISTS` ddl instead.",
+ expectedPath,
TABLE_IDENTIFIER.asSerializableString()));
+ }
+ }
+
+ @Test
+ public void testFilterLogStoreOptions() {
+ // mix invalid key and leave value to empty to emphasize the deferred
validation
+ Map<String, String> expectedLogOptions =
+ of(
+ LogOptions.SCAN.key(),
+ "",
+ LogOptions.RETENTION.key(),
+ "",
+ "dummy.key",
+ "",
+ LogOptions.CHANGELOG_MODE.key(),
+ "");
+ Map<String, String> enrichedOptions =
+ addPrefix(expectedLogOptions, LOG_PREFIX, (key) -> true);
+ enrichedOptions.put("foo", "bar");
+
+ assertThat(((TableStoreFactory)
tableStoreFactory).filterLogStoreOptions(enrichedOptions))
+ .containsExactlyInAnyOrderEntriesOf(expectedLogOptions);
+ }
+
+ // ~ Tools
------------------------------------------------------------------
+
+    /**
+     * Supplies {@code (sessionOptions, tableOptions, expectedEnrichedOptions)} triples for
+     * {@code testEnrichOptions}.
+     *
+     * <p>Session options carry {@code TABLE_STORE_PREFIX} in front of the key; table options and
+     * expected options use the bare key.
+     */
+    private static Stream<Arguments> providingOptions() {
+        Map<String, String> enrichedOptions =
+                of(
+                        BUCKET.key(),
+                        BUCKET.defaultValue().toString(),
+                        FILE_PATH.key(),
+                        sharedTempDir.toString(),
+                        LOG_PREFIX + BOOTSTRAP_SERVERS.key(),
+                        "localhost:9092",
+                        LOG_PREFIX + CONSISTENCY.key(),
+                        CONSISTENCY.defaultValue().name());
+
+        // default
+        Arguments arg0 =
+                Arguments.of(
+                        Collections.emptyMap(), Collections.emptyMap(), Collections.emptyMap());
+
+        // set configuration under session level
+        Arguments arg1 =
+                Arguments.of(
+                        addPrefix(enrichedOptions, TABLE_STORE_PREFIX, (key) -> true),
+                        Collections.emptyMap(),
+                        enrichedOptions);
+
+        // set configuration under table level
+        Arguments arg2 = Arguments.of(Collections.emptyMap(), enrichedOptions, enrichedOptions);
+
+        // set both session and table level configuration to test options combination
+        Map<String, String> tableOptions = new HashMap<>(enrichedOptions);
+        tableOptions.remove(FILE_PATH.key());
+        // The consistency option is stored under its log-prefixed key, so it has to be removed
+        // under that key to really move it from the table level to the session level.
+        tableOptions.remove(LOG_PREFIX + CONSISTENCY.key());
+        Arguments arg3 =
+                Arguments.of(
+                        addPrefix(
+                                enrichedOptions,
+                                TABLE_STORE_PREFIX,
+                                (key) -> !tableOptions.containsKey(key)),
+                        tableOptions,
+                        enrichedOptions);
+
+        // set same key with different value to test table configuration take precedence
+        Map<String, String> sessionOptions = new HashMap<>();
+        sessionOptions.put(
+                TABLE_STORE_PREFIX + BUCKET.key(), String.valueOf(BUCKET.defaultValue() + 1));
+        Arguments arg4 = Arguments.of(sessionOptions, enrichedOptions, enrichedOptions);
+        return Stream.of(arg0, arg1, arg2, arg3, arg4);
+    }
+
+ private static Stream<Arguments> providingEnrichedOptionsForCreation() {
+ Map<String, String> enrichedOptions = new HashMap<>();
+ enrichedOptions.put(FILE_PATH.key(),
sharedTempDir.toAbsolutePath().toString());
+ enrichedOptions.put(CHANGE_TRACKING.key(), String.valueOf(false));
+ return Stream.of(
+ Arguments.of(enrichedOptions, false),
+ Arguments.of(enrichedOptions, true),
+ Arguments.of(enrichedOptions, false));
+ }
+
+    /**
+     * Supplies {@code (enrichedOptions, ignoreIfNotExists)} pairs for {@code testOnDropTable}.
+     *
+     * <p>The table directory is created up front, using the same
+     * {@code root/<catalog>.catalog/<database>.db/<table>} layout that the test expects, so the
+     * first invocation drops an existing table and the later ones run against a missing one.
+     */
+    private static Stream<Arguments> providingEnrichedOptionsForDrop() {
+        File tablePath =
+                Paths.get(
+                                sharedTempDir.toAbsolutePath().toString(),
+                                String.format(
+                                        "root/%s.catalog/%s.db/%s",
+                                        TABLE_IDENTIFIER.getCatalogName(),
+                                        TABLE_IDENTIFIER.getDatabaseName(),
+                                        TABLE_IDENTIFIER.getObjectName()))
+                        .toFile();
+        if (!tablePath.exists()) {
+            tablePath.mkdirs();
+        }
+        Map<String, String> enrichedOptions = new HashMap<>();
+        enrichedOptions.put(FILE_PATH.key(), sharedTempDir.toAbsolutePath().toString());
+        // Change tracking is switched off so the drop never reaches a log store.
+        enrichedOptions.put(CHANGE_TRACKING.key(), String.valueOf(false));
+        return Stream.of(
+                Arguments.of(enrichedOptions, false),
+                Arguments.of(enrichedOptions, true),
+                Arguments.of(enrichedOptions, false));
+    }
+
+ private static Map<String, String> addPrefix(
+ Map<String, String> options, String prefix, Predicate<String>
predicate) {
+ Map<String, String> newOptions = new HashMap<>();
+ options.forEach(
+ (k, v) -> {
+ if (predicate.test(k)) {
+ newOptions.put(prefix + k, v);
+ }
+ });
+ return newOptions;
+ }
+
+ private static DynamicTableFactory.Context createTableContext(
+ Map<String, String> sessionOptions, Map<String, String>
tableOptions) {
+ ResolvedCatalogTable resolvedCatalogTable =
+ new ResolvedCatalogTable(
+ CatalogTable.of(
+ Schema.derived(),
+ "a comment",
+ Collections.emptyList(),
+ tableOptions),
+ ResolvedSchema.of(Collections.emptyList()));
+ return new FactoryUtil.DefaultDynamicTableContext(
+ TABLE_IDENTIFIER,
+ resolvedCatalogTable,
+ Collections.emptyMap(),
+ Configuration.fromMap(sessionOptions),
+ Thread.currentThread().getContextClassLoader(),
+ false);
+ }
+
+ private static Map<String, String> of(String... kvs) {
+ assert kvs != null && kvs.length % 2 == 0;
+ Map<String, String> map = new HashMap<>();
+ for (int i = 0; i < kvs.length - 1; i += 2) {
+ map.put(kvs[i], kvs[i + 1]);
+ }
+ return map;
+ }
+}
diff --git
a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/TableStoreITCase.java
b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/TableStoreITCase.java
new file mode 100644
index 0000000..4206970
--- /dev/null
+++
b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/TableStoreITCase.java
@@ -0,0 +1,576 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.connector;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.table.api.Schema;
+import org.apache.flink.table.api.TableException;
+import org.apache.flink.table.api.ValidationException;
+import org.apache.flink.table.api.internal.TableEnvironmentImpl;
+import org.apache.flink.table.catalog.CatalogTable;
+import org.apache.flink.table.catalog.Column;
+import org.apache.flink.table.catalog.GenericInMemoryCatalog;
+import org.apache.flink.table.catalog.ObjectIdentifier;
+import org.apache.flink.table.catalog.ResolvedCatalogTable;
+import org.apache.flink.table.catalog.ResolvedSchema;
+import org.apache.flink.table.catalog.UniqueConstraint;
+import org.apache.flink.table.catalog.exceptions.DatabaseNotExistException;
+import org.apache.flink.table.catalog.exceptions.TableAlreadyExistException;
+import org.apache.flink.table.catalog.exceptions.TableNotExistException;
+import org.apache.flink.table.store.kafka.KafkaTableTestBase;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.logical.IntType;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.table.types.logical.VarCharType;
+import org.apache.flink.table.types.utils.TypeConversions;
+
+import org.apache.commons.io.FileUtils;
+import org.junit.Assume;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TestName;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import java.io.IOException;
+import java.io.UncheckedIOException;
+import java.nio.file.Paths;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+import java.util.stream.Stream;
+
+import static
org.apache.flink.table.store.connector.TableStoreFactoryOptions.CHANGE_TRACKING;
+import static
org.apache.flink.table.store.connector.TableStoreITCase.StatementType.CREATE_STATEMENT;
+import static
org.apache.flink.table.store.connector.TableStoreITCase.StatementType.DROP_STATEMENT;
+import static org.apache.flink.table.store.file.FileStoreOptions.BUCKET;
+import static org.apache.flink.table.store.file.FileStoreOptions.FILE_PATH;
+import static
org.apache.flink.table.store.file.FileStoreOptions.TABLE_STORE_PREFIX;
+import static
org.apache.flink.table.store.kafka.KafkaLogOptions.BOOTSTRAP_SERVERS;
+import static org.apache.flink.table.store.log.LogOptions.LOG_PREFIX;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+/** End-to-end tests for table store. */
+@RunWith(Parameterized.class)
+public class TableStoreITCase extends KafkaTableTestBase {
+
+ private static final String CURRENT_CATALOG = "catalog";
+ private static final String CURRENT_DATABASE = "database";
+
+ private final ObjectIdentifier tableIdentifier;
+ private final StatementType statementType;
+ private final boolean enableChangeTracking;
+ private final boolean ignoreException;
+ private final ExpectedResult expectedResult;
+
+ private String rootPath;
+ private ResolvedCatalogTable resolvedTable;
+ @Rule public TestName name = new TestName();
+
+ /**
+  * Creates the test instance for one parameter set.
+  *
+  * @param tableName object name of the table; it is qualified with {@code CURRENT_CATALOG} and
+  *     {@code CURRENT_DATABASE}
+  * @param statementType whether this parameter set targets the create or the drop test
+  * @param enableChangeTracking value written to the change-tracking session option
+  * @param ignoreException whether the DDL uses {@code IF NOT EXISTS} / {@code IF EXISTS}
+  * @param expectedResult expected outcome of executing the DDL
+  */
+ public TableStoreITCase(
+         String tableName,
+         StatementType statementType,
+         boolean enableChangeTracking,
+         boolean ignoreException,
+         ExpectedResult expectedResult) {
+     this.expectedResult = expectedResult;
+     this.ignoreException = ignoreException;
+     this.enableChangeTracking = enableChangeTracking;
+     this.statementType = statementType;
+     this.tableIdentifier =
+             ObjectIdentifier.of(CURRENT_CATALOG, CURRENT_DATABASE, tableName);
+ }
+
+ @Parameterized.Parameters(
+ name =
+ "tableName-{0}, statementType-{1},
enableChangeTracking-{2}, ignoreException-{3}, expectedResult-{4}")
+ public static Collection<Object[]> data() {
+ return Stream.concat(prepareCreateTableSpecs().stream(),
prepareDropTableSpecs().stream())
+ .collect(Collectors.toList());
+ }
+
+ /**
+  * Registers a fresh in-memory catalog, creates a new root folder for the file store, writes the
+  * session options and, when the running test method matches the parameter set's statement
+  * type, prepares the environment for that test.
+  */
+ @Before
+ @Override
+ public void setup() {
+     super.setup();
+     GenericInMemoryCatalog catalog =
+             new GenericInMemoryCatalog(CURRENT_CATALOG, CURRENT_DATABASE);
+     ((TableEnvironmentImpl) tEnv).getCatalogManager().registerCatalog(CURRENT_CATALOG, catalog);
+     tEnv.useCatalog(CURRENT_CATALOG);
+
+     RowType rowType = RowType.of(new IntType(), new VarCharType());
+     resolvedTable = createResolvedTable(Collections.emptyMap(), rowType, new int[0]);
+
+     try {
+         rootPath = TEMPORARY_FOLDER.newFolder().getPath();
+     } catch (IOException e) {
+         throw new UncheckedIOException(e);
+     }
+     prepareSessionContext();
+
+     // JUnit 4 cannot bind different data providers to different test methods, so the test name
+     // is matched against the parameter type here and non-matching combinations are left
+     // unprepared (the test methods themselves skip them)
+     final String testMethod = name.getMethodName();
+     if (statementType == CREATE_STATEMENT && testMethod.startsWith("testCreateTable")) {
+         prepareEnvForCreateTable();
+     } else if (statementType == DROP_STATEMENT && testMethod.startsWith("testDropTable")) {
+         prepareEnvForDropTable();
+     }
+ }
+
+ /**
+  * Executes {@code CREATE TABLE [IF NOT EXISTS]} for the parameterized table. On success the
+  * catalog entry, the file store path and the log store topic are checked; otherwise the
+  * expected exception and the resulting catalog state are checked.
+  */
+ @Test
+ public void testCreateTable() {
+     Assume.assumeTrue(statementType == CREATE_STATEMENT);
+     final String existsClause = ignoreException ? " IF NOT EXISTS " : " ";
+     final String ddl =
+             String.format(
+                     "CREATE TABLE%s%s (f0 INT, f1 STRING)\n",
+                     existsClause, tableIdentifier.asSerializableString());
+
+     if (!expectedResult.success) {
+         // check inconsistency between catalog/file store/log store
+         assertThat(ignoreException).isFalse();
+         assertThatThrownBy(() -> tEnv.executeSql(ddl))
+                 .getCause()
+                 .isInstanceOf(expectedResult.expectedType)
+                 .hasMessageContaining(expectedResult.expectedMessage);
+
+         final String catalogConflict =
+                 String.format("already exists in Catalog %s", CURRENT_CATALOG);
+         if (expectedResult.expectedMessage.contains(catalogConflict)) {
+             // the conflicting table is still registered in the catalog
+             assertThat(
+                             ((TableEnvironmentImpl) tEnv)
+                                     .getCatalogManager()
+                                     .getTable(tableIdentifier))
+                     .isPresent();
+         } else {
+             // the exception came from creating the file path/topic, so no catalog meta exists
+             assertThat(
+                             ((TableEnvironmentImpl) tEnv)
+                                     .getCatalogManager()
+                                     .getTable(tableIdentifier))
+                     .isNotPresent();
+         }
+         return;
+     }
+
+     tEnv.executeSql(ddl);
+     // check catalog
+     assertThat(((TableEnvironmentImpl) tEnv).getCatalogManager().getTable(tableIdentifier))
+             .isPresent();
+     // check table store
+     assertThat(Paths.get(rootPath, getRelativeFileStoreTablePath(tableIdentifier)).toFile())
+             .exists();
+     // check log store: a topic is expected exactly when change tracking is enabled
+     assertThat(topicExists(tableIdentifier.asSummaryString()))
+             .isEqualTo(enableChangeTracking);
+ }
+
+ /**
+  * Executes {@code DROP TABLE [IF EXISTS]} for the parameterized table. On success the catalog
+  * entry, the file store path and the log store topic must all be gone; otherwise the expected
+  * exception and the resulting catalog state are checked.
+  */
+ @Test
+ public void testDropTable() {
+     Assume.assumeTrue(statementType == DROP_STATEMENT);
+     final String existsClause = ignoreException ? " IF EXISTS " : " ";
+     final String ddl =
+             String.format(
+                     "DROP TABLE%s%s\n", existsClause, tableIdentifier.asSerializableString());
+
+     if (!expectedResult.success) {
+         // check inconsistency between catalog/file store/log store
+         assertThat(ignoreException).isFalse();
+         final boolean missingInCatalog =
+                 ValidationException.class.isAssignableFrom(expectedResult.expectedType);
+         if (missingInCatalog) {
+             // path/topic are deleted successfully, but the schema doesn't exist in the catalog
+             assertThatThrownBy(() -> tEnv.executeSql(ddl))
+                     .isInstanceOf(expectedResult.expectedType)
+                     .hasMessageContaining(expectedResult.expectedMessage);
+             assertThat(
+                             ((TableEnvironmentImpl) tEnv)
+                                     .getCatalogManager()
+                                     .getTable(tableIdentifier))
+                     .isNotPresent();
+         } else {
+             assertThatThrownBy(() -> tEnv.executeSql(ddl))
+                     .getCause()
+                     .isInstanceOf(expectedResult.expectedType)
+                     .hasMessageContaining(expectedResult.expectedMessage);
+             // the exception came from deleting the file path/topic, so the schema is still
+             // registered in the catalog
+             assertThat(
+                             ((TableEnvironmentImpl) tEnv)
+                                     .getCatalogManager()
+                                     .getTable(tableIdentifier))
+                     .isPresent();
+         }
+         return;
+     }
+
+     tEnv.executeSql(ddl);
+     // check catalog
+     assertThat(((TableEnvironmentImpl) tEnv).getCatalogManager().getTable(tableIdentifier))
+             .isNotPresent();
+     // check table store
+     assertThat(Paths.get(rootPath, getRelativeFileStoreTablePath(tableIdentifier)).toFile())
+             .doesNotExist();
+     // check log store
+     assertThat(topicExists(tableIdentifier.asSummaryString())).isFalse();
+ }
+
+ // ~ Tools
------------------------------------------------------------------
+
+ private static List<Object[]> prepareCreateTableSpecs() {
+ List<Object[]> specs = new ArrayList<>();
+ // successful case specs
+ specs.add(
+ new Object[] {
+ "table_" + UUID.randomUUID(),
+ CREATE_STATEMENT,
+ true,
+ true,
+ new ExpectedResult().success(true)
+ });
+ specs.add(
+ new Object[] {
+ "table_" + UUID.randomUUID(),
+ CREATE_STATEMENT,
+ false,
+ true,
+ new ExpectedResult().success(true)
+ });
+ specs.add(
+ new Object[] {
+ "table_" + UUID.randomUUID(),
+ CREATE_STATEMENT,
+ true,
+ false,
+ new ExpectedResult().success(true)
+ });
+ specs.add(
+ new Object[] {
+ "table_" + UUID.randomUUID(),
+ CREATE_STATEMENT,
+ false,
+ false,
+ new ExpectedResult().success(true)
+ });
+
+ // failed case specs
+ specs.add(
+ new Object[] {
+ "table_" + UUID.randomUUID(),
+ CREATE_STATEMENT,
+ false,
+ false,
+ new ExpectedResult()
+ .success(false)
+ .expectedType(TableException.class)
+ .expectedMessage("Failed to create file store
path.")
+ });
+ specs.add(
+ new Object[] {
+ "table_" + UUID.randomUUID(),
+ CREATE_STATEMENT,
+ true,
+ false,
+ new ExpectedResult()
+ .success(false)
+ .expectedType(TableException.class)
+ .expectedMessage("Failed to create kafka topic.")
+ });
+ final String tableName = "table_" + UUID.randomUUID();
+ specs.add(
+ new Object[] {
+ tableName,
+ CREATE_STATEMENT,
+ true,
+ false,
+ new ExpectedResult()
+ .success(false)
+ .expectedType(TableAlreadyExistException.class)
+ .expectedMessage(
+ String.format(
+ "Table (or view) %s already exists
in Catalog %s.",
+ ObjectIdentifier.of(
+ CURRENT_CATALOG,
+ CURRENT_DATABASE,
+ tableName)
+ .toObjectPath()
+ .getFullName(),
+ CURRENT_CATALOG))
+ });
+ return specs;
+ }
+
+ private static List<Object[]> prepareDropTableSpecs() {
+ List<Object[]> specs = new ArrayList<>();
+ // successful case specs
+ specs.add(
+ new Object[] {
+ "table_" + UUID.randomUUID(),
+ DROP_STATEMENT,
+ true,
+ true,
+ new ExpectedResult().success(true)
+ });
+ specs.add(
+ new Object[] {
+ "table_" + UUID.randomUUID(),
+ DROP_STATEMENT,
+ false,
+ true,
+ new ExpectedResult().success(true)
+ });
+ specs.add(
+ new Object[] {
+ "table_" + UUID.randomUUID(),
+ DROP_STATEMENT,
+ true,
+ false,
+ new ExpectedResult().success(true)
+ });
+ specs.add(
+ new Object[] {
+ "table_" + UUID.randomUUID(),
+ DROP_STATEMENT,
+ false,
+ false,
+ new ExpectedResult().success(true)
+ });
+
+ // failed case specs
+ specs.add(
+ new Object[] {
+ "table_" + UUID.randomUUID(),
+ DROP_STATEMENT,
+ false,
+ false,
+ new ExpectedResult()
+ .success(false)
+ .expectedType(TableException.class)
+ .expectedMessage("Failed to delete file store
path.")
+ });
+ specs.add(
+ new Object[] {
+ "table_" + UUID.randomUUID(),
+ DROP_STATEMENT,
+ true,
+ false,
+ new ExpectedResult()
+ .success(false)
+ .expectedType(TableException.class)
+ .expectedMessage("Failed to delete kafka topic.")
+ });
+ final String tableName = "table_" + UUID.randomUUID();
+ specs.add(
+ new Object[] {
+ tableName,
+ DROP_STATEMENT,
+ true,
+ false,
+ new ExpectedResult()
+ .success(false)
+ .expectedType(ValidationException.class)
+ .expectedMessage(
+ String.format(
+ "Table with identifier '%s' does
not exist.",
+ ObjectIdentifier.of(
+ CURRENT_CATALOG,
+ CURRENT_DATABASE,
+ tableName)
+ .asSummaryString()))
+ });
+ return specs;
+ }
+
+ private void prepareSessionContext() {
+ Configuration configuration = tEnv.getConfig().getConfiguration();
+ configuration.setString(TABLE_STORE_PREFIX + FILE_PATH.key(),
rootPath);
+ configuration.setString(
+ TABLE_STORE_PREFIX + LOG_PREFIX + BOOTSTRAP_SERVERS.key(),
getBootstrapServers());
+ configuration.setBoolean(TABLE_STORE_PREFIX + CHANGE_TRACKING.key(),
enableChangeTracking);
+ }
+
+ private void prepareEnvForCreateTable() {
+ if (expectedResult.success) {
+ // ensure catalog doesn't contain the table meta
+ tEnv.getCatalog(tEnv.getCurrentCatalog())
+ .ifPresent(
+ (catalog) -> {
+ try {
+
catalog.dropTable(tableIdentifier.toObjectPath(), false);
+ } catch (TableNotExistException ignored) {
+ // ignored
+ }
+ });
+ // ensure log store doesn't exist the topic
+ if (enableChangeTracking && !ignoreException) {
+ deleteTopicIfExists(tableIdentifier.asSummaryString());
+ }
+ } else if (expectedResult.expectedMessage.startsWith("Failed to create
file store path.")) {
+ // failed when creating file store
+ Paths.get(rootPath,
getRelativeFileStoreTablePath(tableIdentifier)).toFile().mkdirs();
+ } else if (expectedResult.expectedMessage.startsWith("Failed to create
kafka topic.")) {
+ // failed when creating log store
+ createTopicIfNotExists(tableIdentifier.asSummaryString(),
BUCKET.defaultValue());
+ } else {
+ // failed when registering schema to catalog
+ tEnv.getCatalog(tEnv.getCurrentCatalog())
+ .ifPresent(
+ (catalog) -> {
+ try {
+ catalog.createTable(
+ tableIdentifier.toObjectPath(),
resolvedTable, false);
+ } catch (TableAlreadyExistException
+ | DatabaseNotExistException ignored) {
+ // ignored
+ }
+ });
+ }
+ }
+
+ private void prepareEnvForDropTable() {
+ ((TableEnvironmentImpl) tEnv)
+ .getCatalogManager()
+ .createTable(resolvedTable, tableIdentifier, false);
+ if (expectedResult.success) {
+ if (ignoreException) {
+ // delete catalog schema does not affect dropping the table
+ tEnv.getCatalog(tEnv.getCurrentCatalog())
+ .ifPresent(
+ (catalog) -> {
+ try {
+
catalog.dropTable(tableIdentifier.toObjectPath(), false);
+ } catch (TableNotExistException ignored) {
+ // ignored
+ }
+ });
+ // delete file store path does not affect dropping the table
+ deleteTablePath();
+ // delete log store topic does not affect dropping the table
+ if (enableChangeTracking) {
+ deleteTopicIfExists(tableIdentifier.asSummaryString());
+ }
+ }
+ } else if (expectedResult.expectedMessage.startsWith("Failed to delete
file store path.")) {
+ // failed when deleting file path
+ deleteTablePath();
+ } else if (expectedResult.expectedMessage.startsWith("Failed to delete
kafka topic.")) {
+ // failed when deleting topic
+ deleteTopicIfExists(tableIdentifier.asSummaryString());
+ } else {
+ // failed when dropping catalog schema
+ tEnv.getCatalog(tEnv.getCurrentCatalog())
+ .ifPresent(
+ (catalog) -> {
+ try {
+
catalog.dropTable(tableIdentifier.toObjectPath(), false);
+ } catch (TableNotExistException ignored) {
+ // ignored
+ }
+ });
+ }
+ }
+
+ /**
+  * Deletes the file store directory of the current table below {@code rootPath}.
+  *
+  * <p>A deletion failure is rethrown instead of being swallowed: the drop-table specs rely on
+  * the directory really being gone, and a silently failed precondition would only show up
+  * later as a misleading assertion failure. This matches how {@code setup()} reports I/O
+  * errors.
+  *
+  * @throws UncheckedIOException if the directory cannot be deleted
+  */
+ private void deleteTablePath() {
+     try {
+         FileUtils.deleteDirectory(
+                 Paths.get(rootPath, getRelativeFileStoreTablePath(tableIdentifier)).toFile());
+     } catch (IOException e) {
+         throw new UncheckedIOException(e);
+     }
+ }
+
+ static ResolvedCatalogTable createResolvedTable(
+ Map<String, String> options, RowType rowType, int[] pk) {
+ List<String> fieldNames = rowType.getFieldNames();
+ List<DataType> fieldDataTypes =
+ rowType.getChildren().stream()
+ .map(TypeConversions::fromLogicalToDataType)
+ .collect(Collectors.toList());
+ CatalogTable origin =
+ CatalogTable.of(
+ Schema.newBuilder().fromFields(fieldNames,
fieldDataTypes).build(),
+ "a comment",
+ Collections.emptyList(),
+ options);
+ List<Column> resolvedColumns =
+ IntStream.range(0, fieldNames.size())
+ .mapToObj(i -> Column.physical(fieldNames.get(i),
fieldDataTypes.get(i)))
+ .collect(Collectors.toList());
+ UniqueConstraint constraint = null;
+ if (pk.length > 0) {
+ List<String> pkNames =
+
Arrays.stream(pk).mapToObj(fieldNames::get).collect(Collectors.toList());
+ constraint = UniqueConstraint.primaryKey("pk", pkNames);
+ }
+ return new ResolvedCatalogTable(
+ origin, new ResolvedSchema(resolvedColumns,
Collections.emptyList(), constraint));
+ }
+
+ static String getRelativeFileStoreTablePath(ObjectIdentifier
tableIdentifier) {
+ return String.format(
+ "root/%s.catalog/%s.db/%s",
+ tableIdentifier.getCatalogName(),
+ tableIdentifier.getDatabaseName(),
+ tableIdentifier.getObjectName());
+ }
+
+ enum StatementType { // kind of DDL statement a parameter set is meant for
+ CREATE_STATEMENT, // parameter set is exercised by testCreateTable
+ DROP_STATEMENT // parameter set is exercised by testDropTable
+ }
+
+ /**
+  * Expected outcome of one parameterized DDL statement, filled in through chained setters.
+  * {@code expectedType} and {@code expectedMessage} are only set for failing outcomes.
+  */
+ private static class ExpectedResult {
+     private boolean success;
+     private Class<? extends Throwable> expectedType;
+     private String expectedMessage;
+
+     /** Sets whether the statement is expected to succeed and returns this instance. */
+     ExpectedResult success(boolean success) {
+         this.success = success;
+         return this;
+     }
+
+     /** Sets the exception type the statement is expected to fail with. */
+     ExpectedResult expectedType(Class<? extends Throwable> exceptionClazz) {
+         this.expectedType = exceptionClazz;
+         return this;
+     }
+
+     /** Sets the text the failure message is expected to contain. */
+     ExpectedResult expectedMessage(String exceptionMessage) {
+         this.expectedMessage = exceptionMessage;
+         return this;
+     }
+
+     @Override
+     public String toString() {
+         final StringBuilder text = new StringBuilder("ExpectedResult{");
+         text.append("success=").append(success);
+         text.append(", expectedType=").append(expectedType);
+         text.append(", expectedMessage='").append(expectedMessage).append('\'');
+         return text.append('}').toString();
+     }
+ }
+}
diff --git
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/FileStoreOptions.java
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/FileStoreOptions.java
index a54526f..0233e2d 100644
---
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/FileStoreOptions.java
+++
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/FileStoreOptions.java
@@ -27,12 +27,16 @@ import
org.apache.flink.table.store.file.mergetree.MergeTreeOptions;
import java.io.Serializable;
import java.time.Duration;
+import java.util.HashSet;
+import java.util.Set;
import static org.apache.flink.configuration.ConfigOptions.key;
/** Options for {@link FileStore}. */
public class FileStoreOptions implements Serializable {
+ public static final String TABLE_STORE_PREFIX = "table-store.";
+
public static final ConfigOption<Integer> BUCKET =
ConfigOptions.key("bucket")
.intType()
@@ -93,6 +97,20 @@ public class FileStoreOptions implements Serializable {
private final Configuration options;
+ /** Returns a new mutable set holding the file store options listed below. */
+ public static Set<ConfigOption<?>> allOptions() {
+     final ConfigOption<?>[] declared = {
+         BUCKET,
+         FILE_PATH,
+         FILE_FORMAT,
+         MANIFEST_FORMAT,
+         MANIFEST_TARGET_FILE_SIZE,
+         MANIFEST_MERGE_MIN_COUNT,
+         PARTITION_DEFAULT_NAME,
+         SNAPSHOT_NUM_RETAINED,
+         SNAPSHOT_TIME_RETAINED
+     };
+     final Set<ConfigOption<?>> result = new HashSet<>();
+     for (ConfigOption<?> option : declared) {
+         result.add(option);
+     }
+     return result;
+ }
+
public FileStoreOptions(Configuration options) {
this.options = options;
// TODO validate all keys
diff --git
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/log/LogOptions.java
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/log/LogOptions.java
index 3f36945..f975233 100644
---
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/log/LogOptions.java
+++
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/log/LogOptions.java
@@ -32,6 +32,8 @@ import static
org.apache.flink.table.store.utils.OptionsUtils.formatEnumOption;
/** Options for log store. */
public class LogOptions {
+ public static final String LOG_PREFIX = "log.";
+
public static final ConfigOption<LogStartupMode> SCAN =
ConfigOptions.key("scan")
.enumType(LogStartupMode.class)
diff --git
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/log/LogStoreTableFactory.java
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/log/LogStoreTableFactory.java
index 35c435d..2300fcf 100644
---
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/log/LogStoreTableFactory.java
+++
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/log/LogStoreTableFactory.java
@@ -46,10 +46,10 @@ import org.apache.flink.types.RowKind;
public interface LogStoreTableFactory extends DynamicTableFactory {
/** Notifies the listener that a table creation occurred. */
- void onCreateTable(Context context, int numBucket);
+ void onCreateTable(Context context, int numBucket, boolean ignoreIfExists);
/** Notifies the listener that a table drop occurred. */
- void onDropTable(Context context);
+ void onDropTable(Context context, boolean ignoreIfNotExists);
/**
* Creates a {@link LogSourceProvider} instance from a {@link
CatalogTable} and additional
diff --git a/flink-table-store-kafka/pom.xml b/flink-table-store-kafka/pom.xml
index 75e6811..676754e 100644
--- a/flink-table-store-kafka/pom.xml
+++ b/flink-table-store-kafka/pom.xml
@@ -157,4 +157,20 @@ under the License.
<scope>test</scope>
</dependency>
</dependencies>
-</project>
\ No newline at end of file
+
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-jar-plugin</artifactId>
+ <executions>
+ <execution>
+ <goals>
+ <goal>test-jar</goal>
+ </goals>
+ </execution>
+ </executions>
+ </plugin>
+ </plugins>
+ </build>
+</project>
diff --git
a/flink-table-store-kafka/src/main/java/org/apache/flink/table/store/kafka/KafkaLogStoreFactory.java
b/flink-table-store-kafka/src/main/java/org/apache/flink/table/store/kafka/KafkaLogStoreFactory.java
index 9208940..943d0b6 100644
---
a/flink-table-store-kafka/src/main/java/org/apache/flink/table/store/kafka/KafkaLogStoreFactory.java
+++
b/flink-table-store-kafka/src/main/java/org/apache/flink/table/store/kafka/KafkaLogStoreFactory.java
@@ -35,6 +35,7 @@ import org.apache.flink.table.types.utils.DataTypeUtils;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.common.config.TopicConfig;
+import org.apache.kafka.common.errors.TopicExistsException;
import org.apache.kafka.common.errors.UnknownTopicOrPartitionException;
import java.util.Collections;
@@ -91,7 +92,7 @@ public class KafkaLogStoreFactory implements
LogStoreTableFactory {
}
@Override
- public void onCreateTable(DynamicTableFactory.Context context, int
numBucket) {
+ public void onCreateTable(Context context, int numBucket, boolean
ignoreIfExists) {
FactoryUtil.TableFactoryHelper helper = createTableFactoryHelper(this,
context);
helper.validateExcept(KAFKA_PREFIX);
try (AdminClient adminClient =
AdminClient.create(toKafkaProperties(helper.getOptions()))) {
@@ -103,27 +104,54 @@ public class KafkaLogStoreFactory implements
LogStoreTableFactory {
configs.put(
TopicConfig.RETENTION_MS_CONFIG,
String.valueOf(retention.toMillis())));
-
NewTopic topicObj =
new NewTopic(topic(context), Optional.of(numBucket),
Optional.empty())
.configs(configs);
adminClient.createTopics(Collections.singleton(topicObj)).all().get();
} catch (ExecutionException | InterruptedException e) {
+ if (e.getCause() instanceof TopicExistsException) {
+ if (ignoreIfExists) {
+ return;
+ }
+ throw new TableException(
+ String.format(
+ "Failed to create kafka topic. "
+ + "Reason: topic %s exists for table
%s. "
+ + "Suggestion: please try `DESCRIBE
TABLE %s` to "
+ + "check whether table exists in
current catalog. "
+ + "If table exists and the DDL needs
to be executed "
+ + "multiple times, please use `CREATE
TABLE IF NOT EXISTS` ddl instead. "
+ + "Otherwise, please choose another
table name "
+ + "or manually delete the current
topic and try again.",
+ topic(context),
+
context.getObjectIdentifier().asSerializableString(),
+
context.getObjectIdentifier().asSerializableString()));
+ }
throw new TableException("Error in createTopic", e);
}
}
@Override
- public void onDropTable(DynamicTableFactory.Context context) {
+ public void onDropTable(Context context, boolean ignoreIfNotExists) {
try (AdminClient adminClient =
AdminClient.create(
toKafkaProperties(createTableFactoryHelper(this,
context).getOptions()))) {
adminClient.deleteTopics(Collections.singleton(topic(context))).all().get();
} catch (ExecutionException e) {
- // ignore topic not exists
- if (!(e.getCause() instanceof UnknownTopicOrPartitionException)) {
- throw new TableException("Error in deleteTopic", e);
+ // check the cause to ignore topic not exists conditionally
+ if (e.getCause() instanceof UnknownTopicOrPartitionException) {
+ if (ignoreIfNotExists) {
+ return;
+ }
+ throw new TableException(
+ String.format(
+ "Failed to delete kafka topic. "
+ + "Reason: topic %s doesn't exist for
table %s. "
+ + "Suggestion: please try `DROP TABLE
IF EXISTS` ddl instead.",
+ topic(context),
+
context.getObjectIdentifier().asSerializableString()));
}
+ throw new TableException("Error in deleteTopic", e);
} catch (InterruptedException e) {
throw new TableException("Error in deleteTopic", e);
}
diff --git
a/flink-table-store-kafka/src/test/java/org/apache/flink/table/store/kafka/KafkaLogITCase.java
b/flink-table-store-kafka/src/test/java/org/apache/flink/table/store/kafka/KafkaLogITCase.java
index 8b9dccf..bb18a11 100644
---
a/flink-table-store-kafka/src/test/java/org/apache/flink/table/store/kafka/KafkaLogITCase.java
+++
b/flink-table-store-kafka/src/test/java/org/apache/flink/table/store/kafka/KafkaLogITCase.java
@@ -50,7 +50,7 @@ public class KafkaLogITCase extends KafkaTableTestBase {
@Test
public void testDropEmpty() {
// Expect no exceptions to be thrown
- factory.onDropTable(testContext(getBootstrapServers(),
LogChangelogMode.AUTO, true));
+ factory.onDropTable(testContext(getBootstrapServers(),
LogChangelogMode.AUTO, true), true);
}
@Test
@@ -141,7 +141,7 @@ public class KafkaLogITCase extends KafkaTableTestBase {
KafkaLogSourceProvider sourceProvider =
factory.createSourceProvider(context, SOURCE_CONTEXT);
- factory.onCreateTable(context, 3);
+ factory.onCreateTable(context, 3, true);
try {
// transactional need to commit
enableCheckpoint();
@@ -198,7 +198,7 @@ public class KafkaLogITCase extends KafkaTableTestBase {
assertRow(records.get(1), RowKind.INSERT, 11, 12);
assertRow(records.get(2), RowKind.INSERT, 13, 14);
} finally {
- factory.onDropTable(context);
+ factory.onDropTable(context, true);
}
}
diff --git
a/flink-table-store-kafka/src/test/java/org/apache/flink/table/store/kafka/KafkaTableTestBase.java
b/flink-table-store-kafka/src/test/java/org/apache/flink/table/store/kafka/KafkaTableTestBase.java
index dbe7897..db92832 100644
---
a/flink-table-store-kafka/src/test/java/org/apache/flink/table/store/kafka/KafkaTableTestBase.java
+++
b/flink-table-store-kafka/src/test/java/org/apache/flink/table/store/kafka/KafkaTableTestBase.java
@@ -25,11 +25,14 @@ import org.apache.flink.test.util.AbstractTestBase;
import org.apache.flink.util.DockerImageVersions;
import org.apache.kafka.clients.admin.AdminClient;
+import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.admin.TopicDescription;
import org.apache.kafka.clients.admin.TopicListing;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.errors.TopicExistsException;
+import org.apache.kafka.common.errors.UnknownTopicOrPartitionException;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.junit.After;
import org.junit.Before;
@@ -43,8 +46,10 @@ import org.testcontainers.utility.DockerImageName;
import java.time.Duration;
import java.util.ArrayList;
+import java.util.Collections;
import java.util.List;
import java.util.Map;
+import java.util.Optional;
import java.util.Properties;
import java.util.Timer;
import java.util.TimerTask;
@@ -126,6 +131,44 @@ public abstract class KafkaTableTestBase extends
AbstractTestBase {
return KAFKA_CONTAINER.getBootstrapServers();
}
+ /** Returns whether {@code topicName} is among the topics from {@code describeExternalTopics()}. */
+ protected boolean topicExists(String topicName) {
+     return describeExternalTopics().keySet().contains(topicName);
+ }
+
+ /**
+  * Creates the topic {@code topicName} with {@code numBucket} partitions unless the topic is
+  * already listed. A {@link TopicExistsException} raised by a concurrent creation is
+  * tolerated.
+  *
+  * @throws RuntimeException if listing or creating the topic fails for any other reason; the
+  *     original exception is kept as the cause
+  */
+ protected void createTopicIfNotExists(String topicName, int numBucket) {
+     try (final AdminClient adminClient = AdminClient.create(getStandardProps())) {
+         if (!adminClient.listTopics().names().get().contains(topicName)) {
+             adminClient
+                     .createTopics(
+                             Collections.singleton(
+                                     new NewTopic(
+                                             topicName,
+                                             Optional.of(numBucket),
+                                             Optional.empty())))
+                     .all()
+                     .get();
+         }
+     } catch (Exception e) {
+         if (e instanceof InterruptedException) {
+             // the blocking get() calls clear the interrupt flag when they throw; restore it
+             // so the interruption stays visible to the caller
+             Thread.currentThread().interrupt();
+         }
+         if (!(e.getCause() instanceof TopicExistsException)) {
+             throw new RuntimeException(
+                     String.format("Failed to create Kafka topic %s", topicName), e);
+         }
+     }
+ }
+
+ /**
+  * Deletes the topic {@code topicName} if it is currently listed. An
+  * {@link UnknownTopicOrPartitionException} raised because the topic vanished in the meantime
+  * is tolerated.
+  *
+  * @throws RuntimeException if listing or deleting the topic fails for any other reason; the
+  *     original exception is kept as the cause
+  */
+ protected void deleteTopicIfExists(String topicName) {
+     try (final AdminClient adminClient = AdminClient.create(getStandardProps())) {
+         if (adminClient.listTopics().names().get().contains(topicName)) {
+             adminClient.deleteTopics(Collections.singleton(topicName)).all().get();
+         }
+     } catch (Exception e) {
+         if (e instanceof InterruptedException) {
+             // the blocking get() calls clear the interrupt flag when they throw; restore it
+             // so the interruption stays visible to the caller
+             Thread.currentThread().interrupt();
+         }
+         if (!(e.getCause() instanceof UnknownTopicOrPartitionException)) {
+             throw new RuntimeException(
+                     String.format("Failed to drop Kafka topic %s", topicName), e);
+         }
+     }
+ }
+
// ------------------------ For Debug Logging Purpose
----------------------------------
private void scheduleTimeoutLogger(Duration period, Runnable
loggingAction) {