This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-table-store.git


The following commit(s) were added to refs/heads/master by this push:
     new 503899e  [hotfix] Remove 'table.path' option and let FileStore construct table path dynamically
503899e is described below

commit 503899e9c17cf782b96d6de017a06655116deadd
Author: Jane Chan <[email protected]>
AuthorDate: Tue Mar 15 17:01:05 2022 +0800

    [hotfix] Remove 'table.path' option and let FileStore construct table path dynamically
    
    This closes #45
---
 .../flink/table/store/connector/TableStore.java       |  3 ++-
 .../flink/table/store/connector/FileStoreITCase.java  |  6 +++---
 .../apache/flink/table/store/file/FileStoreImpl.java  |  6 +++++-
 .../flink/table/store/file/FileStoreOptions.java      | 19 +++++++++----------
 .../apache/flink/table/store/file/TestFileStore.java  | 14 +++++++++++---
 5 files changed, 30 insertions(+), 18 deletions(-)

diff --git a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/TableStore.java b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/TableStore.java
index b67bf8f..a8667f1 100644
--- a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/TableStore.java
+++ b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/TableStore.java
@@ -141,7 +141,8 @@ public class TableStore {
             valueType = type;
             mergeFunction = new DeduplicateMergeFunction();
         }
-        return new FileStoreImpl(options, user, partitionType, keyType, valueType, mergeFunction);
+        return new FileStoreImpl(
+                tableIdentifier, options, user, partitionType, keyType, valueType, mergeFunction);
     }
 
     /** Source builder to build a flink {@link Source}. */
diff --git a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/FileStoreITCase.java b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/FileStoreITCase.java
index 7c4a884..7c499d3 100644
--- a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/FileStoreITCase.java
+++ b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/FileStoreITCase.java
@@ -59,7 +59,7 @@ import java.util.stream.Stream;
 
 import static org.apache.flink.table.store.file.FileStoreOptions.BUCKET;
 import static org.apache.flink.table.store.file.FileStoreOptions.FILE_FORMAT;
-import static org.apache.flink.table.store.file.FileStoreOptions.TABLE_PATH;
+import static org.apache.flink.table.store.file.FileStoreOptions.FILE_PATH;
 import static org.assertj.core.api.Assertions.assertThat;
 
 /** ITCase for {@link FileStoreSource} and {@link StoreSink}. */
