This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-table-store.git


The following commit(s) were added to refs/heads/master by this push:
     new af4be76  [FLINK-26208] Introduce TableStoreFactory to support 
create/drop managed table via SQL
af4be76 is described below

commit af4be7675833cf000c1f9b1891bba0a106b27d4a
Author: Jane Chan <[email protected]>
AuthorDate: Tue Mar 1 17:47:54 2022 +0800

    [FLINK-26208] Introduce TableStoreFactory to support create/drop managed 
table via SQL
    
    This closes #25
---
 flink-table-store-connector/pom.xml                |  72 ++-
 .../table/store/connector/TableStoreFactory.java   | 191 +++++++
 .../store/connector/TableStoreFactoryOptions.java  |  39 ++
 .../org.apache.flink.table.factories.Factory       |  16 +
 .../store/connector/TableStoreFactoryTest.java     | 282 ++++++++++
 .../table/store/connector/TableStoreITCase.java    | 576 +++++++++++++++++++++
 .../flink/table/store/file/FileStoreOptions.java   |  18 +
 .../apache/flink/table/store/log/LogOptions.java   |   2 +
 .../table/store/log/LogStoreTableFactory.java      |   4 +-
 flink-table-store-kafka/pom.xml                    |  18 +-
 .../table/store/kafka/KafkaLogStoreFactory.java    |  40 +-
 .../flink/table/store/kafka/KafkaLogITCase.java    |   6 +-
 .../table/store/kafka/KafkaTableTestBase.java      |  43 ++
 13 files changed, 1294 insertions(+), 13 deletions(-)

diff --git a/flink-table-store-connector/pom.xml 
b/flink-table-store-connector/pom.xml
index f05cd72..0b3e178 100644
--- a/flink-table-store-connector/pom.xml
+++ b/flink-table-store-connector/pom.xml
@@ -96,6 +96,34 @@ under the License.
 
         <dependency>
             <groupId>org.apache.flink</groupId>
+            <artifactId>flink-table-store-kafka</artifactId>
+            <version>${project.version}</version>
+            <scope>test</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-table-store-kafka</artifactId>
+            <version>${project.version}</version>
+            <scope>test</scope>
+            <type>test-jar</type>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-test-utils</artifactId>
+            <version>${flink.version}</version>
+            <scope>test</scope>
+            <exclusions>
+                <exclusion>
+                    <groupId>junit</groupId>
+                    <artifactId>junit</artifactId>
+                </exclusion>
+            </exclusions>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.flink</groupId>
             <artifactId>flink-connector-test-utils</artifactId>
             <version>${flink.version}</version>
             <scope>test</scope>
@@ -141,6 +169,48 @@ under the License.
             </exclusions>
             <scope>test</scope>
         </dependency>
+        <dependency>
+            <!-- include 2.0 server for tests  -->
+            <groupId>org.apache.kafka</groupId>
+            <artifactId>kafka_${scala.binary.version}</artifactId>
+            <version>2.4.1</version>
+            <exclusions>
+                <exclusion>
+                    <groupId>org.slf4j</groupId>
+                    <artifactId>slf4j-api</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>com.fasterxml.jackson.core</groupId>
+                    <artifactId>*</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>com.fasterxml.jackson.dataformat</groupId>
+                    <artifactId>*</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>com.fasterxml.jackson.module</groupId>
+                    <artifactId>*</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>com.fasterxml.jackson.datatype</groupId>
+                    <artifactId>*</artifactId>
+                </exclusion>
+            </exclusions>
+            <scope>test</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.testcontainers</groupId>
+            <artifactId>kafka</artifactId>
+            <version>${testcontainers.version}</version>
+            <scope>test</scope>
+            <exclusions>
+                <exclusion>
+                    <groupId>junit</groupId>
+                    <artifactId>junit</artifactId>
+                </exclusion>
+            </exclusions>
+        </dependency>
     </dependencies>
 
     <build>
@@ -158,4 +228,4 @@ under the License.
             </plugin>
         </plugins>
     </build>
-</project>
\ No newline at end of file
+</project>
diff --git 
a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/TableStoreFactory.java
 
