This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-table-store.git


The following commit(s) were added to refs/heads/master by this push:
     new 14c26f9  [FLINK-27752] Change 'path' to 'root-path' in table store
14c26f9 is described below

commit 14c26f9f0fa4a9fa4094c338729036fb4166ed66
Author: Jingsong Lee <[email protected]>
AuthorDate: Wed May 25 14:35:48 2022 +0800

    [FLINK-27752] Change 'path' to 'root-path' in table store
    
    This closes #136
---
 docs/content/docs/development/create-table.md      |   4 +-
 docs/content/docs/try-table-store/quick-start.md   |   2 +-
 .../store/connector/AbstractTableStoreFactory.java | 196 +++++++++++++
 .../flink/table/store/connector/TableStore.java    |   8 +-
 .../table/store/connector/TableStoreFactory.java   | 311 ---------------------
 .../store/connector/TableStoreFactoryOptions.java  |   8 +-
 .../store/connector/TableStoreManagedFactory.java  | 188 +++++++++++++
 .../org.apache.flink.table.factories.Factory       |   2 +-
 .../table/store/connector/FileStoreITCase.java     |   2 +-
 .../store/connector/FileStoreTableITCase.java      |   4 +-
 .../store/connector/ReadWriteTableITCase.java      |  20 +-
 .../store/connector/ReadWriteTableTestBase.java    |   3 +-
 ...Test.java => TableStoreManagedFactoryTest.java} | 129 ++++++---
 .../table/store/connector/TableStoreTestBase.java  |   4 +-
 .../flink/table/store/file/FileStoreOptions.java   |  23 +-
 .../table/store/log/LogStoreTableFactory.java      |   5 +
 .../flink/table/store/file/TestFileStore.java      |   3 +-
 .../table/store/tests/FileStoreBatchE2eTest.java   |   2 +-
 .../store/tests/FileStoreFlinkFormatE2eTest.java   |   2 +-
 .../table/store/tests/FileStoreStreamE2eTest.java  |   2 +-
 .../flink/table/store/tests/LogStoreE2eTest.java   |   4 +-
 .../flink/table/store/tests/TypeE2eTest.java       |   4 +-
 .../flink/table/store/FileStoreTestHelper.java     |   4 +-
 .../hive/TableStoreHiveStorageHandlerITCase.java   |  42 +--
 .../store/mapred/TableStoreRecordReaderTest.java   |   3 -
 .../flink/table/store/kafka/KafkaLogOptions.java   |   6 +
 .../table/store/kafka/KafkaLogStoreFactory.java    |  24 +-
 .../flink/table/store/kafka/KafkaLogTestUtils.java |   3 +
 28 files changed, 578 insertions(+), 430 deletions(-)

diff --git a/docs/content/docs/development/create-table.md b/docs/content/docs/development/create-table.md
index 95f5e87..21efdf4 100644
--- a/docs/content/docs/development/create-table.md
+++ b/docs/content/docs/development/create-table.md
@@ -106,7 +106,7 @@ Important options include the following:
     </thead>
     <tbody>
     <tr>
-      <td><h5>path</h5></td>
+      <td><h5>root-path</h5></td>
       <td>Yes</td>
       <td style="word-wrap: break-word;">(none)</td>
       <td>String</td>
@@ -147,7 +147,7 @@ Important options include the following:
 
 Creating a table will create the corresponding physical storage:
 - The table's FileStore directory will be created under:
-  `${path}/${catalog_name}.catalog/${database_name}.db/${table_name}`
+  `${root-path}/${catalog_name}.catalog/${database_name}.db/${table_name}`
 - If `log.system` is configured as Kafka, a Topic named
   "${catalog_name}.${database_name}.${table_name}" will be created
   automatically when the table is created.
