This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-table-store.git
The following commit(s) were added to refs/heads/master by this push:
new 503899e [hotfix] Remove 'table.path' option and let FileStore
construct table path dynamically
503899e is described below
commit 503899e9c17cf782b96d6de017a06655116deadd
Author: Jane Chan <[email protected]>
AuthorDate: Tue Mar 15 17:01:05 2022 +0800
[hotfix] Remove 'table.path' option and let FileStore construct table path
dynamically
This closes #45
---
.../flink/table/store/connector/TableStore.java | 3 ++-
.../flink/table/store/connector/FileStoreITCase.java | 6 +++---
.../apache/flink/table/store/file/FileStoreImpl.java | 6 +++++-
.../flink/table/store/file/FileStoreOptions.java | 19 +++++++++----------
.../apache/flink/table/store/file/TestFileStore.java | 14 +++++++++++---
5 files changed, 30 insertions(+), 18 deletions(-)
diff --git
a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/TableStore.java
b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/TableStore.java
index b67bf8f..a8667f1 100644
---
a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/TableStore.java
+++
b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/TableStore.java
@@ -141,7 +141,8 @@ public class TableStore {
valueType = type;
mergeFunction = new DeduplicateMergeFunction();
}
- return new FileStoreImpl(options, user, partitionType, keyType,
valueType, mergeFunction);
+ return new FileStoreImpl(
+ tableIdentifier, options, user, partitionType, keyType,
valueType, mergeFunction);
}
/** Source builder to build a flink {@link Source}. */
diff --git
a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/FileStoreITCase.java
b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/FileStoreITCase.java
index 7c4a884..7c499d3 100644
---
a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/FileStoreITCase.java
+++
b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/FileStoreITCase.java
@@ -59,7 +59,7 @@ import java.util.stream.Stream;
import static org.apache.flink.table.store.file.FileStoreOptions.BUCKET;
import static org.apache.flink.table.store.file.FileStoreOptions.FILE_FORMAT;
-import static org.apache.flink.table.store.file.FileStoreOptions.TABLE_PATH;
+import static org.apache.flink.table.store.file.FileStoreOptions.FILE_PATH;
import static org.assertj.core.api.Assertions.assertThat;
/** ITCase for {@link FileStoreSource} and {@link StoreSink}. */
@@ -237,10 +237,10 @@ public class FileStoreITCase extends AbstractTestBase {
Configuration options = new Configuration();
options.set(BUCKET, NUM_BUCKET);
if (noFail) {
- options.set(TABLE_PATH, folder.toURI().toString());
+ options.set(FILE_PATH, folder.toURI().toString());
} else {
FailingAtomicRenameFileSystem.get().reset(3, 100);
- options.set(TABLE_PATH,
FailingAtomicRenameFileSystem.getFailingPath(folder.getPath()));
+ options.set(FILE_PATH,
FailingAtomicRenameFileSystem.getFailingPath(folder.getPath()));
}
options.set(FILE_FORMAT, "avro");
return options;
diff --git
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/FileStoreImpl.java
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/FileStoreImpl.java
index bc779c9..e18319f 100644
---
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/FileStoreImpl.java
+++
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/FileStoreImpl.java
@@ -21,6 +21,7 @@ package org.apache.flink.table.store.file;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.table.api.TableConfig;
+import org.apache.flink.table.catalog.ObjectIdentifier;
import org.apache.flink.table.planner.codegen.sort.SortCodeGenerator;
import org.apache.flink.table.planner.plan.utils.SortUtil;
import org.apache.flink.table.runtime.generated.GeneratedRecordComparator;
@@ -42,6 +43,7 @@ import java.util.stream.IntStream;
/** File store implementation. */
public class FileStoreImpl implements FileStore {
+ private final ObjectIdentifier tableIdentifier;
private final FileStoreOptions options;
private final String user;
private final RowType partitionType;
@@ -51,12 +53,14 @@ public class FileStoreImpl implements FileStore {
private final GeneratedRecordComparator genRecordComparator;
public FileStoreImpl(
+ ObjectIdentifier tableIdentifier,
Configuration options,
String user,
RowType partitionType,
RowType keyType,
RowType valueType,
MergeFunction mergeFunction) {
+ this.tableIdentifier = tableIdentifier;
this.options = new FileStoreOptions(options);
this.user = user;
this.partitionType = partitionType;
@@ -75,7 +79,7 @@ public class FileStoreImpl implements FileStore {
@VisibleForTesting
public FileStorePathFactory pathFactory() {
return new FileStorePathFactory(
- options.path(), partitionType, options.partitionDefaultName());
+ options.path(tableIdentifier), partitionType,
options.partitionDefaultName());
}
@VisibleForTesting
diff --git
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/FileStoreOptions.java
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/FileStoreOptions.java
index ddceee8..41b0490 100644
---
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/FileStoreOptions.java
+++
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/FileStoreOptions.java
@@ -18,12 +18,12 @@
package org.apache.flink.table.store.file;
-import org.apache.flink.annotation.Internal;
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ConfigOptions;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.MemorySize;
import org.apache.flink.core.fs.Path;
+import org.apache.flink.table.catalog.ObjectIdentifier;
import org.apache.flink.table.store.file.format.FileFormat;
import org.apache.flink.table.store.file.mergetree.MergeTreeOptions;
@@ -51,13 +51,6 @@ public class FileStoreOptions implements Serializable {
.noDefaultValue()
.withDescription("The root file path of the table store in
the filesystem.");
- @Internal
- public static final ConfigOption<String> TABLE_PATH =
- ConfigOptions.key("table.path")
- .stringType()
- .noDefaultValue()
- .withDescription("The table file path of the table store
in the filesystem.");
-
public static final ConfigOption<String> FILE_FORMAT =
ConfigOptions.key("file.format")
.stringType()
@@ -129,8 +122,14 @@ public class FileStoreOptions implements Serializable {
return options.get(BUCKET);
}
- public Path path() {
- return new Path(options.get(TABLE_PATH));
+ public Path path(ObjectIdentifier tableIdentifier) {
+ return new Path(
+ options.get(FILE_PATH),
+ String.format(
+ "root/%s.catalog/%s.db/%s",
+ tableIdentifier.getCatalogName(),
+ tableIdentifier.getDatabaseName(),
+ tableIdentifier.getObjectName()));
}
public FileFormat fileFormat() {
diff --git
a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/TestFileStore.java
b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/TestFileStore.java
index 4fb291c..319970d 100644
---
a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/TestFileStore.java
+++
b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/TestFileStore.java
@@ -21,6 +21,7 @@ package org.apache.flink.table.store.file;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.MemorySize;
import org.apache.flink.core.fs.Path;
+import org.apache.flink.table.catalog.ObjectIdentifier;
import org.apache.flink.table.data.binary.BinaryRowData;
import org.apache.flink.table.runtime.typeutils.RowDataSerializer;
import org.apache.flink.table.store.file.manifest.ManifestCommittable;
@@ -95,7 +96,7 @@ public class TestFileStore extends FileStoreImpl {
conf.set(FileStoreOptions.FILE_FORMAT, format);
conf.set(FileStoreOptions.MANIFEST_FORMAT, format);
- conf.set(FileStoreOptions.TABLE_PATH, root);
+ conf.set(FileStoreOptions.FILE_PATH, root);
conf.set(FileStoreOptions.BUCKET, numBuckets);
return new TestFileStore(conf, partitionType, keyType, valueType,
mergeFunction);
@@ -107,8 +108,15 @@ public class TestFileStore extends FileStoreImpl {
RowType keyType,
RowType valueType,
MergeFunction mergeFunction) {
- super(conf, UUID.randomUUID().toString(), partitionType, keyType,
valueType, mergeFunction);
- this.root = conf.getString(FileStoreOptions.TABLE_PATH);
+ super(
+ ObjectIdentifier.of("catalog", "database", "table"),
+ conf,
+ UUID.randomUUID().toString(),
+ partitionType,
+ keyType,
+ valueType,
+ mergeFunction);
+ this.root = conf.getString(FileStoreOptions.FILE_PATH);
this.keySerializer = new RowDataSerializer(keyType);
this.valueSerializer = new RowDataSerializer(valueType);
}