b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/TableStoreFactory.java
new file mode 100644
index 0000000..6b7e5a2
--- /dev/null
+++ 
b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/TableStoreFactory.java
@@ -0,0 +1,191 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.connector;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.table.api.TableException;
+import org.apache.flink.table.catalog.CatalogPartitionSpec;
+import org.apache.flink.table.catalog.ObjectIdentifier;
+import org.apache.flink.table.factories.FactoryUtil;
+import org.apache.flink.table.factories.ManagedTableFactory;
+import org.apache.flink.table.store.file.FileStoreOptions;
+
+import java.io.IOException;
+import java.io.UncheckedIOException;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+
+import static 
org.apache.flink.table.store.connector.TableStoreFactoryOptions.CHANGE_TRACKING;
+import static org.apache.flink.table.store.file.FileStoreOptions.BUCKET;
+import static org.apache.flink.table.store.file.FileStoreOptions.FILE_PATH;
+import static 
org.apache.flink.table.store.file.FileStoreOptions.TABLE_STORE_PREFIX;
+import static org.apache.flink.table.store.log.LogOptions.LOG_PREFIX;
+import static 
org.apache.flink.table.store.log.LogStoreTableFactory.discoverLogStoreFactory;
+
+/** Default implementation of {@link ManagedTableFactory}. */
+public class TableStoreFactory implements ManagedTableFactory {
+
+    @Override
+    public Map<String, String> enrichOptions(Context context) {
+        Map<String, String> enrichedOptions = new 
HashMap<>(context.getCatalogTable().getOptions());
+        ((Configuration) context.getConfiguration())
+                .toMap()
+                .forEach(
+                        (k, v) -> {
+                            if (k.startsWith(TABLE_STORE_PREFIX)) {
+                                enrichedOptions.putIfAbsent(
+                                        
k.substring(TABLE_STORE_PREFIX.length()), v);
+                            }
+                        });
+        return enrichedOptions;
+    }
+
+    @Override
+    public void onCreateTable(Context context, boolean ignoreIfExists) {
+        Map<String, String> enrichedOptions = 
context.getCatalogTable().getOptions();
+        Path path = tablePath(enrichedOptions, context.getObjectIdentifier());
+        try {
+            if (path.getFileSystem().exists(path) && !ignoreIfExists) {
+                throw new TableException(
+                        String.format(
+                                "Failed to create file store path. "
+                                        + "Reason: directory %s exists for 
table %s. "
+                                        + "Suggestion: please try `DESCRIBE 
TABLE %s` to "
+                                        + "first check whether table exists in 
current catalog. "
+                                        + "If table exists in catalog, and 
data files under current path "
+                                        + "are valid, please use `CREATE TABLE 
IF NOT EXISTS` ddl instead. "
+                                        + "Otherwise, please choose another 
table name "
+                                        + "or manually delete the current path 
and try again.",
+                                path,
+                                
context.getObjectIdentifier().asSerializableString(),
+                                
context.getObjectIdentifier().asSerializableString()));
+            }
+            path.getFileSystem().mkdirs(path);
+        } catch (IOException e) {
+            throw new UncheckedIOException(e);
+        }
+
+        if (enableChangeTracking(enrichedOptions)) {
+            Context logStoreContext =
+                    new FactoryUtil.DefaultDynamicTableContext(
+                            context.getObjectIdentifier(),
+                            
context.getCatalogTable().copy(filterLogStoreOptions(enrichedOptions)),
+                            
filterLogStoreOptions(context.getEnrichmentOptions()),
+                            context.getConfiguration(),
+                            context.getClassLoader(),
+                            context.isTemporary());
+            discoverLogStoreFactory(
+                            Thread.currentThread().getContextClassLoader(),
+                            TableStoreFactoryOptions.LOG_SYSTEM.defaultValue())
+                    .onCreateTable(
+                            logStoreContext,
+                            Integer.parseInt(
+                                    enrichedOptions.getOrDefault(
+                                            BUCKET.key(), 
BUCKET.defaultValue().toString())),
+                            ignoreIfExists);
+        }
+    }
+
+    @Override
+    public void onDropTable(Context context, boolean ignoreIfNotExists) {
+        Map<String, String> enrichedOptions = 
context.getCatalogTable().getOptions();
+        Path path = tablePath(enrichedOptions, context.getObjectIdentifier());
+        try {
+            if (path.getFileSystem().exists(path)) {
+                path.getFileSystem().delete(path, true);
+            } else if (!ignoreIfNotExists) {
+                throw new TableException(
+                        String.format(
+                                "Failed to delete file store path. "
+                                        + "Reason: directory %s doesn't exist 
for table %s. "
+                                        + "Suggestion: please try `DROP TABLE 
IF EXISTS` ddl instead.",
+                                path, 
context.getObjectIdentifier().asSerializableString()));
+            }
+        } catch (IOException e) {
+            throw new UncheckedIOException(e);
+        }
+        if (enableChangeTracking(enrichedOptions)) {
+            Context logStoreContext =
+                    new FactoryUtil.DefaultDynamicTableContext(
+                            context.getObjectIdentifier(),
+                            
context.getCatalogTable().copy(filterLogStoreOptions(enrichedOptions)),
+                            context.getEnrichmentOptions(),
+                            context.getConfiguration(),
+                            context.getClassLoader(),
+                            context.isTemporary());
+            discoverLogStoreFactory(
+                            Thread.currentThread().getContextClassLoader(),
+                            TableStoreFactoryOptions.LOG_SYSTEM.defaultValue())
+                    .onDropTable(logStoreContext, ignoreIfNotExists);
+        }
+    }
+
+    @Override
+    public Map<String, String> onCompactTable(
+            Context context, CatalogPartitionSpec catalogPartitionSpec) {
+        throw new UnsupportedOperationException("Not implement yet");
+    }
+
+    @Override
+    public Set<ConfigOption<?>> requiredOptions() {
+        return Collections.emptySet();
+    }
+
+    @Override
+    public Set<ConfigOption<?>> optionalOptions() {
+        Set<ConfigOption<?>> options = FileStoreOptions.allOptions();
+        options.add(CHANGE_TRACKING);
+        return options;
+    }
+
+    // ~ Tools 
------------------------------------------------------------------
+
+    @VisibleForTesting
+    Map<String, String> filterLogStoreOptions(Map<String, String> 
enrichedOptions) {
+        Map<String, String> logStoreOptions = new HashMap<>();
+        enrichedOptions.forEach(
+                (k, v) -> {
+                    if (k.startsWith(LOG_PREFIX)) {
+                        logStoreOptions.put(k.substring(LOG_PREFIX.length()), 
v);
+                    }
+                });
+        return logStoreOptions;
+    }
+
+    private static Path tablePath(Map<String, String> options, 
ObjectIdentifier identifier) {
+        return new Path(
+                new Path(options.get(FILE_PATH.key())),
+                String.format(
+                        "root/%s.catalog/%s.db/%s",
+                        identifier.getCatalogName(),
+                        identifier.getDatabaseName(),
+                        identifier.getObjectName()));
+    }
+
+    private static boolean enableChangeTracking(Map<String, String> options) {
+        return Boolean.parseBoolean(
+                options.getOrDefault(
+                        CHANGE_TRACKING.key(), 
CHANGE_TRACKING.defaultValue().toString()));
+    }
+}
diff --git 
a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/TableStoreFactoryOptions.java
 
b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/TableStoreFactoryOptions.java
new file mode 100644
index 0000000..21393ac
--- /dev/null
+++ 
b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/TableStoreFactoryOptions.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.connector;
+
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.ConfigOptions;
+
+/** Options for {@link TableStoreFactory}. */
+public class TableStoreFactoryOptions {
+
+    public static final ConfigOption<Boolean> CHANGE_TRACKING =
+            ConfigOptions.key("change-tracking")
+                    .booleanType()
+                    .defaultValue(true)
+                    .withDescription(
+                            "Whether to enable change tracking. The default 
value is true, which means consuming changes from the table.");
+
+    public static final ConfigOption<String> LOG_SYSTEM =
+            ConfigOptions.key("log.system")
+                    .stringType()
+                    .defaultValue("kafka")
+                    .withDescription("The log system used to keep changes of 
the table.");
+}
diff --git 
a/flink-table-store-connector/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory
 
b/flink-table-store-connector/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory
new file mode 100644
index 0000000..e377345
--- /dev/null
+++ 
b/flink-table-store-connector/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory
@@ -0,0 +1,16 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+org.apache.flink.table.store.connector.TableStoreFactory
diff --git 
a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/TableStoreFactoryTest.java
 
b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/TableStoreFactoryTest.java
new file mode 100644
index 0000000..e0c9b5b
--- /dev/null
+++ 
b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/TableStoreFactoryTest.java
@@ -0,0 +1,282 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.connector;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.table.api.Schema;
+import org.apache.flink.table.api.TableException;
+import org.apache.flink.table.catalog.CatalogTable;
+import org.apache.flink.table.catalog.ObjectIdentifier;
+import org.apache.flink.table.catalog.ResolvedCatalogTable;
+import org.apache.flink.table.catalog.ResolvedSchema;
+import org.apache.flink.table.factories.DynamicTableFactory;
+import org.apache.flink.table.factories.FactoryUtil;
+import org.apache.flink.table.factories.ManagedTableFactory;
+import org.apache.flink.table.store.log.LogOptions;
+
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.Arguments;
+import org.junit.jupiter.params.provider.MethodSource;
+
+import java.io.File;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.function.Predicate;
+import java.util.stream.Stream;
+
+import static 
org.apache.flink.table.store.connector.TableStoreFactoryOptions.CHANGE_TRACKING;
+import static org.apache.flink.table.store.file.FileStoreOptions.BUCKET;
+import static org.apache.flink.table.store.file.FileStoreOptions.FILE_PATH;
+import static 
org.apache.flink.table.store.file.FileStoreOptions.TABLE_STORE_PREFIX;
+import static 
org.apache.flink.table.store.kafka.KafkaLogOptions.BOOTSTRAP_SERVERS;
+import static org.apache.flink.table.store.log.LogOptions.CONSISTENCY;
+import static org.apache.flink.table.store.log.LogOptions.LOG_PREFIX;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+/** Test cases for {@link TableStoreFactory}. */
+public class TableStoreFactoryTest {
+
+    private static final ObjectIdentifier TABLE_IDENTIFIER =
+            ObjectIdentifier.of("catalog", "database", "table");
+
+    private final ManagedTableFactory tableStoreFactory = new 
TableStoreFactory();
+
+    @TempDir private static java.nio.file.Path sharedTempDir;
+    private DynamicTableFactory.Context context;
+
+    @ParameterizedTest
+    @MethodSource("providingOptions")
+    public void testEnrichOptions(
+            Map<String, String> sessionOptions,
+            Map<String, String> tableOptions,
+            Map<String, String> expectedEnrichedOptions) {
+        context = createTableContext(sessionOptions, tableOptions);
+        Map<String, String> actualEnrichedOptions = 
tableStoreFactory.enrichOptions(context);
+        assertThat(actualEnrichedOptions)
+                .containsExactlyInAnyOrderEntriesOf(expectedEnrichedOptions);
+    }
+
+    @ParameterizedTest
+    @MethodSource("providingEnrichedOptionsForCreation")
+    public void testOnCreateTable(Map<String, String> enrichedOptions, boolean 
ignoreIfExists) {
+        context = createTableContext(Collections.emptyMap(), enrichedOptions);
+        Path expectedPath =
+                Paths.get(
+                        sharedTempDir.toAbsolutePath().toString(),
+                        String.format(
+                                "root/%s.catalog/%s.db/%s",
+                                TABLE_IDENTIFIER.getCatalogName(),
+                                TABLE_IDENTIFIER.getDatabaseName(),
+                                TABLE_IDENTIFIER.getObjectName()));
+        boolean exist = expectedPath.toFile().exists();
+        if (ignoreIfExists || !exist) {
+            tableStoreFactory.onCreateTable(context, ignoreIfExists);
+            assertThat(expectedPath).exists();
+        } else {
+            assertThatThrownBy(() -> tableStoreFactory.onCreateTable(context, 
false))
+                    .isInstanceOf(TableException.class)
+                    .hasMessageContaining(
+                            String.format(
+                                    "Failed to create file store path. "
+                                            + "Reason: directory %s exists for 
table %s. "
+                                            + "Suggestion: please try 
`DESCRIBE TABLE %s` to "
+                                            + "first check whether table 
exists in current catalog. "
+                                            + "If table exists in catalog, and 
data files under current path "
+                                            + "are valid, please use `CREATE 
TABLE IF NOT EXISTS` ddl instead. "
+                                            + "Otherwise, please choose 
another table name "
+                                            + "or manually delete the current 
path and try again.",
+                                    expectedPath,
+                                    TABLE_IDENTIFIER.asSerializableString(),
+                                    TABLE_IDENTIFIER.asSerializableString()));
+        }
+    }
+
+    @ParameterizedTest
+    @MethodSource("providingEnrichedOptionsForDrop")
+    public void testOnDropTable(Map<String, String> enrichedOptions, boolean 
ignoreIfNotExists) {
+        context = createTableContext(Collections.emptyMap(), enrichedOptions);
+        Path expectedPath =
+                Paths.get(
+                        sharedTempDir.toAbsolutePath().toString(),
+                        String.format(
+                                "root/%s.catalog/%s.db/%s",
+                                TABLE_IDENTIFIER.getCatalogName(),
+                                TABLE_IDENTIFIER.getDatabaseName(),
+                                TABLE_IDENTIFIER.getObjectName()));
+        boolean exist = expectedPath.toFile().exists();
+        if (exist || ignoreIfNotExists) {
+            tableStoreFactory.onDropTable(context, ignoreIfNotExists);
+            assertThat(expectedPath).doesNotExist();
+        } else {
+            assertThatThrownBy(() -> tableStoreFactory.onDropTable(context, 
false))
+                    .isInstanceOf(TableException.class)
+                    .hasMessageContaining(
+                            String.format(
+                                    "Failed to delete file store path. "
+                                            + "Reason: directory %s doesn't 
exist for table %s. "
+                                            + "Suggestion: please try `DROP 
TABLE IF EXISTS` ddl instead.",
+                                    expectedPath, 
TABLE_IDENTIFIER.asSerializableString()));
+        }
+    }
+
+    @Test
+    public void testFilterLogStoreOptions() {
+        // mix invalid key and leave value to empty to emphasize the deferred 
validation
+        Map<String, String> expectedLogOptions =
+                of(
+                        LogOptions.SCAN.key(),
+                        "",
+                        LogOptions.RETENTION.key(),
+                        "",
+                        "dummy.key",
+                        "",
+                        LogOptions.CHANGELOG_MODE.key(),
+                        "");
+        Map<String, String> enrichedOptions =
+                addPrefix(expectedLogOptions, LOG_PREFIX, (key) -> true);
+        enrichedOptions.put("foo", "bar");
+
+        assertThat(((TableStoreFactory) 
tableStoreFactory).filterLogStoreOptions(enrichedOptions))
+                .containsExactlyInAnyOrderEntriesOf(expectedLogOptions);
+    }
+
+    // ~ Tools 
------------------------------------------------------------------
+
+    private static Stream<Arguments> providingOptions() {
+        Map<String, String> enrichedOptions =
+                of(
+                        BUCKET.key(),
+                        BUCKET.defaultValue().toString(),
+                        FILE_PATH.key(),
+                        sharedTempDir.toString(),
+                        LOG_PREFIX + BOOTSTRAP_SERVERS.key(),
+                        "localhost:9092",
+                        LOG_PREFIX + CONSISTENCY.key(),
+                        CONSISTENCY.defaultValue().name());
+
+        // default
+        Arguments arg0 =
+                Arguments.of(
+                        Collections.emptyMap(), Collections.emptyMap(), 
Collections.emptyMap());
+
+        // set configuration under session level
+        Arguments arg1 =
+                Arguments.of(
+                        addPrefix(enrichedOptions, TABLE_STORE_PREFIX, (key) 
-> true),
+                        Collections.emptyMap(),
+                        enrichedOptions);
+
+        // set configuration under table level
+        Arguments arg2 = Arguments.of(Collections.emptyMap(), enrichedOptions, 
enrichedOptions);
+
+        // set both session and table level configuration to test options 
combination
+        Map<String, String> tableOptions = new HashMap<>(enrichedOptions);
+        tableOptions.remove(FILE_PATH.key());
+        tableOptions.remove(CONSISTENCY.key());
+        Arguments arg3 =
+                Arguments.of(
+                        addPrefix(
+                                enrichedOptions,
+                                TABLE_STORE_PREFIX,
+                                (key) -> !tableOptions.containsKey(key)),
+                        tableOptions,
+                        enrichedOptions);
+
+        // set same key with different value to test table configuration take 
precedence
+        Map<String, String> sessionOptions = new HashMap<>();
+        sessionOptions.put(
+                TABLE_STORE_PREFIX + BUCKET.key(), 
String.valueOf(BUCKET.defaultValue() + 1));
+        Arguments arg4 = Arguments.of(sessionOptions, enrichedOptions, 
enrichedOptions);
+        return Stream.of(arg0, arg1, arg2, arg3, arg4);
+    }
+
+    private static Stream<Arguments> providingEnrichedOptionsForCreation() {
+        Map<String, String> enrichedOptions = new HashMap<>();
+        enrichedOptions.put(FILE_PATH.key(), 
sharedTempDir.toAbsolutePath().toString());
+        enrichedOptions.put(CHANGE_TRACKING.key(), String.valueOf(false));
+        return Stream.of(
+                Arguments.of(enrichedOptions, false),
+                Arguments.of(enrichedOptions, true),
+                Arguments.of(enrichedOptions, false));
+    }
+
+    private static Stream<Arguments> providingEnrichedOptionsForDrop() {
+        File tablePath =
+                Paths.get(
+                                sharedTempDir.toAbsolutePath().toString(),
+                                TABLE_IDENTIFIER.asSummaryString())
+                        .toFile();
+        if (!tablePath.exists()) {
+            tablePath.mkdirs();
+        }
+        Map<String, String> enrichedOptions = new HashMap<>();
+        enrichedOptions.put(FILE_PATH.key(), 
sharedTempDir.toAbsolutePath().toString());
+        enrichedOptions.put(CHANGE_TRACKING.key(), String.valueOf(false));
+        return Stream.of(
+                Arguments.of(enrichedOptions, false),
+                Arguments.of(enrichedOptions, true),
+                Arguments.of(enrichedOptions, false));
+    }
+
+    private static Map<String, String> addPrefix(
+            Map<String, String> options, String prefix, Predicate<String> 
predicate) {
+        Map<String, String> newOptions = new HashMap<>();
+        options.forEach(
+                (k, v) -> {
+                    if (predicate.test(k)) {
+                        newOptions.put(prefix + k, v);
+                    }
+                });
+        return newOptions;
+    }
+
+    private static DynamicTableFactory.Context createTableContext(
+            Map<String, String> sessionOptions, Map<String, String> 
tableOptions) {
+        ResolvedCatalogTable resolvedCatalogTable =
+                new ResolvedCatalogTable(
+                        CatalogTable.of(
+                                Schema.derived(),
+                                "a comment",
+                                Collections.emptyList(),
+                                tableOptions),
+                        ResolvedSchema.of(Collections.emptyList()));
+        return new FactoryUtil.DefaultDynamicTableContext(
+                TABLE_IDENTIFIER,
+                resolvedCatalogTable,
+                Collections.emptyMap(),
+                Configuration.fromMap(sessionOptions),
+                Thread.currentThread().getContextClassLoader(),
+                false);
+    }
+
+    private static Map<String, String> of(String... kvs) {
+        assert kvs != null && kvs.length % 2 == 0;
+        Map<String, String> map = new HashMap<>();
+        for (int i = 0; i < kvs.length - 1; i += 2) {
+            map.put(kvs[i], kvs[i + 1]);
+        }
+        return map;
+    }
+}
diff --git 
a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/TableStoreITCase.java
 
b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/TableStoreITCase.java
new file mode 100644
index 0000000..4206970
--- /dev/null
+++ 
b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/TableStoreITCase.java
@@ -0,0 +1,576 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.connector;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.table.api.Schema;
+import org.apache.flink.table.api.TableException;
+import org.apache.flink.table.api.ValidationException;
+import org.apache.flink.table.api.internal.TableEnvironmentImpl;
+import org.apache.flink.table.catalog.CatalogTable;
+import org.apache.flink.table.catalog.Column;
+import org.apache.flink.table.catalog.GenericInMemoryCatalog;
+import org.apache.flink.table.catalog.ObjectIdentifier;
+import org.apache.flink.table.catalog.ResolvedCatalogTable;
+import org.apache.flink.table.catalog.ResolvedSchema;
+import org.apache.flink.table.catalog.UniqueConstraint;
+import org.apache.flink.table.catalog.exceptions.DatabaseNotExistException;
+import org.apache.flink.table.catalog.exceptions.TableAlreadyExistException;
+import org.apache.flink.table.catalog.exceptions.TableNotExistException;
+import org.apache.flink.table.store.kafka.KafkaTableTestBase;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.logical.IntType;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.table.types.logical.VarCharType;
+import org.apache.flink.table.types.utils.TypeConversions;
+
+import org.apache.commons.io.FileUtils;
+import org.junit.Assume;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TestName;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import java.io.IOException;
+import java.io.UncheckedIOException;
+import java.nio.file.Paths;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+import java.util.stream.Stream;
+
+import static 
org.apache.flink.table.store.connector.TableStoreFactoryOptions.CHANGE_TRACKING;
+import static 
org.apache.flink.table.store.connector.TableStoreITCase.StatementType.CREATE_STATEMENT;
+import static 
org.apache.flink.table.store.connector.TableStoreITCase.StatementType.DROP_STATEMENT;
+import static org.apache.flink.table.store.file.FileStoreOptions.BUCKET;
+import static org.apache.flink.table.store.file.FileStoreOptions.FILE_PATH;
+import static 
org.apache.flink.table.store.file.FileStoreOptions.TABLE_STORE_PREFIX;
+import static 
org.apache.flink.table.store.kafka.KafkaLogOptions.BOOTSTRAP_SERVERS;
+import static org.apache.flink.table.store.log.LogOptions.LOG_PREFIX;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+/** End-to-end tests for table store. */
+@RunWith(Parameterized.class)
+public class TableStoreITCase extends KafkaTableTestBase {
+
+    private static final String CURRENT_CATALOG = "catalog";
+    private static final String CURRENT_DATABASE = "database";
+
+    private final ObjectIdentifier tableIdentifier;
+    private final StatementType statementType;
+    private final boolean enableChangeTracking;
+    private final boolean ignoreException;
+    private final ExpectedResult expectedResult;
+
+    private String rootPath;
+    private ResolvedCatalogTable resolvedTable;
+    @Rule public TestName name = new TestName();
+
+    public TableStoreITCase(
+            String tableName,
+            StatementType statementType,
+            boolean enableChangeTracking,
+            boolean ignoreException,
+            ExpectedResult expectedResult) {
+        this.tableIdentifier = ObjectIdentifier.of(CURRENT_CATALOG, 
CURRENT_DATABASE, tableName);
+        this.statementType = statementType;
+        this.enableChangeTracking = enableChangeTracking;
+        this.ignoreException = ignoreException;
+        this.expectedResult = expectedResult;
+    }
+
+    @Parameterized.Parameters(
+            name =
+                    "tableName-{0}, statementType-{1}, 
enableChangeTracking-{2}, ignoreException-{3}, expectedResult-{4}")
+    public static Collection<Object[]> data() {
+        return Stream.concat(prepareCreateTableSpecs().stream(), 
prepareDropTableSpecs().stream())
+                .collect(Collectors.toList());
+    }
+
+    @Before
+    @Override
+    public void setup() {
+        super.setup();
+        ((TableEnvironmentImpl) tEnv)
+                .getCatalogManager()
+                .registerCatalog(
+                        CURRENT_CATALOG,
+                        new GenericInMemoryCatalog(CURRENT_CATALOG, 
CURRENT_DATABASE));
+        tEnv.useCatalog(CURRENT_CATALOG);
+        resolvedTable =
+                createResolvedTable(
+                        Collections.emptyMap(),
+                        RowType.of(new IntType(), new VarCharType()),
+                        new int[0]);
+        try {
+            rootPath = TEMPORARY_FOLDER.newFolder().getPath();
+        } catch (IOException e) {
+            throw new UncheckedIOException(e);
+        }
+        prepareSessionContext();
+        // match parameter type with test name to conditionally skip before 
setup, because junit4
+        // doesn't support multiple data providers for different methods
+        if (name.getMethodName().startsWith("testCreateTable")
+                && statementType == CREATE_STATEMENT) {
+            prepareEnvForCreateTable();
+        } else if (name.getMethodName().startsWith("testDropTable")
+                && statementType == DROP_STATEMENT) {
+            prepareEnvForDropTable();
+        }
+    }
+
+    @Test
+    public void testCreateTable() {
+        Assume.assumeTrue(statementType == CREATE_STATEMENT);
+        final String ddl =
+                String.format(
+                        "CREATE TABLE%s%s (f0 INT, f1 STRING)\n",
+                        ignoreException ? " IF NOT EXISTS " : " ",
+                        tableIdentifier.asSerializableString());
+        if (expectedResult.success) {
+            tEnv.executeSql(ddl);
+            // check catalog
+            assertThat(((TableEnvironmentImpl) 
tEnv).getCatalogManager().getTable(tableIdentifier))
+                    .isPresent();
+            // check table store
+            assertThat(Paths.get(rootPath, 
getRelativeFileStoreTablePath(tableIdentifier)).toFile())
+                    .exists();
+            // check log store
+            assertThat(topicExists(tableIdentifier.asSummaryString()))
+                    .isEqualTo(enableChangeTracking);
+        } else {
+            // check inconsistency between catalog/file store/log store
+            assertThat(ignoreException).isFalse();
+            assertThatThrownBy(() -> tEnv.executeSql(ddl))
+                    .getCause()
+                    .isInstanceOf(expectedResult.expectedType)
+                    .hasMessageContaining(expectedResult.expectedMessage);
+
+            if (expectedResult.expectedMessage.contains(
+                    String.format("already exists in Catalog %s", 
CURRENT_CATALOG))) {
+                assertThat(
+                                ((TableEnvironmentImpl) tEnv)
+                                        .getCatalogManager()
+                                        .getTable(tableIdentifier))
+                        .isPresent();
+            } else {
+                // throw exception when creating file path/topic, and catalog 
meta does not exist
+                assertThat(
+                                ((TableEnvironmentImpl) tEnv)
+                                        .getCatalogManager()
+                                        .getTable(tableIdentifier))
+                        .isNotPresent();
+            }
+        }
+    }
+
+    @Test
+    public void testDropTable() {
+        Assume.assumeTrue(statementType == DROP_STATEMENT);
+        String ddl =
+                String.format(
+                        "DROP TABLE%s%s\n",
+                        ignoreException ? " IF EXISTS " : " ",
+                        tableIdentifier.asSerializableString());
+
+        if (expectedResult.success) {
+            tEnv.executeSql(ddl);
+            // check catalog
+            assertThat(((TableEnvironmentImpl) 
tEnv).getCatalogManager().getTable(tableIdentifier))
+                    .isNotPresent();
+            // check table store
+            assertThat(Paths.get(rootPath, 
getRelativeFileStoreTablePath(tableIdentifier)).toFile())
+                    .doesNotExist();
+            // check log store
+            
assertThat(topicExists(tableIdentifier.asSummaryString())).isFalse();
+        } else {
+            // check inconsistency between catalog/file store/log store
+            assertThat(ignoreException).isFalse();
+            if 
(ValidationException.class.isAssignableFrom(expectedResult.expectedType)) {
+                // successfully delete path/topic, but schema doesn't exist in 
catalog
+                assertThatThrownBy(() -> tEnv.executeSql(ddl))
+                        .isInstanceOf(expectedResult.expectedType)
+                        .hasMessageContaining(expectedResult.expectedMessage);
+                assertThat(
+                                ((TableEnvironmentImpl) tEnv)
+                                        .getCatalogManager()
+                                        .getTable(tableIdentifier))
+                        .isNotPresent();
+            } else {
+                assertThatThrownBy(() -> tEnv.executeSql(ddl))
+                        .getCause()
+                        .isInstanceOf(expectedResult.expectedType)
+                        .hasMessageContaining(expectedResult.expectedMessage);
+                // throw exception when deleting file path/topic, so schema 
still exists in catalog
+                assertThat(
+                                ((TableEnvironmentImpl) tEnv)
+                                        .getCatalogManager()
+                                        .getTable(tableIdentifier))
+                        .isPresent();
+            }
+        }
+    }
+
+    // ~ Tools 
------------------------------------------------------------------
+
+    private static List<Object[]> prepareCreateTableSpecs() {
+        List<Object[]> specs = new ArrayList<>();
+        // successful case specs
+        specs.add(
+                new Object[] {
+                    "table_" + UUID.randomUUID(),
+                    CREATE_STATEMENT,
+                    true,
+                    true,
+                    new ExpectedResult().success(true)
+                });
+        specs.add(
+                new Object[] {
+                    "table_" + UUID.randomUUID(),
+                    CREATE_STATEMENT,
+                    false,
+                    true,
+                    new ExpectedResult().success(true)
+                });
+        specs.add(
+                new Object[] {
+                    "table_" + UUID.randomUUID(),
+                    CREATE_STATEMENT,
+                    true,
+                    false,
+                    new ExpectedResult().success(true)
+                });
+        specs.add(
+                new Object[] {
+                    "table_" + UUID.randomUUID(),
+                    CREATE_STATEMENT,
+                    false,
+                    false,
+                    new ExpectedResult().success(true)
+                });
+
+        // failed case specs
+        specs.add(
+                new Object[] {
+                    "table_" + UUID.randomUUID(),
+                    CREATE_STATEMENT,
+                    false,
+                    false,
+                    new ExpectedResult()
+                            .success(false)
+                            .expectedType(TableException.class)
+                            .expectedMessage("Failed to create file store 
path.")
+                });
+        specs.add(
+                new Object[] {
+                    "table_" + UUID.randomUUID(),
+                    CREATE_STATEMENT,
+                    true,
+                    false,
+                    new ExpectedResult()
+                            .success(false)
+                            .expectedType(TableException.class)
+                            .expectedMessage("Failed to create kafka topic.")
+                });
+        final String tableName = "table_" + UUID.randomUUID();
+        specs.add(
+                new Object[] {
+                    tableName,
+                    CREATE_STATEMENT,
+                    true,
+                    false,
+                    new ExpectedResult()
+                            .success(false)
+                            .expectedType(TableAlreadyExistException.class)
+                            .expectedMessage(
+                                    String.format(
+                                            "Table (or view) %s already exists 
in Catalog %s.",
+                                            ObjectIdentifier.of(
+                                                            CURRENT_CATALOG,
+                                                            CURRENT_DATABASE,
+                                                            tableName)
+                                                    .toObjectPath()
+                                                    .getFullName(),
+                                            CURRENT_CATALOG))
+                });
+        return specs;
+    }
+
+    private static List<Object[]> prepareDropTableSpecs() {
+        List<Object[]> specs = new ArrayList<>();
+        // successful case specs
+        specs.add(
+                new Object[] {
+                    "table_" + UUID.randomUUID(),
+                    DROP_STATEMENT,
+                    true,
+                    true,
+                    new ExpectedResult().success(true)
+                });
+        specs.add(
+                new Object[] {
+                    "table_" + UUID.randomUUID(),
+                    DROP_STATEMENT,
+                    false,
+                    true,
+                    new ExpectedResult().success(true)
+                });
+        specs.add(
+                new Object[] {
+                    "table_" + UUID.randomUUID(),
+                    DROP_STATEMENT,
+                    true,
+                    false,
+                    new ExpectedResult().success(true)
+                });
+        specs.add(
+                new Object[] {
+                    "table_" + UUID.randomUUID(),
+                    DROP_STATEMENT,
+                    false,
+                    false,
+                    new ExpectedResult().success(true)
+                });
+
+        // failed case specs
+        specs.add(
+                new Object[] {
+                    "table_" + UUID.randomUUID(),
+                    DROP_STATEMENT,
+                    false,
+                    false,
+                    new ExpectedResult()
+                            .success(false)
+                            .expectedType(TableException.class)
+                            .expectedMessage("Failed to delete file store 
path.")
+                });
+        specs.add(
+                new Object[] {
+                    "table_" + UUID.randomUUID(),
+                    DROP_STATEMENT,
+                    true,
+                    false,
+                    new ExpectedResult()
+                            .success(false)
+                            .expectedType(TableException.class)
+                            .expectedMessage("Failed to delete kafka topic.")
+                });
+        final String tableName = "table_" + UUID.randomUUID();
+        specs.add(
+                new Object[] {
+                    tableName,
+                    DROP_STATEMENT,
+                    true,
+                    false,
+                    new ExpectedResult()
+                            .success(false)
+                            .expectedType(ValidationException.class)
+                            .expectedMessage(
+                                    String.format(
+                                            "Table with identifier '%s' does 
not exist.",
+                                            ObjectIdentifier.of(
+                                                            CURRENT_CATALOG,
+                                                            CURRENT_DATABASE,
+                                                            tableName)
+                                                    .asSummaryString()))
+                });
+        return specs;
+    }
+
+    private void prepareSessionContext() {
+        Configuration configuration = tEnv.getConfig().getConfiguration();
+        configuration.setString(TABLE_STORE_PREFIX + FILE_PATH.key(), 
rootPath);
+        configuration.setString(
+                TABLE_STORE_PREFIX + LOG_PREFIX + BOOTSTRAP_SERVERS.key(), 
getBootstrapServers());
+        configuration.setBoolean(TABLE_STORE_PREFIX + CHANGE_TRACKING.key(), 
enableChangeTracking);
+    }
+
+    private void prepareEnvForCreateTable() {
+        if (expectedResult.success) {
+            // ensure catalog doesn't contain the table meta
+            tEnv.getCatalog(tEnv.getCurrentCatalog())
+                    .ifPresent(
+                            (catalog) -> {
+                                try {
+                                    
catalog.dropTable(tableIdentifier.toObjectPath(), false);
+                                } catch (TableNotExistException ignored) {
+                                    // ignored
+                                }
+                            });
+            // ensure the topic does not already exist in the log store
+            if (enableChangeTracking && !ignoreException) {
+                deleteTopicIfExists(tableIdentifier.asSummaryString());
+            }
+        } else if (expectedResult.expectedMessage.startsWith("Failed to create 
file store path.")) {
+            // failed when creating file store
+            Paths.get(rootPath, 
getRelativeFileStoreTablePath(tableIdentifier)).toFile().mkdirs();
+        } else if (expectedResult.expectedMessage.startsWith("Failed to create 
kafka topic.")) {
+            // failed when creating log store
+            createTopicIfNotExists(tableIdentifier.asSummaryString(), 
BUCKET.defaultValue());
+        } else {
+            // failed when registering schema to catalog
+            tEnv.getCatalog(tEnv.getCurrentCatalog())
+                    .ifPresent(
+                            (catalog) -> {
+                                try {
+                                    catalog.createTable(
+                                            tableIdentifier.toObjectPath(), 
resolvedTable, false);
+                                } catch (TableAlreadyExistException
+                                        | DatabaseNotExistException ignored) {
+                                    // ignored
+                                }
+                            });
+        }
+    }
+
+    private void prepareEnvForDropTable() {
+        ((TableEnvironmentImpl) tEnv)
+                .getCatalogManager()
+                .createTable(resolvedTable, tableIdentifier, false);
+        if (expectedResult.success) {
+            if (ignoreException) {
+                // deleting the catalog schema does not affect dropping the table
+                tEnv.getCatalog(tEnv.getCurrentCatalog())
+                        .ifPresent(
+                                (catalog) -> {
+                                    try {
+                                        
catalog.dropTable(tableIdentifier.toObjectPath(), false);
+                                    } catch (TableNotExistException ignored) {
+                                        // ignored
+                                    }
+                                });
+                // deleting the file store path does not affect dropping the table
+                deleteTablePath();
+                // deleting the log store topic does not affect dropping the table
+                if (enableChangeTracking) {
+                    deleteTopicIfExists(tableIdentifier.asSummaryString());
+                }
+            }
+        } else if (expectedResult.expectedMessage.startsWith("Failed to delete 
file store path.")) {
+            // failed when deleting file path
+            deleteTablePath();
+        } else if (expectedResult.expectedMessage.startsWith("Failed to delete 
kafka topic.")) {
+            // failed when deleting topic
+            deleteTopicIfExists(tableIdentifier.asSummaryString());
+        } else {
+            // failed when dropping catalog schema
+            tEnv.getCatalog(tEnv.getCurrentCatalog())
+                    .ifPresent(
+                            (catalog) -> {
+                                try {
+                                    
catalog.dropTable(tableIdentifier.toObjectPath(), false);
+                                } catch (TableNotExistException ignored) {
+                                    // ignored
+                                }
+                            });
+        }
+    }
+
+    private void deleteTablePath() {
+        try {
+            FileUtils.deleteDirectory(
+                    Paths.get(rootPath, 
getRelativeFileStoreTablePath(tableIdentifier)).toFile());
+        } catch (IOException ignored) {
+            // ignored
+        }
+    }
+
+    static ResolvedCatalogTable createResolvedTable(
+            Map<String, String> options, RowType rowType, int[] pk) {
+        List<String> fieldNames = rowType.getFieldNames();
+        List<DataType> fieldDataTypes =
+                rowType.getChildren().stream()
+                        .map(TypeConversions::fromLogicalToDataType)
+                        .collect(Collectors.toList());
+        CatalogTable origin =
+                CatalogTable.of(
+                        Schema.newBuilder().fromFields(fieldNames, 
fieldDataTypes).build(),
+                        "a comment",
+                        Collections.emptyList(),
+                        options);
+        List<Column> resolvedColumns =
+                IntStream.range(0, fieldNames.size())
+                        .mapToObj(i -> Column.physical(fieldNames.get(i), 
fieldDataTypes.get(i)))
+                        .collect(Collectors.toList());
+        UniqueConstraint constraint = null;
+        if (pk.length > 0) {
+            List<String> pkNames =
+                    
Arrays.stream(pk).mapToObj(fieldNames::get).collect(Collectors.toList());
+            constraint = UniqueConstraint.primaryKey("pk", pkNames);
+        }
+        return new ResolvedCatalogTable(
+                origin, new ResolvedSchema(resolvedColumns, 
Collections.emptyList(), constraint));
+    }
+
+    static String getRelativeFileStoreTablePath(ObjectIdentifier 
tableIdentifier) {
+        return String.format(
+                "root/%s.catalog/%s.db/%s",
+                tableIdentifier.getCatalogName(),
+                tableIdentifier.getDatabaseName(),
+                tableIdentifier.getObjectName());
+    }
+
+    enum StatementType {
+        CREATE_STATEMENT,
+        DROP_STATEMENT
+    }
+
+    private static class ExpectedResult {
+        private boolean success;
+        private Class<? extends Throwable> expectedType;
+        private String expectedMessage;
+
+        ExpectedResult success(boolean success) {
+            this.success = success;
+            return this;
+        }
+
+        ExpectedResult expectedType(Class<? extends Throwable> exceptionClazz) 
{
+            this.expectedType = exceptionClazz;
+            return this;
+        }
+
+        ExpectedResult expectedMessage(String exceptionMessage) {
+            this.expectedMessage = exceptionMessage;
+            return this;
+        }
+
+        @Override
+        public String toString() {
+            return "ExpectedResult{"
+                    + "success="
+                    + success
+                    + ", expectedType="
+                    + expectedType
+                    + ", expectedMessage='"
+                    + expectedMessage
+                    + '\''
+                    + '}';
+        }
+    }
+}
diff --git 
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/FileStoreOptions.java
 
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/FileStoreOptions.java
index a54526f..0233e2d 100644
--- 
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/FileStoreOptions.java
+++ 
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/FileStoreOptions.java
@@ -27,12 +27,16 @@ import 
org.apache.flink.table.store.file.mergetree.MergeTreeOptions;
 
 import java.io.Serializable;
 import java.time.Duration;
+import java.util.HashSet;
+import java.util.Set;
 
 import static org.apache.flink.configuration.ConfigOptions.key;
 
 /** Options for {@link FileStore}. */
 public class FileStoreOptions implements Serializable {
 
+    public static final String TABLE_STORE_PREFIX = "table-store.";
+
     public static final ConfigOption<Integer> BUCKET =
             ConfigOptions.key("bucket")
                     .intType()
@@ -93,6 +97,20 @@ public class FileStoreOptions implements Serializable {
 
     private final Configuration options;
 
+    public static Set<ConfigOption<?>> allOptions() {
+        Set<ConfigOption<?>> allOptions = new HashSet<>();
+        allOptions.add(BUCKET);
+        allOptions.add(FILE_PATH);
+        allOptions.add(FILE_FORMAT);
+        allOptions.add(MANIFEST_FORMAT);
+        allOptions.add(MANIFEST_TARGET_FILE_SIZE);
+        allOptions.add(MANIFEST_MERGE_MIN_COUNT);
+        allOptions.add(PARTITION_DEFAULT_NAME);
+        allOptions.add(SNAPSHOT_NUM_RETAINED);
+        allOptions.add(SNAPSHOT_TIME_RETAINED);
+        return allOptions;
+    }
+
     public FileStoreOptions(Configuration options) {
         this.options = options;
         // TODO validate all keys
diff --git 
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/log/LogOptions.java
 
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/log/LogOptions.java
index 3f36945..f975233 100644
--- 
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/log/LogOptions.java
+++ 
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/log/LogOptions.java
@@ -32,6 +32,8 @@ import static 
org.apache.flink.table.store.utils.OptionsUtils.formatEnumOption;
 /** Options for log store. */
 public class LogOptions {
 
+    public static final String LOG_PREFIX = "log.";
+
     public static final ConfigOption<LogStartupMode> SCAN =
             ConfigOptions.key("scan")
                     .enumType(LogStartupMode.class)
diff --git 
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/log/LogStoreTableFactory.java
 
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/log/LogStoreTableFactory.java
index 35c435d..2300fcf 100644
--- 
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/log/LogStoreTableFactory.java
+++ 
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/log/LogStoreTableFactory.java
@@ -46,10 +46,10 @@ import org.apache.flink.types.RowKind;
 public interface LogStoreTableFactory extends DynamicTableFactory {
 
     /** Notifies the listener that a table creation occurred. */
-    void onCreateTable(Context context, int numBucket);
+    void onCreateTable(Context context, int numBucket, boolean ignoreIfExists);
 
     /** Notifies the listener that a table drop occurred. */
-    void onDropTable(Context context);
+    void onDropTable(Context context, boolean ignoreIfNotExists);
 
     /**
      * Creates a {@link LogSourceProvider} instance from a {@link 
CatalogTable} and additional
diff --git a/flink-table-store-kafka/pom.xml b/flink-table-store-kafka/pom.xml
index 75e6811..676754e 100644
--- a/flink-table-store-kafka/pom.xml
+++ b/flink-table-store-kafka/pom.xml
@@ -157,4 +157,20 @@ under the License.
             <scope>test</scope>
         </dependency>
     </dependencies>
-</project>
\ No newline at end of file
+
+    <build>
+        <plugins>
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-jar-plugin</artifactId>
+                <executions>
+                    <execution>
+                        <goals>
+                            <goal>test-jar</goal>
+                        </goals>
+                    </execution>
+                </executions>
+            </plugin>
+        </plugins>
+    </build>
+</project>
diff --git 
a/flink-table-store-kafka/src/main/java/org/apache/flink/table/store/kafka/KafkaLogStoreFactory.java
 
b/flink-table-store-kafka/src/main/java/org/apache/flink/table/store/kafka/KafkaLogStoreFactory.java
index 9208940..943d0b6 100644
--- 
a/flink-table-store-kafka/src/main/java/org/apache/flink/table/store/kafka/KafkaLogStoreFactory.java
+++ 
b/flink-table-store-kafka/src/main/java/org/apache/flink/table/store/kafka/KafkaLogStoreFactory.java
@@ -35,6 +35,7 @@ import org.apache.flink.table.types.utils.DataTypeUtils;
 import org.apache.kafka.clients.admin.AdminClient;
 import org.apache.kafka.clients.admin.NewTopic;
 import org.apache.kafka.common.config.TopicConfig;
+import org.apache.kafka.common.errors.TopicExistsException;
 import org.apache.kafka.common.errors.UnknownTopicOrPartitionException;
 
 import java.util.Collections;
@@ -91,7 +92,7 @@ public class KafkaLogStoreFactory implements 
LogStoreTableFactory {
     }
 
     @Override
-    public void onCreateTable(DynamicTableFactory.Context context, int 
numBucket) {
+    public void onCreateTable(Context context, int numBucket, boolean 
ignoreIfExists) {
         FactoryUtil.TableFactoryHelper helper = createTableFactoryHelper(this, 
context);
         helper.validateExcept(KAFKA_PREFIX);
         try (AdminClient adminClient = 
AdminClient.create(toKafkaProperties(helper.getOptions()))) {
@@ -103,27 +104,54 @@ public class KafkaLogStoreFactory implements LogStoreTableFactory {
                                     configs.put(
                                             TopicConfig.RETENTION_MS_CONFIG,
                                             String.valueOf(retention.toMillis())));
-
             NewTopic topicObj =
                     new NewTopic(topic(context), Optional.of(numBucket), Optional.empty())
                             .configs(configs);
             adminClient.createTopics(Collections.singleton(topicObj)).all().get();
         } catch (ExecutionException | InterruptedException e) {
+            if (e.getCause() instanceof TopicExistsException) {
+                if (ignoreIfExists) {
+                    return;
+                }
+                throw new TableException(
+                        String.format(
+                                "Failed to create kafka topic. "
+                                        + "Reason: topic %s exists for table %s. "
+                                        + "Suggestion: please try `DESCRIBE TABLE %s` to "
+                                        + "check whether table exists in current catalog. "
+                                        + "If table exists and the DDL needs to be executed "
+                                        + "multiple times, please use `CREATE TABLE IF NOT EXISTS` ddl instead. "
+                                        + "Otherwise, please choose another table name "
+                                        + "or manually delete the current topic and try again.",
+                                topic(context),
+                                context.getObjectIdentifier().asSerializableString(),
+                                context.getObjectIdentifier().asSerializableString()));
+            }
             throw new TableException("Error in createTopic", e);
         }
     }
 
     @Override
-    public void onDropTable(DynamicTableFactory.Context context) {
+    public void onDropTable(Context context, boolean ignoreIfNotExists) {
         try (AdminClient adminClient =
                 AdminClient.create(
                         toKafkaProperties(createTableFactoryHelper(this, 
context).getOptions()))) {
             
adminClient.deleteTopics(Collections.singleton(topic(context))).all().get();
         } catch (ExecutionException e) {
-            // ignore topic not exists
-            if (!(e.getCause() instanceof UnknownTopicOrPartitionException)) {
-                throw new TableException("Error in deleteTopic", e);
+            // check the cause to ignore topic not exists conditionally
+            if (e.getCause() instanceof UnknownTopicOrPartitionException) {
+                if (ignoreIfNotExists) {
+                    return;
+                }
+                throw new TableException(
+                        String.format(
+                                "Failed to delete kafka topic. "
+                                        + "Reason: topic %s doesn't exist for table %s. "
+                                        + "Suggestion: please try `DROP TABLE IF EXISTS` ddl instead.",
+                                topic(context),
+                                context.getObjectIdentifier().asSerializableString()));
             }
+            throw new TableException("Error in deleteTopic", e);
         } catch (InterruptedException e) {
             throw new TableException("Error in deleteTopic", e);
         }
diff --git a/flink-table-store-kafka/src/test/java/org/apache/flink/table/store/kafka/KafkaLogITCase.java b/flink-table-store-kafka/src/test/java/org/apache/flink/table/store/kafka/KafkaLogITCase.java
index 8b9dccf..bb18a11 100644
--- a/flink-table-store-kafka/src/test/java/org/apache/flink/table/store/kafka/KafkaLogITCase.java
+++ b/flink-table-store-kafka/src/test/java/org/apache/flink/table/store/kafka/KafkaLogITCase.java
@@ -50,7 +50,7 @@ public class KafkaLogITCase extends KafkaTableTestBase {
     @Test
     public void testDropEmpty() {
         // Expect no exceptions to be thrown
-        factory.onDropTable(testContext(getBootstrapServers(), LogChangelogMode.AUTO, true));
+        factory.onDropTable(testContext(getBootstrapServers(), LogChangelogMode.AUTO, true), true);
     }
 
     @Test
@@ -141,7 +141,7 @@ public class KafkaLogITCase extends KafkaTableTestBase {
         KafkaLogSourceProvider sourceProvider =
                 factory.createSourceProvider(context, SOURCE_CONTEXT);
 
-        factory.onCreateTable(context, 3);
+        factory.onCreateTable(context, 3, true);
         try {
             // transactional need to commit
             enableCheckpoint();
@@ -198,7 +198,7 @@ public class KafkaLogITCase extends KafkaTableTestBase {
             assertRow(records.get(1), RowKind.INSERT, 11, 12);
             assertRow(records.get(2), RowKind.INSERT, 13, 14);
         } finally {
-            factory.onDropTable(context);
+            factory.onDropTable(context, true);
         }
     }
 
diff --git a/flink-table-store-kafka/src/test/java/org/apache/flink/table/store/kafka/KafkaTableTestBase.java b/flink-table-store-kafka/src/test/java/org/apache/flink/table/store/kafka/KafkaTableTestBase.java
index dbe7897..db92832 100644
--- a/flink-table-store-kafka/src/test/java/org/apache/flink/table/store/kafka/KafkaTableTestBase.java
+++ b/flink-table-store-kafka/src/test/java/org/apache/flink/table/store/kafka/KafkaTableTestBase.java
@@ -25,11 +25,14 @@ import org.apache.flink.test.util.AbstractTestBase;
 import org.apache.flink.util.DockerImageVersions;
 
 import org.apache.kafka.clients.admin.AdminClient;
+import org.apache.kafka.clients.admin.NewTopic;
 import org.apache.kafka.clients.admin.TopicDescription;
 import org.apache.kafka.clients.admin.TopicListing;
 import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.clients.consumer.KafkaConsumer;
 import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.errors.TopicExistsException;
+import org.apache.kafka.common.errors.UnknownTopicOrPartitionException;
 import org.apache.kafka.common.serialization.StringDeserializer;
 import org.junit.After;
 import org.junit.Before;
@@ -43,8 +46,10 @@ import org.testcontainers.utility.DockerImageName;
 
 import java.time.Duration;
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.List;
 import java.util.Map;
+import java.util.Optional;
 import java.util.Properties;
 import java.util.Timer;
 import java.util.TimerTask;
@@ -126,6 +131,44 @@ public abstract class KafkaTableTestBase extends AbstractTestBase {
         return KAFKA_CONTAINER.getBootstrapServers();
     }
 
+    protected boolean topicExists(String topicName) {
+        return describeExternalTopics().containsKey(topicName);
+    }
+
+    protected void createTopicIfNotExists(String topicName, int numBucket) {
+        try (final AdminClient adminClient = AdminClient.create(getStandardProps())) {
+            if (!adminClient.listTopics().names().get().contains(topicName)) {
+                adminClient
+                        .createTopics(
+                                Collections.singleton(
+                                        new NewTopic(
+                                                topicName,
+                                                Optional.of(numBucket),
+                                                Optional.empty())))
+                        .all()
+                        .get();
+            }
+        } catch (Exception e) {
+            if (!(e.getCause() instanceof TopicExistsException)) {
+                throw new RuntimeException(
+                        String.format("Failed to create Kafka topic %s", topicName), e);
+            }
+        }
+    }
+
+    protected void deleteTopicIfExists(String topicName) {
+        try (final AdminClient adminClient = AdminClient.create(getStandardProps())) {
+            if (adminClient.listTopics().names().get().contains(topicName)) {
+                adminClient.deleteTopics(Collections.singleton(topicName)).all().get();
+            }
+        } catch (Exception e) {
+            if (!(e.getCause() instanceof UnknownTopicOrPartitionException)) {
+                throw new RuntimeException(
+                        String.format("Failed to drop Kafka topic %s", topicName), e);
+            }
+        }
+    }
+
     // ------------------------ For Debug Logging Purpose ----------------------------------
 
     private void scheduleTimeoutLogger(Duration period, Runnable loggingAction) {

Reply via email to