diff --git a/docs/content/docs/try-table-store/quick-start.md b/docs/content/docs/try-table-store/quick-start.md
index 3c00f5b..78a4eea 100644
--- a/docs/content/docs/try-table-store/quick-start.md
+++ b/docs/content/docs/try-table-store/quick-start.md
@@ -112,7 +112,7 @@ Start the SQL Client CLI:
 
 ```sql
 -- set root path to session config
-SET 'table-store.path' = '/tmp/table_store';
+SET 'table-store.root-path' = '/tmp/table_store';
 
 -- create a word count dynamic table without 'connector' option
 CREATE TABLE word_count (
diff --git a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/AbstractTableStoreFactory.java b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/AbstractTableStoreFactory.java
new file mode 100644
index 0000000..4b842a5
--- /dev/null
+++ b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/AbstractTableStoreFactory.java
@@ -0,0 +1,196 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.connector;
+
+import org.apache.flink.api.common.RuntimeExecutionMode;
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.DelegatingConfiguration;
+import org.apache.flink.configuration.ExecutionOptions;
+import org.apache.flink.table.api.ValidationException;
+import org.apache.flink.table.factories.DynamicTableFactory;
+import org.apache.flink.table.factories.DynamicTableSinkFactory;
+import org.apache.flink.table.factories.DynamicTableSourceFactory;
+import org.apache.flink.table.factories.FactoryUtil;
+import org.apache.flink.table.store.connector.sink.TableStoreSink;
+import org.apache.flink.table.store.connector.source.TableStoreSource;
+import org.apache.flink.table.store.file.FileStoreOptions;
+import org.apache.flink.table.store.file.mergetree.MergeTreeOptions;
+import org.apache.flink.table.store.file.schema.Schema;
+import org.apache.flink.table.store.file.schema.UpdateSchema;
+import org.apache.flink.table.store.log.LogOptions;
+import org.apache.flink.table.store.log.LogStoreTableFactory;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.util.Preconditions;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+import static org.apache.flink.table.store.connector.TableStoreFactoryOptions.LOG_SYSTEM;
+import static org.apache.flink.table.store.log.LogOptions.CHANGELOG_MODE;
+import static org.apache.flink.table.store.log.LogOptions.CONSISTENCY;
+import static org.apache.flink.table.store.log.LogOptions.LOG_PREFIX;
+import static org.apache.flink.table.store.log.LogOptions.SCAN;
+import static org.apache.flink.table.store.log.LogStoreTableFactory.discoverLogStoreFactory;
+
+/** Abstract table store factory to create table source and table sink. */
+public abstract class AbstractTableStoreFactory
+        implements DynamicTableSourceFactory, DynamicTableSinkFactory {
+
+    @Override
+    public TableStoreSource createDynamicTableSource(Context context) {
+        return new TableStoreSource(
+                buildTableStore(context),
+                context.getConfiguration().get(ExecutionOptions.RUNTIME_MODE)
+                        == RuntimeExecutionMode.STREAMING,
+                createLogContext(context),
+                createOptionalLogStoreFactory(context).orElse(null));
+    }
+
+    @Override
+    public TableStoreSink createDynamicTableSink(Context context) {
+        return new TableStoreSink(
+                buildTableStore(context),
+                createLogContext(context),
+                createOptionalLogStoreFactory(context).orElse(null));
+    }
+
+    @Override
+    public Set<ConfigOption<?>> requiredOptions() {
+        return Collections.emptySet();
+    }
+
+    @Override
+    public Set<ConfigOption<?>> optionalOptions() {
+        Set<ConfigOption<?>> options = FileStoreOptions.allOptions();
+        options.addAll(MergeTreeOptions.allOptions());
+        options.addAll(TableStoreFactoryOptions.allOptions());
+        return options;
+    }
+
+    // ~ Tools ------------------------------------------------------------------
+
+    static Optional<LogStoreTableFactory> createOptionalLogStoreFactory(
+            DynamicTableFactory.Context context) {
+        return createOptionalLogStoreFactory(
+                context.getClassLoader(), context.getCatalogTable().getOptions());
+    }
+
+    static Optional<LogStoreTableFactory> createOptionalLogStoreFactory(
+            ClassLoader classLoader, Map<String, String> options) {
+        Configuration configOptions = new Configuration();
+        options.forEach(configOptions::setString);
+
+        if (configOptions.get(LOG_SYSTEM) == null) {
+            // Use file store continuous reading
+            validateFileStoreContinuous(configOptions);
+            return Optional.empty();
+        }
+
+        return Optional.of(discoverLogStoreFactory(classLoader, configOptions.get(LOG_SYSTEM)));
+    }
+
+    private static void validateFileStoreContinuous(Configuration options) {
+        Configuration logOptions = new DelegatingConfiguration(options, LOG_PREFIX);
+        LogOptions.LogChangelogMode changelogMode = logOptions.get(CHANGELOG_MODE);
+        if (changelogMode == LogOptions.LogChangelogMode.UPSERT) {
+            throw new ValidationException(
+                    "File store continuous reading dose not support upsert changelog mode.");
+        }
+        LogOptions.LogConsistency consistency = logOptions.get(CONSISTENCY);
+        if (consistency == LogOptions.LogConsistency.EVENTUAL) {
+            throw new ValidationException(
+                    "File store continuous reading dose not support eventual consistency mode.");
+        }
+        LogOptions.LogStartupMode startupMode = logOptions.get(SCAN);
+        if (startupMode == LogOptions.LogStartupMode.FROM_TIMESTAMP) {
+            throw new ValidationException(
+                    "File store continuous reading dose not support from_timestamp scan mode, "
+                            + "you can add timestamp filters instead.");
+        }
+    }
+
+    static DynamicTableFactory.Context createLogContext(DynamicTableFactory.Context context) {
+        return createLogContext(context, context.getCatalogTable().getOptions());
+    }
+
+    static DynamicTableFactory.Context createLogContext(
+            DynamicTableFactory.Context context, Map<String, String> options) {
+        return new FactoryUtil.DefaultDynamicTableContext(
+                context.getObjectIdentifier(),
+                context.getCatalogTable().copy(filterLogStoreOptions(options)),
+                filterLogStoreOptions(context.getEnrichmentOptions()),
+                context.getConfiguration(),
+                context.getClassLoader(),
+                context.isTemporary());
+    }
+
+    static Map<String, String> filterLogStoreOptions(Map<String, String> options) {
+        return options.entrySet().stream()
+                .filter(entry -> !entry.getKey().equals(LOG_SYSTEM.key())) // exclude log.system
+                .filter(entry -> entry.getKey().startsWith(LOG_PREFIX))
+                .collect(
+                        Collectors.toMap(
+                                entry -> entry.getKey().substring(LOG_PREFIX.length()),
+                                Map.Entry::getValue));
+    }
+
+    static TableStore buildTableStore(DynamicTableFactory.Context context) {
+        TableStore store =
+                new TableStore(
+                        context.getObjectIdentifier(),
+                        Configuration.fromMap(context.getCatalogTable().getOptions()));
+
+        Schema schema = store.schema();
+
+        UpdateSchema updateSchema = UpdateSchema.fromCatalogTable(context.getCatalogTable());
+
+        RowType rowType = updateSchema.rowType();
+        List<String> partitionKeys = updateSchema.partitionKeys();
+        List<String> primaryKeys = updateSchema.primaryKeys();
+
+        // compare fields to ignore isNullable for row type
+        Preconditions.checkArgument(
+                schema.logicalRowType().getFields().equals(rowType.getFields()),
+                "Flink schema and store schema are not the same, "
+                        + "store schema is %s, Flink schema is %s",
+                schema.logicalRowType(),
+                rowType);
+
+        Preconditions.checkArgument(
+                schema.partitionKeys().equals(partitionKeys),
+                "Flink partitionKeys and store partitionKeys are not the same, "
+                        + "store partitionKeys is %s, Flink partitionKeys is %s",
+                schema.partitionKeys(),
+                partitionKeys);
+
+        Preconditions.checkArgument(
+                schema.primaryKeys().equals(primaryKeys),
+                "Flink primaryKeys and store primaryKeys are not the same, "
+                        + "store primaryKeys is %s, Flink primaryKeys is %s",
+                schema.primaryKeys(),
+                primaryKeys);
+
+        return store;
+    }
+}
diff --git a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/TableStore.java b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/TableStore.java
index 35fd023..3060323 100644
--- a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/TableStore.java
+++ b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/TableStore.java
@@ -92,7 +92,7 @@ public class TableStore {
         this.tableIdentifier = tableIdentifier;
         this.options = options;
 
-        Path tablePath = new FileStoreOptions(options).path(tableIdentifier);
+        Path tablePath = FileStoreOptions.path(options);
         this.schema =
                 new SchemaManager(tablePath)
                         .latest()
@@ -180,7 +180,7 @@ public class TableStore {
         FileStoreOptions fileStoreOptions = new FileStoreOptions(options);
 
         return FileStoreImpl.createWithAppendOnly(
-                fileStoreOptions.path(tableIdentifier).toString(),
+                fileStoreOptions.path().toString(),
                 schema.id(),
                 fileStoreOptions,
                 user,
@@ -195,7 +195,7 @@ public class TableStore {
 
         if (trimmedPrimaryKeys.length == 0) {
             return FileStoreImpl.createWithValueCount(
-                    fileStoreOptions.path(tableIdentifier).toString(),
+                    fileStoreOptions.path().toString(),
                     schema.id(),
                     fileStoreOptions,
                     user,
@@ -203,7 +203,7 @@ public class TableStore {
                     type);
         } else {
             return FileStoreImpl.createWithPrimaryKey(
-                    fileStoreOptions.path(tableIdentifier).toString(),
+                    fileStoreOptions.path().toString(),
                     schema.id(),
                     fileStoreOptions,
                     user,
diff --git a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/TableStoreFactory.java b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/TableStoreFactory.java
deleted file mode 100644
index a61e33e..0000000
--- a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/TableStoreFactory.java
+++ /dev/null
@@ -1,311 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.store.connector;
-
-import org.apache.flink.annotation.VisibleForTesting;
-import org.apache.flink.api.common.RuntimeExecutionMode;
-import org.apache.flink.configuration.ConfigOption;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.configuration.DelegatingConfiguration;
-import org.apache.flink.configuration.ExecutionOptions;
-import org.apache.flink.core.fs.Path;
-import org.apache.flink.table.api.TableException;
-import org.apache.flink.table.api.ValidationException;
-import org.apache.flink.table.catalog.CatalogPartitionSpec;
-import org.apache.flink.table.factories.DynamicTableSinkFactory;
-import org.apache.flink.table.factories.DynamicTableSourceFactory;
-import org.apache.flink.table.factories.FactoryUtil;
-import org.apache.flink.table.factories.ManagedTableFactory;
-import org.apache.flink.table.store.connector.sink.TableStoreSink;
-import org.apache.flink.table.store.connector.source.TableStoreSource;
-import org.apache.flink.table.store.connector.utils.TableConfigUtils;
-import org.apache.flink.table.store.file.FileStoreOptions;
-import org.apache.flink.table.store.file.WriteMode;
-import org.apache.flink.table.store.file.mergetree.MergeTreeOptions;
-import org.apache.flink.table.store.file.schema.Schema;
-import org.apache.flink.table.store.file.schema.SchemaManager;
-import org.apache.flink.table.store.file.schema.UpdateSchema;
-import org.apache.flink.table.store.log.LogOptions.LogChangelogMode;
-import org.apache.flink.table.store.log.LogOptions.LogConsistency;
-import org.apache.flink.table.store.log.LogOptions.LogStartupMode;
-import org.apache.flink.table.store.log.LogStoreTableFactory;
-import org.apache.flink.table.types.logical.RowType;
-import org.apache.flink.util.Preconditions;
-
-import java.io.IOException;
-import java.io.UncheckedIOException;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Objects;
-import java.util.Optional;
-import java.util.Set;
-import java.util.stream.Collectors;
-
-import static org.apache.flink.table.store.connector.TableStoreFactoryOptions.LOG_SYSTEM;
-import static org.apache.flink.table.store.connector.TableStoreFactoryOptions.WRITE_MODE;
-import static org.apache.flink.table.store.file.FileStoreOptions.BUCKET;
-import static org.apache.flink.table.store.file.FileStoreOptions.TABLE_STORE_PREFIX;
-import static org.apache.flink.table.store.log.LogOptions.CHANGELOG_MODE;
-import static org.apache.flink.table.store.log.LogOptions.CONSISTENCY;
-import static org.apache.flink.table.store.log.LogOptions.LOG_PREFIX;
-import static org.apache.flink.table.store.log.LogOptions.SCAN;
-import static org.apache.flink.table.store.log.LogStoreTableFactory.discoverLogStoreFactory;
-
-/** Default implementation of {@link ManagedTableFactory}. */
-public class TableStoreFactory
-        implements ManagedTableFactory, DynamicTableSourceFactory, DynamicTableSinkFactory {
-
-    @Override
-    public Map<String, String> enrichOptions(Context context) {
-        Map<String, String> enrichedOptions = new HashMap<>(context.getCatalogTable().getOptions());
-        TableConfigUtils.extractConfiguration(context.getConfiguration())
-                .toMap()
-                .forEach(
-                        (k, v) -> {
-                            if (k.startsWith(TABLE_STORE_PREFIX)) {
-                                enrichedOptions.putIfAbsent(
-                                        k.substring(TABLE_STORE_PREFIX.length()), v);
-                            }
-                        });
-        return enrichedOptions;
-    }
-
-    @Override
-    public void onCreateTable(Context context, boolean ignoreIfExists) {
-        Map<String, String> options = context.getCatalogTable().getOptions();
-        Path path = FileStoreOptions.path(options, context.getObjectIdentifier());
-        try {
-            if (path.getFileSystem().exists(path) && !ignoreIfExists) {
-                throw new TableException(
-                        String.format(
-                                "Failed to create file store path. "
-                                        + "Reason: directory %s exists for table %s. "
-                                        + "Suggestion: please try `DESCRIBE TABLE %s` to "
-                                        + "first check whether table exists in current catalog. "
-                                        + "If table exists in catalog, and data files under current path "
-                                        + "are valid, please use `CREATE TABLE IF NOT EXISTS` ddl instead. "
-                                        + "Otherwise, please choose another table name "
-                                        + "or manually delete the current path and try again.",
-                                path,
-                                context.getObjectIdentifier().asSerializableString(),
-                                context.getObjectIdentifier().asSerializableString()));
-            }
-            path.getFileSystem().mkdirs(path);
-        } catch (IOException e) {
-            throw new UncheckedIOException(e);
-        }
-
-        // Cannot define any primary key in an append-only table.
-        if (context.getCatalogTable().getResolvedSchema().getPrimaryKey().isPresent()) {
-            if (Objects.equals(
-                    WriteMode.APPEND_ONLY.toString(),
-                    options.getOrDefault(WRITE_MODE.key(), WRITE_MODE.defaultValue().toString()))) {
-                throw new TableException(
-                        "Cannot define any primary key in an append-only table. Set 'write-mode'='change-log' if "
-                                + "still want to keep the primary key definition.");
-            }
-        }
-
-        // update schema
-        Path tablePath =
-                new FileStoreOptions(context.getCatalogTable().getOptions())
-                        .path(context.getObjectIdentifier());
-        // TODO pass lock
-        try {
-            new SchemaManager(tablePath)
-                    .commitNewVersion(UpdateSchema.fromCatalogTable(context.getCatalogTable()));
-        } catch (IllegalStateException e) {
-            throw e;
-        } catch (Exception e) {
-            throw new RuntimeException(e);
-        }
-
-        createOptionalLogStoreFactory(context)
-                .ifPresent(
-                        factory ->
-                                factory.onCreateTable(
-                                        createLogContext(context),
-                                        Integer.parseInt(
-                                                options.getOrDefault(
-                                                        BUCKET.key(),
-                                                        BUCKET.defaultValue().toString())),
-                                        ignoreIfExists));
-    }
-
-    @Override
-    public void onDropTable(Context context, boolean ignoreIfNotExists) {
-        Map<String, String> options = context.getCatalogTable().getOptions();
-        Path path = FileStoreOptions.path(options, context.getObjectIdentifier());
-        try {
-            if (path.getFileSystem().exists(path)) {
-                path.getFileSystem().delete(path, true);
-            } else if (!ignoreIfNotExists) {
-                throw new TableException(
-                        String.format(
-                                "Failed to delete file store path. "
-                                        + "Reason: directory %s doesn't exist for table %s. "
-                                        + "Suggestion: please try `DROP TABLE IF EXISTS` ddl instead.",
-                                path, context.getObjectIdentifier().asSerializableString()));
-            }
-        } catch (IOException e) {
-            throw new UncheckedIOException(e);
-        }
-        createOptionalLogStoreFactory(context)
-                .ifPresent(
-                        factory ->
-                                factory.onDropTable(createLogContext(context), ignoreIfNotExists));
-    }
-
-    @Override
-    public Map<String, String> onCompactTable(
-            Context context, CatalogPartitionSpec catalogPartitionSpec) {
-        throw new UnsupportedOperationException("Not implement yet");
-    }
-
-    @Override
-    public TableStoreSource createDynamicTableSource(Context context) {
-        return new TableStoreSource(
-                buildTableStore(context),
-                context.getConfiguration().get(ExecutionOptions.RUNTIME_MODE)
-                        == RuntimeExecutionMode.STREAMING,
-                createLogContext(context),
-                createOptionalLogStoreFactory(context).orElse(null));
-    }
-
-    @Override
-    public TableStoreSink createDynamicTableSink(Context context) {
-        return new TableStoreSink(
-                buildTableStore(context),
-                createLogContext(context),
-                createOptionalLogStoreFactory(context).orElse(null));
-    }
-
-    @Override
-    public Set<ConfigOption<?>> requiredOptions() {
-        return Collections.emptySet();
-    }
-
-    @Override
-    public Set<ConfigOption<?>> optionalOptions() {
-        Set<ConfigOption<?>> options = FileStoreOptions.allOptions();
-        options.addAll(MergeTreeOptions.allOptions());
-        options.addAll(TableStoreFactoryOptions.allOptions());
-        return options;
-    }
-
-    // ~ Tools ------------------------------------------------------------------
-
-    private static Optional<LogStoreTableFactory> createOptionalLogStoreFactory(Context context) {
-        Configuration options = new Configuration();
-        context.getCatalogTable().getOptions().forEach(options::setString);
-
-        if (options.get(LOG_SYSTEM) == null) {
-            // Use file store continuous reading
-            validateFileStoreContinuous(options);
-            return Optional.empty();
-        }
-
-        return Optional.of(
-                discoverLogStoreFactory(context.getClassLoader(), options.get(LOG_SYSTEM)));
-    }
-
-    private static void validateFileStoreContinuous(Configuration options) {
-        Configuration logOptions = new DelegatingConfiguration(options, LOG_PREFIX);
-        LogChangelogMode changelogMode = logOptions.get(CHANGELOG_MODE);
-        if (changelogMode == LogChangelogMode.UPSERT) {
-            throw new ValidationException(
-                    "File store continuous reading dose not support upsert changelog mode.");
-        }
-        LogConsistency consistency = logOptions.get(CONSISTENCY);
-        if (consistency == LogConsistency.EVENTUAL) {
-            throw new ValidationException(
-                    "File store continuous reading dose not support eventual consistency mode.");
-        }
-        LogStartupMode startupMode = logOptions.get(SCAN);
-        if (startupMode == LogStartupMode.FROM_TIMESTAMP) {
-            throw new ValidationException(
-                    "File store continuous reading dose not support from_timestamp scan mode, "
-                            + "you can add timestamp filters instead.");
-        }
-    }
-
-    private static Context createLogContext(Context context) {
-        return new FactoryUtil.DefaultDynamicTableContext(
-                context.getObjectIdentifier(),
-                context.getCatalogTable()
-                        .copy(filterLogStoreOptions(context.getCatalogTable().getOptions())),
-                filterLogStoreOptions(context.getEnrichmentOptions()),
-                context.getConfiguration(),
-                context.getClassLoader(),
-                context.isTemporary());
-    }
-
-    @VisibleForTesting
-    static Map<String, String> filterLogStoreOptions(Map<String, String> options) {
-        return options.entrySet().stream()
-                .filter(entry -> !entry.getKey().equals(LOG_SYSTEM.key())) // exclude log.system
-                .filter(entry -> entry.getKey().startsWith(LOG_PREFIX))
-                .collect(
-                        Collectors.toMap(
-                                entry -> entry.getKey().substring(LOG_PREFIX.length()),
-                                Map.Entry::getValue));
-    }
-
-    @VisibleForTesting
-    TableStore buildTableStore(Context context) {
-        TableStore store =
-                new TableStore(
-                        context.getObjectIdentifier(),
-                        Configuration.fromMap(context.getCatalogTable().getOptions()));
-
-        Schema schema = store.schema();
-
-        UpdateSchema updateSchema = UpdateSchema.fromCatalogTable(context.getCatalogTable());
-
-        RowType rowType = updateSchema.rowType();
-        List<String> partitionKeys = updateSchema.partitionKeys();
-        List<String> primaryKeys = updateSchema.primaryKeys();
-
-        // compare fields to ignore isNullable for row type
-        Preconditions.checkArgument(
-                schema.logicalRowType().getFields().equals(rowType.getFields()),
-                "Flink schema and store schema are not the same, "
-                        + "store schema is %s, Flink schema is %s",
-                schema.logicalRowType(),
-                rowType);
-
-        Preconditions.checkArgument(
-                schema.partitionKeys().equals(partitionKeys),
-                "Flink partitionKeys and store partitionKeys are not the same, "
-                        + "store partitionKeys is %s, Flink partitionKeys is %s",
-                schema.partitionKeys(),
-                partitionKeys);
-
-        Preconditions.checkArgument(
-                schema.primaryKeys().equals(primaryKeys),
-                "Flink primaryKeys and store primaryKeys are not the same, "
-                        + "store primaryKeys is %s, Flink primaryKeys is %s",
-                schema.primaryKeys(),
-                primaryKeys);
-
-        return store;
-    }
-}
diff --git a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/TableStoreFactoryOptions.java b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/TableStoreFactoryOptions.java
index 829f99c..2f429cc 100644
--- a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/TableStoreFactoryOptions.java
+++ b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/TableStoreFactoryOptions.java
@@ -30,9 +30,15 @@ import java.util.Set;
 
 import static org.apache.flink.table.store.utils.OptionsUtils.formatEnumOption;
 
-/** Options for {@link TableStoreFactory}. */
+/** Options for {@link TableStoreManagedFactory}. */
 public class TableStoreFactoryOptions {
 
+    public static final ConfigOption<String> ROOT_PATH =
+            ConfigOptions.key("root-path")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription("The root file path of the table store in the filesystem.");
+
     public static final ConfigOption<Boolean> COMPACTION_RESCALE_BUCKET =
             ConfigOptions.key("compaction.rescale-bucket")
                     .booleanType()
diff --git a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/TableStoreManagedFactory.java b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/TableStoreManagedFactory.java
new file mode 100644
index 0000000..6054753
--- /dev/null
+++ b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/TableStoreManagedFactory.java
@@ -0,0 +1,188 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.connector;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.table.api.TableException;
+import org.apache.flink.table.catalog.CatalogPartitionSpec;
+import org.apache.flink.table.catalog.ObjectIdentifier;
+import org.apache.flink.table.factories.ManagedTableFactory;
+import org.apache.flink.table.store.connector.utils.TableConfigUtils;
+import org.apache.flink.table.store.file.FileStoreOptions;
+import org.apache.flink.table.store.file.WriteMode;
+import org.apache.flink.table.store.file.schema.SchemaManager;
+import org.apache.flink.table.store.file.schema.UpdateSchema;
+import org.apache.flink.table.store.log.LogStoreTableFactory;
+import org.apache.flink.util.Preconditions;
+
+import java.io.IOException;
+import java.io.UncheckedIOException;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+
+import static 
org.apache.flink.table.store.connector.TableStoreFactoryOptions.ROOT_PATH;
+import static 
org.apache.flink.table.store.connector.TableStoreFactoryOptions.WRITE_MODE;
+import static org.apache.flink.table.store.file.FileStoreOptions.BUCKET;
+import static org.apache.flink.table.store.file.FileStoreOptions.PATH;
+import static 
org.apache.flink.table.store.file.FileStoreOptions.TABLE_STORE_PREFIX;
+import static org.apache.flink.table.store.log.LogOptions.LOG_PREFIX;
+
+/** Default implementation of {@link ManagedTableFactory}. */
+public class TableStoreManagedFactory extends AbstractTableStoreFactory
+        implements ManagedTableFactory {
+
+    @Override
+    public Map<String, String> enrichOptions(Context context) {
+        Map<String, String> enrichedOptions = new 
HashMap<>(context.getCatalogTable().getOptions());
+        TableConfigUtils.extractConfiguration(context.getConfiguration())
+                .toMap()
+                .forEach(
+                        (k, v) -> {
+                            if (k.startsWith(TABLE_STORE_PREFIX)) {
+                                enrichedOptions.putIfAbsent(
+                                        
k.substring(TABLE_STORE_PREFIX.length()), v);
+                            }
+                        });
+
+        String rootPath = enrichedOptions.remove(ROOT_PATH.key());
+        Preconditions.checkArgument(
+                rootPath != null,
+                String.format(
+                        "Please specify a root path by setting session level 
configuration "
+                                + "as `SET 'table-store.%s' = '...'`.",
+                        ROOT_PATH.key()));
+
+        Preconditions.checkArgument(
+                !enrichedOptions.containsKey(PATH.key()),
+                "Managed table can not contain table path. "
+                        + "You need to remove path in table options or session 
config.");
+
+        Path path = new Path(rootPath, 
relativeTablePath(context.getObjectIdentifier()));
+        enrichedOptions.put(PATH.key(), path.toString());
+
+        Optional<LogStoreTableFactory> logFactory =
+                createOptionalLogStoreFactory(context.getClassLoader(), 
enrichedOptions);
+        logFactory.ifPresent(
+                factory ->
+                        factory.enrichOptions(createLogContext(context, 
enrichedOptions))
+                                .forEach((k, v) -> 
enrichedOptions.putIfAbsent(LOG_PREFIX + k, v)));
+
+        return enrichedOptions;
+    }
+
+    @VisibleForTesting
+    static String relativeTablePath(ObjectIdentifier tableIdentifier) {
+        return String.format(
+                "%s.catalog/%s.db/%s",
+                tableIdentifier.getCatalogName(),
+                tableIdentifier.getDatabaseName(),
+                tableIdentifier.getObjectName());
+    }
+
+    @Override
+    public void onCreateTable(Context context, boolean ignoreIfExists) {
+        Map<String, String> options = context.getCatalogTable().getOptions();
+        Path path = FileStoreOptions.path(options);
+        try {
+            if (path.getFileSystem().exists(path) && !ignoreIfExists) {
+                throw new TableException(
+                        String.format(
+                                "Failed to create file store path. "
+                                        + "Reason: directory %s exists for 
table %s. "
+                                        + "Suggestion: please try `DESCRIBE 
TABLE %s` to "
+                                        + "first check whether table exists in 
current catalog. "
+                                        + "If table exists in catalog, and 
data files under current path "
+                                        + "are valid, please use `CREATE TABLE 
IF NOT EXISTS` ddl instead. "
+                                        + "Otherwise, please choose another 
table name "
+                                        + "or manually delete the current path 
and try again.",
+                                path,
+                                
context.getObjectIdentifier().asSerializableString(),
+                                
context.getObjectIdentifier().asSerializableString()));
+            }
+            path.getFileSystem().mkdirs(path);
+        } catch (IOException e) {
+            throw new UncheckedIOException(e);
+        }
+
+        // Cannot define any primary key in an append-only table.
+        if 
(context.getCatalogTable().getResolvedSchema().getPrimaryKey().isPresent()) {
+            if (Objects.equals(
+                    WriteMode.APPEND_ONLY.toString(),
+                    options.getOrDefault(WRITE_MODE.key(), 
WRITE_MODE.defaultValue().toString()))) {
+                throw new TableException(
+                        "Cannot define any primary key in an append-only 
table. Set 'write-mode'='change-log' if "
+                                + "still want to keep the primary key 
definition.");
+            }
+        }
+
+        // update schema
+        // TODO pass lock
+        try {
+            new SchemaManager(path)
+                    
.commitNewVersion(UpdateSchema.fromCatalogTable(context.getCatalogTable()));
+        } catch (IllegalStateException e) {
+            throw e;
+        } catch (Exception e) {
+            throw new RuntimeException(e);
+        }
+
+        createOptionalLogStoreFactory(context)
+                .ifPresent(
+                        factory ->
+                                factory.onCreateTable(
+                                        createLogContext(context),
+                                        Integer.parseInt(
+                                                options.getOrDefault(
+                                                        BUCKET.key(),
+                                                        
BUCKET.defaultValue().toString())),
+                                        ignoreIfExists));
+    }
+
+    @Override
+    public void onDropTable(Context context, boolean ignoreIfNotExists) {
+        Path path = 
FileStoreOptions.path(context.getCatalogTable().getOptions());
+        try {
+            if (path.getFileSystem().exists(path)) {
+                path.getFileSystem().delete(path, true);
+            } else if (!ignoreIfNotExists) {
+                throw new TableException(
+                        String.format(
+                                "Failed to delete file store path. "
+                                        + "Reason: directory %s doesn't exist 
for table %s. "
+                                        + "Suggestion: please try `DROP TABLE 
IF EXISTS` ddl instead.",
+                                path, 
context.getObjectIdentifier().asSerializableString()));
+            }
+        } catch (IOException e) {
+            throw new UncheckedIOException(e);
+        }
+        createOptionalLogStoreFactory(context)
+                .ifPresent(
+                        factory ->
+                                factory.onDropTable(createLogContext(context), 
ignoreIfNotExists));
+    }
+
+    @Override
+    public Map<String, String> onCompactTable(
+            Context context, CatalogPartitionSpec catalogPartitionSpec) {
+        throw new UnsupportedOperationException("Not implement yet");
+    }
+}
diff --git 
a/flink-table-store-connector/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory
 
b/flink-table-store-connector/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory
index e377345..726415a 100644
--- 
a/flink-table-store-connector/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory
+++ 
b/flink-table-store-connector/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory
@@ -13,4 +13,4 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-org.apache.flink.table.store.connector.TableStoreFactory
+org.apache.flink.table.store.connector.TableStoreManagedFactory
diff --git 
a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/FileStoreITCase.java
 
b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/FileStoreITCase.java
index e41f3dc..eb08708 100644
--- 
a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/FileStoreITCase.java
+++ 
b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/FileStoreITCase.java
@@ -350,7 +350,7 @@ public class FileStoreITCase extends AbstractTestBase {
             throws Exception {
         ObjectIdentifier identifier = ObjectIdentifier.of("catalog", "db", 
"t");
         Configuration options = buildConfiguration(noFail, 
temporaryFolder.newFolder());
-        Path tablePath = new FileStoreOptions(options).path(identifier);
+        Path tablePath = new FileStoreOptions(options).path();
         new SchemaManager(tablePath)
                 .commitNewVersion(
                         new UpdateSchema(
diff --git 
a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/FileStoreTableITCase.java
 
b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/FileStoreTableITCase.java
index d10d2a7..59f3d96 100644
--- 
a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/FileStoreTableITCase.java
+++ 
b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/FileStoreTableITCase.java
@@ -36,7 +36,7 @@ import java.time.Duration;
 import java.util.List;
 
 import static 
org.apache.flink.streaming.api.environment.ExecutionCheckpointingOptions.CHECKPOINTING_INTERVAL;
-import static org.apache.flink.table.store.file.FileStoreOptions.PATH;
+import static 
org.apache.flink.table.store.connector.TableStoreFactoryOptions.ROOT_PATH;
 import static 
org.apache.flink.table.store.file.FileStoreOptions.TABLE_STORE_PREFIX;
 
 /** ITCase for file store table api. */
@@ -58,7 +58,7 @@ public abstract class FileStoreTableITCase extends 
AbstractTestBase {
     private void prepareEnv(TableEnvironment env, String path) {
         Configuration config = env.getConfig().getConfiguration();
         
config.set(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM, 2);
-        config.setString(TABLE_STORE_PREFIX + PATH.key(), path);
+        config.setString(TABLE_STORE_PREFIX + ROOT_PATH.key(), path);
         ddl().forEach(env::executeSql);
     }
 
diff --git 
a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/ReadWriteTableITCase.java
 
b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/ReadWriteTableITCase.java
index d120223..5c0b913 100644
--- 
a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/ReadWriteTableITCase.java
+++ 
b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/ReadWriteTableITCase.java
@@ -35,7 +35,6 @@ import org.apache.flink.table.factories.DynamicTableFactory;
 import org.apache.flink.table.factories.FactoryUtil;
 import 
org.apache.flink.table.runtime.connector.sink.SinkRuntimeProviderContext;
 import org.apache.flink.table.store.connector.sink.TableStoreSink;
-import org.apache.flink.table.store.file.FileStoreOptions;
 import org.apache.flink.table.store.file.utils.BlockingIterator;
 import org.apache.flink.table.types.logical.LogicalType;
 import org.apache.flink.table.types.logical.RowType;
@@ -46,6 +45,7 @@ import org.junit.Test;
 
 import javax.annotation.Nullable;
 
+import java.io.File;
 import java.io.IOException;
 import java.util.Arrays;
 import java.util.Collections;
@@ -62,6 +62,7 @@ import static 
org.apache.flink.table.store.connector.ReadWriteTableTestUtil.dail
 import static 
org.apache.flink.table.store.connector.ReadWriteTableTestUtil.dailyRatesChangelogWithoutUB;
 import static 
org.apache.flink.table.store.connector.ReadWriteTableTestUtil.rates;
 import static 
org.apache.flink.table.store.connector.ShowCreateUtil.buildSimpleSelectQuery;
+import static 
org.apache.flink.table.store.connector.TableStoreFactoryOptions.ROOT_PATH;
 import static 
org.apache.flink.table.store.connector.TableStoreFactoryOptions.SCAN_PARALLELISM;
 import static 
org.apache.flink.table.store.connector.TableStoreFactoryOptions.SINK_PARALLELISM;
 import static 
org.apache.flink.table.store.connector.TableStoreTestBase.createResolvedTable;
@@ -1218,7 +1219,7 @@ public class ReadWriteTableITCase extends 
ReadWriteTableTestBase {
                         id));
         tEnv.executeSql(
                 String.format(
-                        "create table managed_table with ('path' = '%s') "
+                        "create table managed_table with ('root-path' = '%s') "
                                 + "like dummy_source (excluding options)",
                         rootPath));
         tEnv.executeSql("insert into managed_table select * from 
dummy_source").await();
@@ -1258,7 +1259,7 @@ public class ReadWriteTableITCase extends 
ReadWriteTableTestBase {
                                 + "'%s' = '%s'"
                                 + ") like `helper_source_%s` "
                                 + "(excluding options)",
-                        randomSinkId, FileStoreOptions.PATH.key(), rootPath, 
randomSourceId));
+                        randomSinkId, ROOT_PATH.key(), rootPath, 
randomSourceId));
 
         // insert multiple times
         tEnv.executeSql(
@@ -1374,7 +1375,7 @@ public class ReadWriteTableITCase extends 
ReadWriteTableTestBase {
                                 + "'%s' = '%s'"
                                 + ") like `helper_source_%s` "
                                 + "(excluding options)",
-                        randomSinkId, FileStoreOptions.PATH.key(), rootPath, 
randomSourceId));
+                        randomSinkId, ROOT_PATH.key(), rootPath, 
randomSourceId));
         tEnv.executeSql(
                         String.format(
                                 "insert into `managed_table_%s` select * from 
`helper_source_%s`",
@@ -1421,7 +1422,7 @@ public class ReadWriteTableITCase extends 
ReadWriteTableTestBase {
                                 + " rate BIGINT\n"
                                 + ") WITH (\n"
                                 + " 'bucket' = '2',\n"
-                                + " 'path' = '%s'\n"
+                                + " 'root-path' = '%s'\n"
                                 + ")",
                         rootPath));
         tEnv.executeSql("INSERT INTO rates VALUES('US Dollar', 102)").await();
@@ -1578,13 +1579,14 @@ public class ReadWriteTableITCase extends 
ReadWriteTableTestBase {
             throws IOException {
         // 1. create a mock table sink
         Map<String, String> options = new HashMap<>();
-        options.put(FileStoreOptions.PATH.key(), "/fake/path");
         if (configParallelism != null) {
             options.put(SINK_PARALLELISM.key(), configParallelism.toString());
         }
         options.put(
                 "path",
-                
TEMPORARY_FOLDER.newFolder(UUID.randomUUID().toString()).toURI().toString());
+                new File(TEMPORARY_FOLDER.newFolder(), 
UUID.randomUUID().toString())
+                        .toURI()
+                        .toString());
 
         DynamicTableFactory.Context context =
                 new FactoryUtil.DefaultDynamicTableContext(
@@ -1600,8 +1602,8 @@ public class ReadWriteTableITCase extends 
ReadWriteTableTestBase {
                         new Configuration(),
                         Thread.currentThread().getContextClassLoader(),
                         false);
-        new TableStoreFactory().onCreateTable(context, false);
-        DynamicTableSink tableSink = new 
TableStoreFactory().createDynamicTableSink(context);
+        new TableStoreManagedFactory().onCreateTable(context, false);
+        DynamicTableSink tableSink = new 
TableStoreManagedFactory().createDynamicTableSink(context);
         assertThat(tableSink).isInstanceOf(TableStoreSink.class);
 
         // 2. get sink provider
diff --git 
a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/ReadWriteTableTestBase.java
 
b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/ReadWriteTableTestBase.java
index e4cf375..1e233b7 100644
--- 
a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/ReadWriteTableTestBase.java
+++ 
b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/ReadWriteTableTestBase.java
@@ -58,6 +58,7 @@ import static 
org.apache.flink.table.store.connector.ShowCreateUtil.buildSelectQ
 import static 
org.apache.flink.table.store.connector.ShowCreateUtil.buildSimpleSelectQuery;
 import static 
org.apache.flink.table.store.connector.ShowCreateUtil.createTableLikeDDL;
 import static 
org.apache.flink.table.store.connector.TableStoreFactoryOptions.LOG_SYSTEM;
+import static 
org.apache.flink.table.store.connector.TableStoreFactoryOptions.ROOT_PATH;
 import static 
org.apache.flink.table.store.kafka.KafkaLogOptions.BOOTSTRAP_SERVERS;
 import static org.apache.flink.table.store.log.LogOptions.LOG_PREFIX;
 import static org.assertj.core.api.Assertions.assertThat;
@@ -288,7 +289,7 @@ public class ReadWriteTableTestBase extends 
KafkaTableTestBase {
             throws Exception {
         Map<String, String> tableOptions = new HashMap<>();
         rootPath = TEMPORARY_FOLDER.newFolder().getPath();
-        tableOptions.put(FileStoreOptions.PATH.key(), rootPath);
+        tableOptions.put(ROOT_PATH.key(), rootPath);
         if (enableLogStore) {
             tableOptions.put(LOG_SYSTEM.key(), "kafka");
             tableOptions.put(LOG_PREFIX + BOOTSTRAP_SERVERS.key(), 
getBootstrapServers());
diff --git 
a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/TableStoreFactoryTest.java
 
b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/TableStoreManagedFactoryTest.java
similarity index 73%
rename from 
flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/TableStoreFactoryTest.java
rename to 
flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/TableStoreManagedFactoryTest.java
index 021ac98..ed75624 100644
--- 
a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/TableStoreFactoryTest.java
+++ 
b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/TableStoreManagedFactoryTest.java
@@ -49,6 +49,9 @@ import java.util.UUID;
 import java.util.function.Predicate;
 import java.util.stream.Stream;
 
+import static java.util.Collections.emptyMap;
+import static java.util.Collections.singletonMap;
+import static 
org.apache.flink.table.store.connector.TableStoreFactoryOptions.ROOT_PATH;
 import static 
org.apache.flink.table.store.connector.TableStoreTestBase.createResolvedTable;
 import static org.apache.flink.table.store.file.FileStoreOptions.BUCKET;
 import static org.apache.flink.table.store.file.FileStoreOptions.PATH;
@@ -60,13 +63,14 @@ import static 
org.apache.flink.table.store.log.LogOptions.LOG_PREFIX;
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.assertj.core.api.Assertions.assertThatThrownBy;
 
-/** Test cases for {@link TableStoreFactory}. */
-public class TableStoreFactoryTest {
+/** Test cases for {@link TableStoreManagedFactory}. */
+public class TableStoreManagedFactoryTest {
 
     private static final ObjectIdentifier TABLE_IDENTIFIER =
             ObjectIdentifier.of("catalog", "database", "table");
 
-    private final TableStoreFactory tableStoreFactory = new 
TableStoreFactory();
+    private final TableStoreManagedFactory tableStoreManagedFactory =
+            new TableStoreManagedFactory();
 
     @TempDir private static java.nio.file.Path sharedTempDir;
     private DynamicTableFactory.Context context;
@@ -78,25 +82,63 @@ public class TableStoreFactoryTest {
             Map<String, String> tableOptions,
             Map<String, String> expectedEnrichedOptions) {
         context = createTableContext(sessionOptions, tableOptions);
-        Map<String, String> actualEnrichedOptions = 
tableStoreFactory.enrichOptions(context);
+        Map<String, String> actualEnrichedOptions = 
tableStoreManagedFactory.enrichOptions(context);
         assertThat(actualEnrichedOptions)
                 .containsExactlyInAnyOrderEntriesOf(expectedEnrichedOptions);
     }
 
+    @Test
+    public void testErrorEnrichOptions() {
+        Map<String, String> sessionMap = new HashMap<>();
+        sessionMap.put("table-store.root-path", "my_path");
+        sessionMap.put("table-store.path", "another_path");
+        context = createTableContext(sessionMap, emptyMap());
+        assertThatThrownBy(() -> 
tableStoreManagedFactory.enrichOptions(context))
+                .hasMessage(
+                        "Managed table can not contain table path. You need to 
remove path in table options or session config.");
+
+        context = createTableContext(emptyMap(), emptyMap());
+        assertThatThrownBy(() -> 
tableStoreManagedFactory.enrichOptions(context))
+                .hasMessage(
+                        "Please specify a root path by setting session level 
configuration as `SET 'table-store.root-path' = '...'`.");
+    }
+
+    @Test
+    public void testEnrichKafkaTopic() {
+        Map<String, String> sessionMap = new HashMap<>();
+        sessionMap.put("table-store.root-path", "my_path");
+        sessionMap.put("table-store.log.system", "kafka");
+        sessionMap.put("table-store.log.topic", "my_topic");
+        context = createTableContext(sessionMap, emptyMap());
+        assertThatThrownBy(() -> 
tableStoreManagedFactory.enrichOptions(context))
+                .hasMessage(
+                        "Managed table can not contain custom topic. You need 
to remove topic in table options or session config.");
+
+        sessionMap.remove("table-store.log.topic");
+        context = createTableContext(sessionMap, emptyMap());
+        Map<String, String> enriched = 
tableStoreManagedFactory.enrichOptions(context);
+
+        Map<String, String> expected = new HashMap<>();
+        expected.put("path", "my_path/catalog.catalog/database.db/table");
+        expected.put("log.system", "kafka");
+        expected.put("log.topic", "catalog.database.table");
+        assertThat(enriched).containsExactlyEntriesOf(expected);
+    }
+
     @ParameterizedTest
     @MethodSource("providingEnrichedOptionsForCreation")
     public void testOnCreateTable(Map<String, String> enrichedOptions, boolean 
ignoreIfExists) {
-        context = createTableContext(Collections.emptyMap(), enrichedOptions);
+        context = enrichContext(createTableContext(emptyMap(), 
enrichedOptions));
         Path expectedPath =
                 Paths.get(
                         sharedTempDir.toAbsolutePath().toString(),
                         relativeTablePath(TABLE_IDENTIFIER));
         boolean exist = expectedPath.toFile().exists();
         if (ignoreIfExists || !exist) {
-            tableStoreFactory.onCreateTable(context, ignoreIfExists);
+            tableStoreManagedFactory.onCreateTable(context, ignoreIfExists);
             assertThat(expectedPath).exists();
         } else {
-            assertThatThrownBy(() -> tableStoreFactory.onCreateTable(context, 
false))
+            assertThatThrownBy(() -> 
tableStoreManagedFactory.onCreateTable(context, false))
                     .isInstanceOf(TableException.class)
                     .hasMessageContaining(
                             String.format(
@@ -114,20 +156,32 @@ public class TableStoreFactoryTest {
         }
     }
 
+    private DynamicTableFactory.Context 
enrichContext(DynamicTableFactory.Context context) {
+        Map<String, String> newOptions = 
tableStoreManagedFactory.enrichOptions(context);
+        ResolvedCatalogTable table = 
context.getCatalogTable().copy(newOptions);
+        return new FactoryUtil.DefaultDynamicTableContext(
+                context.getObjectIdentifier(),
+                table,
+                emptyMap(),
+                context.getConfiguration(),
+                context.getClassLoader(),
+                context.isTemporary());
+    }
+
     @ParameterizedTest
     @MethodSource("providingEnrichedOptionsForDrop")
     public void testOnDropTable(Map<String, String> enrichedOptions, boolean 
ignoreIfNotExists) {
-        context = createTableContext(Collections.emptyMap(), enrichedOptions);
+        context = enrichContext(createTableContext(emptyMap(), 
enrichedOptions));
         Path expectedPath =
                 Paths.get(
                         sharedTempDir.toAbsolutePath().toString(),
                         relativeTablePath(TABLE_IDENTIFIER));
         boolean exist = expectedPath.toFile().exists();
         if (exist || ignoreIfNotExists) {
-            tableStoreFactory.onDropTable(context, ignoreIfNotExists);
+            tableStoreManagedFactory.onDropTable(context, ignoreIfNotExists);
             assertThat(expectedPath).doesNotExist();
         } else {
-            assertThatThrownBy(() -> tableStoreFactory.onDropTable(context, 
false))
+            assertThatThrownBy(() -> 
tableStoreManagedFactory.onDropTable(context, false))
                     .isInstanceOf(TableException.class)
                     .hasMessageContaining(
                             String.format(
@@ -155,7 +209,7 @@ public class TableStoreFactoryTest {
                 addPrefix(expectedLogOptions, LOG_PREFIX, (key) -> true);
         enrichedOptions.put("foo", "bar");
 
-        assertThat(TableStoreFactory.filterLogStoreOptions(enrichedOptions))
+        
assertThat(TableStoreManagedFactory.filterLogStoreOptions(enrichedOptions))
                 .containsExactlyInAnyOrderEntriesOf(expectedLogOptions);
     }
 
@@ -168,7 +222,7 @@ public class TableStoreFactoryTest {
             TableStoreTestBase.ExpectedResult expectedResult) {
         ResolvedCatalogTable catalogTable =
                 createResolvedTable(
-                        Collections.singletonMap(
+                        singletonMap(
                                 "path", sharedTempDir.toAbsolutePath() + "/" + 
UUID.randomUUID()),
                         rowType,
                         partitions,
@@ -177,13 +231,13 @@ public class TableStoreFactoryTest {
                 new FactoryUtil.DefaultDynamicTableContext(
                         TABLE_IDENTIFIER,
                         catalogTable,
-                        Collections.emptyMap(),
-                        Configuration.fromMap(Collections.emptyMap()),
+                        emptyMap(),
+                        Configuration.fromMap(emptyMap()),
                         Thread.currentThread().getContextClassLoader(),
                         false);
         if (expectedResult.success) {
-            tableStoreFactory.onCreateTable(context, false);
-            TableStore tableStore = tableStoreFactory.buildTableStore(context);
+            tableStoreManagedFactory.onCreateTable(context, false);
+            TableStore tableStore = 
AbstractTableStoreFactory.buildTableStore(context);
             
assertThat(tableStore.partitioned()).isEqualTo(catalogTable.isPartitioned());
             assertThat(tableStore.valueCountMode())
                     
.isEqualTo(catalogTable.getResolvedSchema().getPrimaryKeyIndexes().length == 0);
@@ -196,7 +250,7 @@ public class TableStoreFactoryTest {
                         .isTrue();
             }
         } else {
-            assertThatThrownBy(() -> tableStoreFactory.onCreateTable(context, 
false))
+            assertThatThrownBy(() -> 
tableStoreManagedFactory.onCreateTable(context, false))
                     .isInstanceOf(expectedResult.expectedType)
                     .hasMessageContaining(expectedResult.expectedMessage);
         }
@@ -209,31 +263,27 @@ public class TableStoreFactoryTest {
                 of(
                         BUCKET.key(),
                         BUCKET.defaultValue().toString(),
-                        PATH.key(),
+                        ROOT_PATH.key(),
                         sharedTempDir.toString(),
                         LOG_PREFIX + BOOTSTRAP_SERVERS.key(),
                         "localhost:9092",
                         LOG_PREFIX + CONSISTENCY.key(),
                         CONSISTENCY.defaultValue().name());
 
-        // default
-        Arguments arg0 =
-                Arguments.of(
-                        Collections.emptyMap(), Collections.emptyMap(), 
Collections.emptyMap());
-
         // set configuration under session level
         Arguments arg1 =
                 Arguments.of(
                         addPrefix(enrichedOptions, TABLE_STORE_PREFIX, (key) 
-> true),
-                        Collections.emptyMap(),
-                        enrichedOptions);
+                        emptyMap(),
+                        generateTablePath(enrichedOptions));
 
         // set configuration under table level
-        Arguments arg2 = Arguments.of(Collections.emptyMap(), enrichedOptions, 
enrichedOptions);
+        Arguments arg2 =
+                Arguments.of(emptyMap(), enrichedOptions, 
generateTablePath(enrichedOptions));
 
         // set both session and table level configuration to test options 
combination
         Map<String, String> tableOptions = new HashMap<>(enrichedOptions);
-        tableOptions.remove(PATH.key());
+        tableOptions.remove(ROOT_PATH.key());
         tableOptions.remove(CONSISTENCY.key());
         Arguments arg3 =
                 Arguments.of(
@@ -242,19 +292,32 @@ public class TableStoreFactoryTest {
                                 TABLE_STORE_PREFIX,
                                 (key) -> !tableOptions.containsKey(key)),
                         tableOptions,
-                        enrichedOptions);
+                        generateTablePath(enrichedOptions));
 
         // set same key with different value to test table configuration take 
precedence
         Map<String, String> sessionOptions = new HashMap<>();
         sessionOptions.put(
                 TABLE_STORE_PREFIX + BUCKET.key(), 
String.valueOf(BUCKET.defaultValue() + 1));
-        Arguments arg4 = Arguments.of(sessionOptions, enrichedOptions, 
enrichedOptions);
-        return Stream.of(arg0, arg1, arg2, arg3, arg4);
+
+        Arguments arg4 =
+                Arguments.of(sessionOptions, enrichedOptions, 
generateTablePath(enrichedOptions));
+        return Stream.of(arg1, arg2, arg3, arg4);
+    }
+
+    private static Map<String, String> generateTablePath(Map<String, String> enrichedOptions) {
+        Map<String, String> expected = new HashMap<>(enrichedOptions);
+        String rootPath = expected.remove(ROOT_PATH.key());
+        if (rootPath != null) {
+            String path =
+                    rootPath + "/" + TableStoreManagedFactory.relativeTablePath(TABLE_IDENTIFIER);
+            expected.put(PATH.key(), path);
+        }
+        return expected;
     }
 
     private static Stream<Arguments> providingEnrichedOptionsForCreation() {
         Map<String, String> enrichedOptions = new HashMap<>();
-        enrichedOptions.put(PATH.key(), 
sharedTempDir.toAbsolutePath().toString());
+        enrichedOptions.put(ROOT_PATH.key(), 
sharedTempDir.toAbsolutePath().toString());
         return Stream.of(
                 Arguments.of(enrichedOptions, false),
                 Arguments.of(enrichedOptions, true),
@@ -271,7 +334,7 @@ public class TableStoreFactoryTest {
             tablePath.mkdirs();
         }
         Map<String, String> enrichedOptions = new HashMap<>();
-        enrichedOptions.put(PATH.key(), 
sharedTempDir.toAbsolutePath().toString());
+        enrichedOptions.put(ROOT_PATH.key(), 
sharedTempDir.toAbsolutePath().toString());
         return Stream.of(
                 Arguments.of(enrichedOptions, false),
                 Arguments.of(enrichedOptions, true),
@@ -341,7 +404,7 @@ public class TableStoreFactoryTest {
         return new FactoryUtil.DefaultDynamicTableContext(
                 TABLE_IDENTIFIER,
                 resolvedCatalogTable,
-                Collections.emptyMap(),
+                emptyMap(),
                 Configuration.fromMap(sessionOptions),
                 Thread.currentThread().getContextClassLoader(),
                 false);
diff --git a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/TableStoreTestBase.java b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/TableStoreTestBase.java
index c6762da..558f6d2 100644
--- a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/TableStoreTestBase.java
+++ b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/TableStoreTestBase.java
@@ -51,7 +51,7 @@ import java.util.stream.Collectors;
 import java.util.stream.IntStream;
 
 import static 
org.apache.flink.table.store.connector.TableStoreFactoryOptions.LOG_SYSTEM;
-import static org.apache.flink.table.store.file.FileStoreOptions.PATH;
+import static 
org.apache.flink.table.store.connector.TableStoreFactoryOptions.ROOT_PATH;
 import static 
org.apache.flink.table.store.file.FileStoreOptions.TABLE_STORE_PREFIX;
 import static 
org.apache.flink.table.store.kafka.KafkaLogOptions.BOOTSTRAP_SERVERS;
 import static org.apache.flink.table.store.log.LogOptions.LOG_PREFIX;
@@ -109,7 +109,7 @@ public abstract class TableStoreTestBase extends 
KafkaTableTestBase {
 
     protected void prepareSessionContext() {
         Configuration configuration = tEnv.getConfig().getConfiguration();
-        configuration.setString(TABLE_STORE_PREFIX + PATH.key(), rootPath);
+        configuration.setString(TABLE_STORE_PREFIX + ROOT_PATH.key(), 
rootPath);
         configuration.setString(
                 TABLE_STORE_PREFIX + LOG_PREFIX + BOOTSTRAP_SERVERS.key(), 
getBootstrapServers());
         if (enableLogStore) {
diff --git 
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/FileStoreOptions.java
 
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/FileStoreOptions.java
index df59e66..06dd728 100644
--- 
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/FileStoreOptions.java
+++ 
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/FileStoreOptions.java
@@ -56,7 +56,7 @@ public class FileStoreOptions implements Serializable {
             ConfigOptions.key("path")
                     .stringType()
                     .noDefaultValue()
-                    .withDescription("The root file path of the table store in the filesystem.");
+                    .withDescription("The file path of this table in the filesystem.");
 
     public static final ConfigOption<String> FILE_FORMAT =
             ConfigOptions.key("file.format")
@@ -167,21 +167,16 @@ public class FileStoreOptions implements Serializable {
         return options.get(BUCKET);
     }
 
-    public Path path(ObjectIdentifier tableIdentifier) {
-        return path(options.toMap(), tableIdentifier);
+    public Path path() {
+        return path(options.toMap());
     }
 
-    public static Path path(Map<String, String> options, ObjectIdentifier tableIdentifier) {
-        Preconditions.checkArgument(
-                options.containsKey(PATH.key()),
-                String.format(
-                        "Failed to create file store path. "
-                                + "Please specify a root dir by setting session level configuration "
-                                + "as `SET 'table-store.%s' = '...'`. "
-                                + "Alternatively, you can use a per-table root dir "
-                                + "as `CREATE TABLE ${table} (...) WITH ('%s' = '...')`",
-                        PATH.key(), PATH.key()));
-        return new Path(options.get(PATH.key()), relativeTablePath(tableIdentifier));
+    public static Path path(Map<String, String> options) {
+        return new Path(options.get(PATH.key()));
+    }
+
+    public static Path path(Configuration options) {
+        return new Path(options.get(PATH));
     }
 
     public static String relativeTablePath(ObjectIdentifier tableIdentifier) {
diff --git 
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/log/LogStoreTableFactory.java
 
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/log/LogStoreTableFactory.java
index 5f28a3d..22c4e6c 100644
--- 
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/log/LogStoreTableFactory.java
+++ 
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/log/LogStoreTableFactory.java
@@ -38,6 +38,8 @@ import org.apache.flink.types.RowKind;
 
 import javax.annotation.Nullable;
 
+import java.util.Map;
+
 /**
  * Base interface for configuring a default log table connector. The log table 
is used by managed
  * table factory.
@@ -47,6 +49,9 @@ import javax.annotation.Nullable;
  */
 public interface LogStoreTableFactory extends DynamicTableFactory {
 
+    /** Enrich options from catalog and session information. */
+    Map<String, String> enrichOptions(Context context);
+
     /** Notifies the listener that a table creation occurred. */
     void onCreateTable(Context context, int numBucket, boolean ignoreIfExists);
 
diff --git 
a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/TestFileStore.java
 
b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/TestFileStore.java
index 6e6bf24..5a1ac1a 100644
--- 
a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/TestFileStore.java
+++ 
b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/TestFileStore.java
@@ -21,7 +21,6 @@ package org.apache.flink.table.store.file;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.MemorySize;
 import org.apache.flink.core.fs.Path;
-import org.apache.flink.table.catalog.ObjectIdentifier;
 import org.apache.flink.table.data.binary.BinaryRowData;
 import org.apache.flink.table.runtime.typeutils.RowDataSerializer;
 import org.apache.flink.table.store.file.data.DataFileMeta;
@@ -112,7 +111,7 @@ public class TestFileStore extends FileStoreImpl {
             RowType valueType,
             MergeFunction mergeFunction) {
         super(
-                options.path(ObjectIdentifier.of("catalog", "database", 
"table")).toString(),
+                options.path().toString(),
                 0L,
                 options,
                 WriteMode.CHANGE_LOG,
diff --git 
a/flink-table-store-e2e-tests/src/test/java/org/apache/flink/table/store/tests/FileStoreBatchE2eTest.java
 
b/flink-table-store-e2e-tests/src/test/java/org/apache/flink/table/store/tests/FileStoreBatchE2eTest.java
index 0c6e349..047814c 100644
--- 
a/flink-table-store-e2e-tests/src/test/java/org/apache/flink/table/store/tests/FileStoreBatchE2eTest.java
+++ 
b/flink-table-store-e2e-tests/src/test/java/org/apache/flink/table/store/tests/FileStoreBatchE2eTest.java
@@ -77,7 +77,7 @@ public class FileStoreBatchE2eTest extends E2eTestBase {
                         + "    price INT\n"
                         + ") PARTITIONED BY (dt, hr) WITH (\n"
                         + "    'bucket' = '3',\n"
-                        + "    'path' = '%s'\n"
+                        + "    'root-path' = '%s'\n"
                         + ");";
         tableStoreDdl =
                 String.format(
diff --git 
a/flink-table-store-e2e-tests/src/test/java/org/apache/flink/table/store/tests/FileStoreFlinkFormatE2eTest.java
 
b/flink-table-store-e2e-tests/src/test/java/org/apache/flink/table/store/tests/FileStoreFlinkFormatE2eTest.java
index 8953275..0ceee85 100644
--- 
a/flink-table-store-e2e-tests/src/test/java/org/apache/flink/table/store/tests/FileStoreFlinkFormatE2eTest.java
+++ 
b/flink-table-store-e2e-tests/src/test/java/org/apache/flink/table/store/tests/FileStoreFlinkFormatE2eTest.java
@@ -36,7 +36,7 @@ public class FileStoreFlinkFormatE2eTest extends E2eTestBase {
                         + "    b VARCHAR\n"
                         + ") WITH (\n"
                         + "    'bucket' = '3',\n"
-                        + "    'path' = '%s',\n"
+                        + "    'root-path' = '%s',\n"
                         + "    'file.format' = 'csv'\n"
                         + ");";
         tableStoreDdl =
diff --git 
a/flink-table-store-e2e-tests/src/test/java/org/apache/flink/table/store/tests/FileStoreStreamE2eTest.java
 
b/flink-table-store-e2e-tests/src/test/java/org/apache/flink/table/store/tests/FileStoreStreamE2eTest.java
index f37a3fd..44009ec 100644
--- 
a/flink-table-store-e2e-tests/src/test/java/org/apache/flink/table/store/tests/FileStoreStreamE2eTest.java
+++ 
b/flink-table-store-e2e-tests/src/test/java/org/apache/flink/table/store/tests/FileStoreStreamE2eTest.java
@@ -48,7 +48,7 @@ public class FileStoreStreamE2eTest extends E2eTestBase {
                         + "    rn BIGINT\n"
                         + ") WITH (\n"
                         + "    'bucket' = '3',\n"
-                        + "    'path' = '%s'\n"
+                        + "    'root-path' = '%s'\n"
                         + ");";
         tableStoreDdl =
                 String.format(
diff --git 
a/flink-table-store-e2e-tests/src/test/java/org/apache/flink/table/store/tests/LogStoreE2eTest.java
 
b/flink-table-store-e2e-tests/src/test/java/org/apache/flink/table/store/tests/LogStoreE2eTest.java
index e601b28..465b638 100644
--- 
a/flink-table-store-e2e-tests/src/test/java/org/apache/flink/table/store/tests/LogStoreE2eTest.java
+++ 
b/flink-table-store-e2e-tests/src/test/java/org/apache/flink/table/store/tests/LogStoreE2eTest.java
@@ -38,7 +38,7 @@ public class LogStoreE2eTest extends E2eTestBase {
                         + "    PRIMARY KEY (k) NOT ENFORCED\n"
                         + ") WITH (\n"
                         + "    'bucket' = '3',\n"
-                        + "    'path' = '%s'\n"
+                        + "    'root-path' = '%s'\n"
                         + ");";
         String tableStoreDir = UUID.randomUUID().toString() + ".store";
         tableStoreBatchDdl = String.format(tableStoreBatchDdl, TEST_DATA_DIR + 
"/" + tableStoreDir);
@@ -71,7 +71,7 @@ public class LogStoreE2eTest extends E2eTestBase {
                         + "    PRIMARY KEY (k) NOT ENFORCED\n"
                         + ") WITH (\n"
                         + "    'bucket' = '3',\n"
-                        + "    'path' = '%s',\n"
+                        + "    'root-path' = '%s',\n"
                         + "    'log.consistency' = 'eventual',\n"
                         + "    'log.system' = 'kafka',\n"
                         + "    'log.kafka.bootstrap.servers' = '%s'\n"
diff --git 
a/flink-table-store-e2e-tests/src/test/java/org/apache/flink/table/store/tests/TypeE2eTest.java
 
b/flink-table-store-e2e-tests/src/test/java/org/apache/flink/table/store/tests/TypeE2eTest.java
index 81fd14b..2e514b4 100644
--- 
a/flink-table-store-e2e-tests/src/test/java/org/apache/flink/table/store/tests/TypeE2eTest.java
+++ 
b/flink-table-store-e2e-tests/src/test/java/org/apache/flink/table/store/tests/TypeE2eTest.java
@@ -57,7 +57,7 @@ public class TypeE2eTest extends E2eTestBase {
                                 schema,
                                 ") WITH (",
                                 "  'bucket' = '1',",
-                                "  'path' = '%s'",
+                                "  'root-path' = '%s'",
                                 ");"));
         tableStoreDdl =
                 String.format(
@@ -128,7 +128,7 @@ public class TypeE2eTest extends E2eTestBase {
                                 "PRIMARY KEY (pk) NOT ENFORCED",
                                 ") WITH (",
                                 "  'bucket' = '1',",
-                                "  'path' = '%s'",
+                                "  'root-path' = '%s'",
                                 ");"));
         tableStoreDdl =
                 String.format(
diff --git 
a/flink-table-store-hive/src/test/java/org/apache/flink/table/store/FileStoreTestHelper.java
 
b/flink-table-store-hive/src/test/java/org/apache/flink/table/store/FileStoreTestHelper.java
index d6c5234..38071e5 100644
--- 
a/flink-table-store-hive/src/test/java/org/apache/flink/table/store/FileStoreTestHelper.java
+++ 
b/flink-table-store-hive/src/test/java/org/apache/flink/table/store/FileStoreTestHelper.java
@@ -20,7 +20,6 @@ package org.apache.flink.table.store;
 
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.table.catalog.ObjectIdentifier;
 import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.data.binary.BinaryRowData;
 import org.apache.flink.table.store.file.FileStoreImpl;
@@ -60,7 +59,6 @@ public class FileStoreTestHelper {
     private final ExecutorService compactExecutor;
 
     public FileStoreTestHelper(
-            ObjectIdentifier oi,
             Configuration conf,
             RowType partitionType,
             RowType keyType,
@@ -71,7 +69,7 @@ public class FileStoreTestHelper {
         FileStoreOptions options = new FileStoreOptions(conf);
         this.store =
                 new FileStoreImpl(
-                        options.path(oi).toString(),
+                        options.path().toString(),
                         0,
                         options,
                         WriteMode.CHANGE_LOG,
diff --git 
a/flink-table-store-hive/src/test/java/org/apache/flink/table/store/hive/TableStoreHiveStorageHandlerITCase.java
 
b/flink-table-store-hive/src/test/java/org/apache/flink/table/store/hive/TableStoreHiveStorageHandlerITCase.java
index 7fe4f30..5f33a37 100644
--- 
a/flink-table-store-hive/src/test/java/org/apache/flink/table/store/hive/TableStoreHiveStorageHandlerITCase.java
+++ 
b/flink-table-store-hive/src/test/java/org/apache/flink/table/store/hive/TableStoreHiveStorageHandlerITCase.java
@@ -21,7 +21,6 @@ package org.apache.flink.table.store.hive;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.connectors.hive.FlinkEmbeddedHiveRunner;
 import org.apache.flink.table.api.DataTypes;
-import org.apache.flink.table.catalog.ObjectIdentifier;
 import org.apache.flink.table.data.GenericRowData;
 import org.apache.flink.table.data.StringData;
 import org.apache.flink.table.data.TimestampData;
@@ -83,14 +82,13 @@ public class TableStoreHiveStorageHandlerITCase {
 
     @Test
     public void testReadExternalTableWithPk() throws Exception {
-        String root = folder.getRoot().getPath();
+        String path = folder.newFolder().toURI().toString();
         Configuration conf = new Configuration();
-        conf.setString(FileStoreOptions.PATH, root);
+        conf.setString(FileStoreOptions.PATH, path);
         conf.setInteger(FileStoreOptions.BUCKET, 2);
         conf.setString(FileStoreOptions.FILE_FORMAT, "avro");
         FileStoreTestHelper helper =
                 new FileStoreTestHelper(
-                        ObjectIdentifier.of("test_catalog", "test_db", 
"test_table"),
                         conf,
                         RowType.of(),
                         RowType.of(
@@ -146,9 +144,7 @@ public class TableStoreHiveStorageHandlerITCase {
                                 "  c STRING",
                                 ")",
                                 "STORED BY '" + 
TableStoreHiveStorageHandler.class.getName() + "'",
-                                "LOCATION '"
-                                        + root
-                                        + 
"/test_catalog.catalog/test_db.db/test_table'",
+                                "LOCATION '" + path + "'",
                                 "TBLPROPERTIES (",
                                 "  'table-store.catalog' = 'test_catalog',",
                                 "  'table-store.primary-keys' = 'a,b',",
@@ -162,14 +158,13 @@ public class TableStoreHiveStorageHandlerITCase {
 
     @Test
     public void testReadExternalTableWithoutPk() throws Exception {
-        String root = folder.getRoot().getPath();
+        String path = folder.newFolder().toURI().toString();
         Configuration conf = new Configuration();
-        conf.setString(FileStoreOptions.PATH, root);
+        conf.setString(FileStoreOptions.PATH, path);
         conf.setInteger(FileStoreOptions.BUCKET, 2);
         conf.setString(FileStoreOptions.FILE_FORMAT, "avro");
         FileStoreTestHelper helper =
                 new FileStoreTestHelper(
-                        ObjectIdentifier.of("test_catalog", "test_db", 
"test_table"),
                         conf,
                         RowType.of(),
                         RowType.of(
@@ -222,9 +217,7 @@ public class TableStoreHiveStorageHandlerITCase {
                                 "  c STRING",
                                 ")",
                                 "STORED BY '" + 
TableStoreHiveStorageHandler.class.getName() + "'",
-                                "LOCATION '"
-                                        + root
-                                        + 
"/test_catalog.catalog/test_db.db/test_table'",
+                                "LOCATION '" + path + "'",
                                 "TBLPROPERTIES (",
                                 "  'table-store.catalog' = 'test_catalog',",
                                 "  'table-store.bucket' = '2',",
@@ -244,7 +237,6 @@ public class TableStoreHiveStorageHandlerITCase {
         conf.setString(FileStoreOptions.FILE_FORMAT, "avro");
         FileStoreTestHelper helper =
                 new FileStoreTestHelper(
-                        ObjectIdentifier.of("test_catalog", "test_db", 
"test_table"),
                         conf,
                         RowType.of(),
                         RowType.of(
@@ -300,9 +292,7 @@ public class TableStoreHiveStorageHandlerITCase {
                                 ddl.toString(),
                                 ")",
                                 "STORED BY '" + 
TableStoreHiveStorageHandler.class.getName() + "'",
-                                "LOCATION '"
-                                        + root
-                                        + 
"/test_catalog.catalog/test_db.db/test_table'",
+                                "LOCATION '" + root + "'",
                                 "TBLPROPERTIES (",
                                 "  'table-store.catalog' = 'test_catalog',",
                                 "  'table-store.primary-keys' = 'f_int',",
@@ -385,13 +375,12 @@ public class TableStoreHiveStorageHandlerITCase {
 
     @Test
     public void testPredicatePushDown() throws Exception {
-        String root = folder.getRoot().getPath();
+        String path = folder.newFolder().toURI().toString();
         Configuration conf = new Configuration();
-        conf.setString(FileStoreOptions.PATH, root);
+        conf.setString(FileStoreOptions.PATH, path);
         conf.setString(FileStoreOptions.FILE_FORMAT, "avro");
         FileStoreTestHelper helper =
                 new FileStoreTestHelper(
-                        ObjectIdentifier.of("test_catalog", "test_db", 
"test_table"),
                         conf,
                         RowType.of(),
                         RowType.of(
@@ -427,9 +416,7 @@ public class TableStoreHiveStorageHandlerITCase {
                                 "  a INT",
                                 ")",
                                 "STORED BY '" + 
TableStoreHiveStorageHandler.class.getName() + "'",
-                                "LOCATION '"
-                                        + root
-                                        + 
"/test_catalog.catalog/test_db.db/test_table'",
+                                "LOCATION '" + path + "'",
                                 "TBLPROPERTIES (",
                                 "  'table-store.catalog' = 'test_catalog',",
                                 "  'table-store.file.format' = 'avro'",
@@ -480,13 +467,12 @@ public class TableStoreHiveStorageHandlerITCase {
 
     @Test
     public void testDateAndTimestamp() throws Exception {
-        String root = folder.getRoot().getPath();
+        String path = folder.newFolder().toURI().toString();
         Configuration conf = new Configuration();
-        conf.setString(FileStoreOptions.PATH, root);
+        conf.setString(FileStoreOptions.PATH, path);
         conf.setString(FileStoreOptions.FILE_FORMAT, "avro");
         FileStoreTestHelper helper =
                 new FileStoreTestHelper(
-                        ObjectIdentifier.of("test_catalog", "test_db", 
"test_table"),
                         conf,
                         RowType.of(),
                         RowType.of(
@@ -532,9 +518,7 @@ public class TableStoreHiveStorageHandlerITCase {
                                 "  ts TIMESTAMP",
                                 ")",
                                 "STORED BY '" + 
TableStoreHiveStorageHandler.class.getName() + "'",
-                                "LOCATION '"
-                                        + root
-                                        + 
"/test_catalog.catalog/test_db.db/test_table'",
+                                "LOCATION '" + path + "'",
                                 "TBLPROPERTIES (",
                                 "  'table-store.catalog' = 'test_catalog',",
                                 "  'table-store.file.format' = 'avro'",
diff --git 
a/flink-table-store-hive/src/test/java/org/apache/flink/table/store/mapred/TableStoreRecordReaderTest.java
 
b/flink-table-store-hive/src/test/java/org/apache/flink/table/store/mapred/TableStoreRecordReaderTest.java
index b8e1105..01c560c 100644
--- 
a/flink-table-store-hive/src/test/java/org/apache/flink/table/store/mapred/TableStoreRecordReaderTest.java
+++ 
b/flink-table-store-hive/src/test/java/org/apache/flink/table/store/mapred/TableStoreRecordReaderTest.java
@@ -21,7 +21,6 @@ package org.apache.flink.table.store.mapred;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.table.api.DataTypes;
-import org.apache.flink.table.catalog.ObjectIdentifier;
 import org.apache.flink.table.data.GenericRowData;
 import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.data.StringData;
@@ -58,7 +57,6 @@ public class TableStoreRecordReaderTest {
         conf.setString(FileStoreOptions.FILE_FORMAT, "avro");
         FileStoreTestHelper helper =
                 new FileStoreTestHelper(
-                        ObjectIdentifier.of("test_catalog", "test_db", 
"test_table"),
                         conf,
                         RowType.of(),
                         RowType.of(
@@ -119,7 +117,6 @@ public class TableStoreRecordReaderTest {
         conf.setString(FileStoreOptions.FILE_FORMAT, "avro");
         FileStoreTestHelper helper =
                 new FileStoreTestHelper(
-                        ObjectIdentifier.of("test_catalog", "test_db", 
"test_table"),
                         conf,
                         RowType.of(),
                         RowType.of(
diff --git 
a/flink-table-store-kafka/src/main/java/org/apache/flink/table/store/kafka/KafkaLogOptions.java
 
b/flink-table-store-kafka/src/main/java/org/apache/flink/table/store/kafka/KafkaLogOptions.java
index 8882488..8cdb6d0 100644
--- 
a/flink-table-store-kafka/src/main/java/org/apache/flink/table/store/kafka/KafkaLogOptions.java
+++ 
b/flink-table-store-kafka/src/main/java/org/apache/flink/table/store/kafka/KafkaLogOptions.java
@@ -29,4 +29,10 @@ public class KafkaLogOptions {
                     .stringType()
                     .noDefaultValue()
                     .withDescription("Required Kafka server connection 
string");
+
+    public static final ConfigOption<String> TOPIC =
+            ConfigOptions.key("topic")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription("Topic of this kafka table.");
 }
diff --git 
a/flink-table-store-kafka/src/main/java/org/apache/flink/table/store/kafka/KafkaLogStoreFactory.java
 
b/flink-table-store-kafka/src/main/java/org/apache/flink/table/store/kafka/KafkaLogStoreFactory.java
index 9904934..9828daf 100644
--- 
a/flink-table-store-kafka/src/main/java/org/apache/flink/table/store/kafka/KafkaLogStoreFactory.java
+++ 
b/flink-table-store-kafka/src/main/java/org/apache/flink/table/store/kafka/KafkaLogStoreFactory.java
@@ -31,6 +31,7 @@ import org.apache.flink.table.factories.FactoryUtil;
 import org.apache.flink.table.store.log.LogStoreTableFactory;
 import org.apache.flink.table.types.DataType;
 import org.apache.flink.table.types.utils.DataTypeUtils;
+import org.apache.flink.util.Preconditions;
 
 import org.apache.kafka.clients.admin.AdminClient;
 import org.apache.kafka.clients.admin.NewTopic;
@@ -51,6 +52,7 @@ import java.util.concurrent.ExecutionException;
 
 import static 
org.apache.flink.table.factories.FactoryUtil.createTableFactoryHelper;
 import static 
org.apache.flink.table.store.kafka.KafkaLogOptions.BOOTSTRAP_SERVERS;
+import static org.apache.flink.table.store.kafka.KafkaLogOptions.TOPIC;
 import static org.apache.flink.table.store.log.LogOptions.CHANGELOG_MODE;
 import static org.apache.flink.table.store.log.LogOptions.CONSISTENCY;
 import static org.apache.flink.table.store.log.LogOptions.FORMAT;
@@ -84,6 +86,7 @@ public class KafkaLogStoreFactory implements 
LogStoreTableFactory {
     public Set<ConfigOption<?>> optionalOptions() {
         Set<ConfigOption<?>> options = new HashSet<>();
         options.add(SCAN);
+        options.add(TOPIC);
         options.add(SCAN_TIMESTAMP_MILLS);
         options.add(RETENTION);
         options.add(CONSISTENCY);
@@ -93,6 +96,23 @@ public class KafkaLogStoreFactory implements 
LogStoreTableFactory {
         return options;
     }
 
+    @Override
+    public Map<String, String> enrichOptions(Context context) {
+        Map<String, String> options = new HashMap<>(context.getCatalogTable().getOptions());
+        Preconditions.checkArgument(
+                !options.containsKey(TOPIC.key()),
+                "Managed table can not contain custom topic. "
+                        + "You need to remove topic in table options or session config.");
+
+        String topic = context.getObjectIdentifier().asSummaryString();
+        options.put(TOPIC.key(), topic);
+        return options;
+    }
+
+    private String topic(Context context) {
+        return context.getCatalogTable().getOptions().get(TOPIC.key());
+    }
+
     @Override
     public void onCreateTable(Context context, int numBucket, boolean ignoreIfExists) {
         FactoryUtil.TableFactoryHelper helper = createTableFactoryHelper(this, context);
@@ -217,10 +237,6 @@ public class KafkaLogStoreFactory implements 
LogStoreTableFactory {
                 helper.getOptions().get(CHANGELOG_MODE));
     }
 
-    private static String topic(DynamicTableFactory.Context context) {
-        return context.getObjectIdentifier().asSummaryString();
-    }
-
     public static Properties toKafkaProperties(ReadableConfig options) {
         Properties properties = new Properties();
         Map<String, String> optionMap = ((Configuration) options).toMap();
diff --git 
a/flink-table-store-kafka/src/test/java/org/apache/flink/table/store/kafka/KafkaLogTestUtils.java
 
b/flink-table-store-kafka/src/test/java/org/apache/flink/table/store/kafka/KafkaLogTestUtils.java
index a375d14..4054009 100644
--- 
a/flink-table-store-kafka/src/test/java/org/apache/flink/table/store/kafka/KafkaLogTestUtils.java
+++ 
b/flink-table-store-kafka/src/test/java/org/apache/flink/table/store/kafka/KafkaLogTestUtils.java
@@ -49,12 +49,14 @@ import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.UUID;
 import java.util.stream.Collectors;
 import java.util.stream.IntStream;
 
 import static org.apache.flink.table.data.binary.BinaryRowDataUtil.EMPTY_ROW;
 import static 
org.apache.flink.table.store.file.mergetree.compact.CompactManagerTest.row;
 import static 
org.apache.flink.table.store.kafka.KafkaLogOptions.BOOTSTRAP_SERVERS;
+import static org.apache.flink.table.store.kafka.KafkaLogOptions.TOPIC;
 import static org.apache.flink.table.store.log.LogOptions.CHANGELOG_MODE;
 import static org.apache.flink.table.store.log.LogOptions.CONSISTENCY;
 import static org.apache.flink.table.store.log.LogOptions.LogChangelogMode;
@@ -189,6 +191,7 @@ public class KafkaLogTestUtils {
         options.put(CHANGELOG_MODE.key(), changelogMode.toString());
         options.put(CONSISTENCY.key(), consistency.toString());
         options.put(BOOTSTRAP_SERVERS.key(), servers);
+        options.put(TOPIC.key(), UUID.randomUUID().toString());
         return createContext(name, type, keys, options);
     }
 

Reply via email to