@@ -237,10 +237,10 @@ public class FileStoreITCase extends AbstractTestBase {
         Configuration options = new Configuration();
         options.set(BUCKET, NUM_BUCKET);
         if (noFail) {
-            options.set(TABLE_PATH, folder.toURI().toString());
+            options.set(FILE_PATH, folder.toURI().toString());
         } else {
             FailingAtomicRenameFileSystem.get().reset(3, 100);
-            options.set(TABLE_PATH, FailingAtomicRenameFileSystem.getFailingPath(folder.getPath()));
+            options.set(FILE_PATH, FailingAtomicRenameFileSystem.getFailingPath(folder.getPath()));
         }
         options.set(FILE_FORMAT, "avro");
         return options;
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/FileStoreImpl.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/FileStoreImpl.java
index bc779c9..e18319f 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/FileStoreImpl.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/FileStoreImpl.java
@@ -21,6 +21,7 @@ package org.apache.flink.table.store.file;
 import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.table.api.TableConfig;
+import org.apache.flink.table.catalog.ObjectIdentifier;
 import org.apache.flink.table.planner.codegen.sort.SortCodeGenerator;
 import org.apache.flink.table.planner.plan.utils.SortUtil;
 import org.apache.flink.table.runtime.generated.GeneratedRecordComparator;
@@ -42,6 +43,7 @@ import java.util.stream.IntStream;
 /** File store implementation. */
 public class FileStoreImpl implements FileStore {
 
+    private final ObjectIdentifier tableIdentifier;
     private final FileStoreOptions options;
     private final String user;
     private final RowType partitionType;
@@ -51,12 +53,14 @@ public class FileStoreImpl implements FileStore {
     private final GeneratedRecordComparator genRecordComparator;
 
     public FileStoreImpl(
+            ObjectIdentifier tableIdentifier,
             Configuration options,
             String user,
             RowType partitionType,
             RowType keyType,
             RowType valueType,
             MergeFunction mergeFunction) {
+        this.tableIdentifier = tableIdentifier;
         this.options = new FileStoreOptions(options);
         this.user = user;
         this.partitionType = partitionType;
@@ -75,7 +79,7 @@ public class FileStoreImpl implements FileStore {
     @VisibleForTesting
     public FileStorePathFactory pathFactory() {
         return new FileStorePathFactory(
-                options.path(), partitionType, options.partitionDefaultName());
+                options.path(tableIdentifier), partitionType, options.partitionDefaultName());
     }
 
     @VisibleForTesting
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/FileStoreOptions.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/FileStoreOptions.java
index ddceee8..41b0490 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/FileStoreOptions.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/FileStoreOptions.java
@@ -18,12 +18,12 @@
 
 package org.apache.flink.table.store.file;
 
-import org.apache.flink.annotation.Internal;
 import org.apache.flink.configuration.ConfigOption;
 import org.apache.flink.configuration.ConfigOptions;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.MemorySize;
 import org.apache.flink.core.fs.Path;
+import org.apache.flink.table.catalog.ObjectIdentifier;
 import org.apache.flink.table.store.file.format.FileFormat;
 import org.apache.flink.table.store.file.mergetree.MergeTreeOptions;
 
@@ -51,13 +51,6 @@ public class FileStoreOptions implements Serializable {
                     .noDefaultValue()
                     .withDescription("The root file path of the table store in the filesystem.");
 
-    @Internal
-    public static final ConfigOption<String> TABLE_PATH =
-            ConfigOptions.key("table.path")
-                    .stringType()
-                    .noDefaultValue()
-                    .withDescription("The table file path of the table store in the filesystem.");
-
     public static final ConfigOption<String> FILE_FORMAT =
             ConfigOptions.key("file.format")
                     .stringType()
@@ -129,8 +122,14 @@ public class FileStoreOptions implements Serializable {
         return options.get(BUCKET);
     }
 
-    public Path path() {
-        return new Path(options.get(TABLE_PATH));
+    public Path path(ObjectIdentifier tableIdentifier) {
+        return new Path(
+                options.get(FILE_PATH),
+                String.format(
+                        "root/%s.catalog/%s.db/%s",
+                        tableIdentifier.getCatalogName(),
+                        tableIdentifier.getDatabaseName(),
+                        tableIdentifier.getObjectName()));
     }
 
     public FileFormat fileFormat() {
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/TestFileStore.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/TestFileStore.java
index 4fb291c..319970d 100644
--- a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/TestFileStore.java
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/TestFileStore.java
@@ -21,6 +21,7 @@ package org.apache.flink.table.store.file;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.MemorySize;
 import org.apache.flink.core.fs.Path;
+import org.apache.flink.table.catalog.ObjectIdentifier;
 import org.apache.flink.table.data.binary.BinaryRowData;
 import org.apache.flink.table.runtime.typeutils.RowDataSerializer;
 import org.apache.flink.table.store.file.manifest.ManifestCommittable;
@@ -95,7 +96,7 @@ public class TestFileStore extends FileStoreImpl {
 
         conf.set(FileStoreOptions.FILE_FORMAT, format);
         conf.set(FileStoreOptions.MANIFEST_FORMAT, format);
-        conf.set(FileStoreOptions.TABLE_PATH, root);
+        conf.set(FileStoreOptions.FILE_PATH, root);
         conf.set(FileStoreOptions.BUCKET, numBuckets);
 
         return new TestFileStore(conf, partitionType, keyType, valueType, mergeFunction);
@@ -107,8 +108,15 @@ public class TestFileStore extends FileStoreImpl {
             RowType keyType,
             RowType valueType,
             MergeFunction mergeFunction) {
-        super(conf, UUID.randomUUID().toString(), partitionType, keyType, valueType, mergeFunction);
-        this.root = conf.getString(FileStoreOptions.TABLE_PATH);
+        super(
+                ObjectIdentifier.of("catalog", "database", "table"),
+                conf,
+                UUID.randomUUID().toString(),
+                partitionType,
+                keyType,
+                valueType,
+                mergeFunction);
+        this.root = conf.getString(FileStoreOptions.FILE_PATH);
         this.keySerializer = new RowDataSerializer(keyType);
         this.valueSerializer = new RowDataSerializer(valueType);
     }

Reply via email to