This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-table-store.git
The following commit(s) were added to refs/heads/master by this push:
new 14c26f9 [FLINK-27752] Change 'path' to 'root-path' in table store
14c26f9 is described below
commit 14c26f9f0fa4a9fa4094c338729036fb4166ed66
Author: Jingsong Lee <[email protected]>
AuthorDate: Wed May 25 14:35:48 2022 +0800
[FLINK-27752] Change 'path' to 'root-path' in table store
This closes #136
---
docs/content/docs/development/create-table.md | 4 +-
docs/content/docs/try-table-store/quick-start.md | 2 +-
.../store/connector/AbstractTableStoreFactory.java | 196 +++++++++++++
.../flink/table/store/connector/TableStore.java | 8 +-
.../table/store/connector/TableStoreFactory.java | 311 ---------------------
.../store/connector/TableStoreFactoryOptions.java | 8 +-
.../store/connector/TableStoreManagedFactory.java | 188 +++++++++++++
.../org.apache.flink.table.factories.Factory | 2 +-
.../table/store/connector/FileStoreITCase.java | 2 +-
.../store/connector/FileStoreTableITCase.java | 4 +-
.../store/connector/ReadWriteTableITCase.java | 20 +-
.../store/connector/ReadWriteTableTestBase.java | 3 +-
...Test.java => TableStoreManagedFactoryTest.java} | 129 ++++++---
.../table/store/connector/TableStoreTestBase.java | 4 +-
.../flink/table/store/file/FileStoreOptions.java | 23 +-
.../table/store/log/LogStoreTableFactory.java | 5 +
.../flink/table/store/file/TestFileStore.java | 3 +-
.../table/store/tests/FileStoreBatchE2eTest.java | 2 +-
.../store/tests/FileStoreFlinkFormatE2eTest.java | 2 +-
.../table/store/tests/FileStoreStreamE2eTest.java | 2 +-
.../flink/table/store/tests/LogStoreE2eTest.java | 4 +-
.../flink/table/store/tests/TypeE2eTest.java | 4 +-
.../flink/table/store/FileStoreTestHelper.java | 4 +-
.../hive/TableStoreHiveStorageHandlerITCase.java | 42 +--
.../store/mapred/TableStoreRecordReaderTest.java | 3 -
.../flink/table/store/kafka/KafkaLogOptions.java | 6 +
.../table/store/kafka/KafkaLogStoreFactory.java | 24 +-
.../flink/table/store/kafka/KafkaLogTestUtils.java | 3 +
28 files changed, 578 insertions(+), 430 deletions(-)
diff --git a/docs/content/docs/development/create-table.md
b/docs/content/docs/development/create-table.md
index 95f5e87..21efdf4 100644
--- a/docs/content/docs/development/create-table.md
+++ b/docs/content/docs/development/create-table.md
@@ -106,7 +106,7 @@ Important options include the following:
</thead>
<tbody>
<tr>
- <td><h5>path</h5></td>
+ <td><h5>root-path</h5></td>
<td>Yes</td>
<td style="word-wrap: break-word;">(none)</td>
<td>String</td>
@@ -147,7 +147,7 @@ Important options include the following:
Creating a table will create the corresponding physical storage:
- The table's FileStore directory will be created under:
- `${path}/${catalog_name}.catalog/${database_name}.db/${table_name}`
+ `${root-path}/${catalog_name}.catalog/${database_name}.db/${table_name}`
- If `log.system` is configured as Kafka, a Topic named
"${catalog_name}.${database_name}.${table_name}" will be created
automatically when the table is created.
diff --git a/docs/content/docs/try-table-store/quick-start.md
b/docs/content/docs/try-table-store/quick-start.md
index 3c00f5b..78a4eea 100644
--- a/docs/content/docs/try-table-store/quick-start.md
+++ b/docs/content/docs/try-table-store/quick-start.md
@@ -112,7 +112,7 @@ Start the SQL Client CLI:
```sql
-- set root path to session config
-SET 'table-store.path' = '/tmp/table_store';
+SET 'table-store.root-path' = '/tmp/table_store';
-- create a word count dynamic table without 'connector' option
CREATE TABLE word_count (
diff --git
a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/AbstractTableStoreFactory.java
b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/AbstractTableStoreFactory.java
new file mode 100644
index 0000000..4b842a5
--- /dev/null
+++
b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/AbstractTableStoreFactory.java
@@ -0,0 +1,196 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.connector;
+
+import org.apache.flink.api.common.RuntimeExecutionMode;
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.DelegatingConfiguration;
+import org.apache.flink.configuration.ExecutionOptions;
+import org.apache.flink.table.api.ValidationException;
+import org.apache.flink.table.factories.DynamicTableFactory;
+import org.apache.flink.table.factories.DynamicTableSinkFactory;
+import org.apache.flink.table.factories.DynamicTableSourceFactory;
+import org.apache.flink.table.factories.FactoryUtil;
+import org.apache.flink.table.store.connector.sink.TableStoreSink;
+import org.apache.flink.table.store.connector.source.TableStoreSource;
+import org.apache.flink.table.store.file.FileStoreOptions;
+import org.apache.flink.table.store.file.mergetree.MergeTreeOptions;
+import org.apache.flink.table.store.file.schema.Schema;
+import org.apache.flink.table.store.file.schema.UpdateSchema;
+import org.apache.flink.table.store.log.LogOptions;
+import org.apache.flink.table.store.log.LogStoreTableFactory;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.util.Preconditions;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+import static
org.apache.flink.table.store.connector.TableStoreFactoryOptions.LOG_SYSTEM;
+import static org.apache.flink.table.store.log.LogOptions.CHANGELOG_MODE;
+import static org.apache.flink.table.store.log.LogOptions.CONSISTENCY;
+import static org.apache.flink.table.store.log.LogOptions.LOG_PREFIX;
+import static org.apache.flink.table.store.log.LogOptions.SCAN;
+import static
org.apache.flink.table.store.log.LogStoreTableFactory.discoverLogStoreFactory;
+
+/** Abstract table store factory to create table source and table sink. */
+public abstract class AbstractTableStoreFactory
+ implements DynamicTableSourceFactory, DynamicTableSinkFactory {
+
+ @Override
+ public TableStoreSource createDynamicTableSource(Context context) {
+ return new TableStoreSource(
+ buildTableStore(context),
+ context.getConfiguration().get(ExecutionOptions.RUNTIME_MODE)
+ == RuntimeExecutionMode.STREAMING,
+ createLogContext(context),
+ createOptionalLogStoreFactory(context).orElse(null));
+ }
+
+ @Override
+ public TableStoreSink createDynamicTableSink(Context context) {
+ return new TableStoreSink(
+ buildTableStore(context),
+ createLogContext(context),
+ createOptionalLogStoreFactory(context).orElse(null));
+ }
+
+ @Override
+ public Set<ConfigOption<?>> requiredOptions() {
+ return Collections.emptySet();
+ }
+
+ @Override
+ public Set<ConfigOption<?>> optionalOptions() {
+ Set<ConfigOption<?>> options = FileStoreOptions.allOptions();
+ options.addAll(MergeTreeOptions.allOptions());
+ options.addAll(TableStoreFactoryOptions.allOptions());
+ return options;
+ }
+
+ // ~ Tools
------------------------------------------------------------------
+
+ static Optional<LogStoreTableFactory> createOptionalLogStoreFactory(
+ DynamicTableFactory.Context context) {
+ return createOptionalLogStoreFactory(
+ context.getClassLoader(),
context.getCatalogTable().getOptions());
+ }
+
+ static Optional<LogStoreTableFactory> createOptionalLogStoreFactory(
+ ClassLoader classLoader, Map<String, String> options) {
+ Configuration configOptions = new Configuration();
+ options.forEach(configOptions::setString);
+
+ if (configOptions.get(LOG_SYSTEM) == null) {
+ // Use file store continuous reading
+ validateFileStoreContinuous(configOptions);
+ return Optional.empty();
+ }
+
+ return Optional.of(discoverLogStoreFactory(classLoader,
configOptions.get(LOG_SYSTEM)));
+ }
+
+ private static void validateFileStoreContinuous(Configuration options) {
+ Configuration logOptions = new DelegatingConfiguration(options,
LOG_PREFIX);
+ LogOptions.LogChangelogMode changelogMode =
logOptions.get(CHANGELOG_MODE);
+ if (changelogMode == LogOptions.LogChangelogMode.UPSERT) {
+ throw new ValidationException(
+ "File store continuous reading dose not support upsert
changelog mode.");
+ }
+ LogOptions.LogConsistency consistency = logOptions.get(CONSISTENCY);
+ if (consistency == LogOptions.LogConsistency.EVENTUAL) {
+ throw new ValidationException(
+ "File store continuous reading dose not support eventual
consistency mode.");
+ }
+ LogOptions.LogStartupMode startupMode = logOptions.get(SCAN);
+ if (startupMode == LogOptions.LogStartupMode.FROM_TIMESTAMP) {
+ throw new ValidationException(
+ "File store continuous reading dose not support
from_timestamp scan mode, "
+ + "you can add timestamp filters instead.");
+ }
+ }
+
+ static DynamicTableFactory.Context
createLogContext(DynamicTableFactory.Context context) {
+ return createLogContext(context,
context.getCatalogTable().getOptions());
+ }
+
+ static DynamicTableFactory.Context createLogContext(
+ DynamicTableFactory.Context context, Map<String, String> options) {
+ return new FactoryUtil.DefaultDynamicTableContext(
+ context.getObjectIdentifier(),
+ context.getCatalogTable().copy(filterLogStoreOptions(options)),
+ filterLogStoreOptions(context.getEnrichmentOptions()),
+ context.getConfiguration(),
+ context.getClassLoader(),
+ context.isTemporary());
+ }
+
+ static Map<String, String> filterLogStoreOptions(Map<String, String>
options) {
+ return options.entrySet().stream()
+ .filter(entry -> !entry.getKey().equals(LOG_SYSTEM.key())) //
exclude log.system
+ .filter(entry -> entry.getKey().startsWith(LOG_PREFIX))
+ .collect(
+ Collectors.toMap(
+ entry ->
entry.getKey().substring(LOG_PREFIX.length()),
+ Map.Entry::getValue));
+ }
+
+ static TableStore buildTableStore(DynamicTableFactory.Context context) {
+ TableStore store =
+ new TableStore(
+ context.getObjectIdentifier(),
+
Configuration.fromMap(context.getCatalogTable().getOptions()));
+
+ Schema schema = store.schema();
+
+ UpdateSchema updateSchema =
UpdateSchema.fromCatalogTable(context.getCatalogTable());
+
+ RowType rowType = updateSchema.rowType();
+ List<String> partitionKeys = updateSchema.partitionKeys();
+ List<String> primaryKeys = updateSchema.primaryKeys();
+
+ // compare fields to ignore isNullable for row type
+ Preconditions.checkArgument(
+
schema.logicalRowType().getFields().equals(rowType.getFields()),
+ "Flink schema and store schema are not the same, "
+ + "store schema is %s, Flink schema is %s",
+ schema.logicalRowType(),
+ rowType);
+
+ Preconditions.checkArgument(
+ schema.partitionKeys().equals(partitionKeys),
+ "Flink partitionKeys and store partitionKeys are not the same,
"
+ + "store partitionKeys is %s, Flink partitionKeys is
%s",
+ schema.partitionKeys(),
+ partitionKeys);
+
+ Preconditions.checkArgument(
+ schema.primaryKeys().equals(primaryKeys),
+ "Flink primaryKeys and store primaryKeys are not the same, "
+ + "store primaryKeys is %s, Flink primaryKeys is %s",
+ schema.primaryKeys(),
+ primaryKeys);
+
+ return store;
+ }
+}
diff --git
a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/TableStore.java
b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/TableStore.java
index 35fd023..3060323 100644
---
a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/TableStore.java
+++
b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/TableStore.java
@@ -92,7 +92,7 @@ public class TableStore {
this.tableIdentifier = tableIdentifier;
this.options = options;
- Path tablePath = new FileStoreOptions(options).path(tableIdentifier);
+ Path tablePath = FileStoreOptions.path(options);
this.schema =
new SchemaManager(tablePath)
.latest()
@@ -180,7 +180,7 @@ public class TableStore {
FileStoreOptions fileStoreOptions = new FileStoreOptions(options);
return FileStoreImpl.createWithAppendOnly(
- fileStoreOptions.path(tableIdentifier).toString(),
+ fileStoreOptions.path().toString(),
schema.id(),
fileStoreOptions,
user,
@@ -195,7 +195,7 @@ public class TableStore {
if (trimmedPrimaryKeys.length == 0) {
return FileStoreImpl.createWithValueCount(
- fileStoreOptions.path(tableIdentifier).toString(),
+ fileStoreOptions.path().toString(),
schema.id(),
fileStoreOptions,
user,
@@ -203,7 +203,7 @@ public class TableStore {
type);
} else {
return FileStoreImpl.createWithPrimaryKey(
- fileStoreOptions.path(tableIdentifier).toString(),
+ fileStoreOptions.path().toString(),
schema.id(),
fileStoreOptions,
user,
diff --git
a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/TableStoreFactory.java
b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/TableStoreFactory.java
deleted file mode 100644
index a61e33e..0000000
---
a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/TableStoreFactory.java
+++ /dev/null
@@ -1,311 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.store.connector;
-
-import org.apache.flink.annotation.VisibleForTesting;
-import org.apache.flink.api.common.RuntimeExecutionMode;
-import org.apache.flink.configuration.ConfigOption;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.configuration.DelegatingConfiguration;
-import org.apache.flink.configuration.ExecutionOptions;
-import org.apache.flink.core.fs.Path;
-import org.apache.flink.table.api.TableException;
-import org.apache.flink.table.api.ValidationException;
-import org.apache.flink.table.catalog.CatalogPartitionSpec;
-import org.apache.flink.table.factories.DynamicTableSinkFactory;
-import org.apache.flink.table.factories.DynamicTableSourceFactory;
-import org.apache.flink.table.factories.FactoryUtil;
-import org.apache.flink.table.factories.ManagedTableFactory;
-import org.apache.flink.table.store.connector.sink.TableStoreSink;
-import org.apache.flink.table.store.connector.source.TableStoreSource;
-import org.apache.flink.table.store.connector.utils.TableConfigUtils;
-import org.apache.flink.table.store.file.FileStoreOptions;
-import org.apache.flink.table.store.file.WriteMode;
-import org.apache.flink.table.store.file.mergetree.MergeTreeOptions;
-import org.apache.flink.table.store.file.schema.Schema;
-import org.apache.flink.table.store.file.schema.SchemaManager;
-import org.apache.flink.table.store.file.schema.UpdateSchema;
-import org.apache.flink.table.store.log.LogOptions.LogChangelogMode;
-import org.apache.flink.table.store.log.LogOptions.LogConsistency;
-import org.apache.flink.table.store.log.LogOptions.LogStartupMode;
-import org.apache.flink.table.store.log.LogStoreTableFactory;
-import org.apache.flink.table.types.logical.RowType;
-import org.apache.flink.util.Preconditions;
-
-import java.io.IOException;
-import java.io.UncheckedIOException;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Objects;
-import java.util.Optional;
-import java.util.Set;
-import java.util.stream.Collectors;
-
-import static
org.apache.flink.table.store.connector.TableStoreFactoryOptions.LOG_SYSTEM;
-import static
org.apache.flink.table.store.connector.TableStoreFactoryOptions.WRITE_MODE;
-import static org.apache.flink.table.store.file.FileStoreOptions.BUCKET;
-import static
org.apache.flink.table.store.file.FileStoreOptions.TABLE_STORE_PREFIX;
-import static org.apache.flink.table.store.log.LogOptions.CHANGELOG_MODE;
-import static org.apache.flink.table.store.log.LogOptions.CONSISTENCY;
-import static org.apache.flink.table.store.log.LogOptions.LOG_PREFIX;
-import static org.apache.flink.table.store.log.LogOptions.SCAN;
-import static
org.apache.flink.table.store.log.LogStoreTableFactory.discoverLogStoreFactory;
-
-/** Default implementation of {@link ManagedTableFactory}. */
-public class TableStoreFactory
- implements ManagedTableFactory, DynamicTableSourceFactory,
DynamicTableSinkFactory {
-
- @Override
- public Map<String, String> enrichOptions(Context context) {
- Map<String, String> enrichedOptions = new
HashMap<>(context.getCatalogTable().getOptions());
- TableConfigUtils.extractConfiguration(context.getConfiguration())
- .toMap()
- .forEach(
- (k, v) -> {
- if (k.startsWith(TABLE_STORE_PREFIX)) {
- enrichedOptions.putIfAbsent(
-
k.substring(TABLE_STORE_PREFIX.length()), v);
- }
- });
- return enrichedOptions;
- }
-
- @Override
- public void onCreateTable(Context context, boolean ignoreIfExists) {
- Map<String, String> options = context.getCatalogTable().getOptions();
- Path path = FileStoreOptions.path(options,
context.getObjectIdentifier());
- try {
- if (path.getFileSystem().exists(path) && !ignoreIfExists) {
- throw new TableException(
- String.format(
- "Failed to create file store path. "
- + "Reason: directory %s exists for
table %s. "
- + "Suggestion: please try `DESCRIBE
TABLE %s` to "
- + "first check whether table exists in
current catalog. "
- + "If table exists in catalog, and
data files under current path "
- + "are valid, please use `CREATE TABLE
IF NOT EXISTS` ddl instead. "
- + "Otherwise, please choose another
table name "
- + "or manually delete the current path
and try again.",
- path,
-
context.getObjectIdentifier().asSerializableString(),
-
context.getObjectIdentifier().asSerializableString()));
- }
- path.getFileSystem().mkdirs(path);
- } catch (IOException e) {
- throw new UncheckedIOException(e);
- }
-
- // Cannot define any primary key in an append-only table.
- if
(context.getCatalogTable().getResolvedSchema().getPrimaryKey().isPresent()) {
- if (Objects.equals(
- WriteMode.APPEND_ONLY.toString(),
- options.getOrDefault(WRITE_MODE.key(),
WRITE_MODE.defaultValue().toString()))) {
- throw new TableException(
- "Cannot define any primary key in an append-only
table. Set 'write-mode'='change-log' if "
- + "still want to keep the primary key
definition.");
- }
- }
-
- // update schema
- Path tablePath =
- new FileStoreOptions(context.getCatalogTable().getOptions())
- .path(context.getObjectIdentifier());
- // TODO pass lock
- try {
- new SchemaManager(tablePath)
-
.commitNewVersion(UpdateSchema.fromCatalogTable(context.getCatalogTable()));
- } catch (IllegalStateException e) {
- throw e;
- } catch (Exception e) {
- throw new RuntimeException(e);
- }
-
- createOptionalLogStoreFactory(context)
- .ifPresent(
- factory ->
- factory.onCreateTable(
- createLogContext(context),
- Integer.parseInt(
- options.getOrDefault(
- BUCKET.key(),
-
BUCKET.defaultValue().toString())),
- ignoreIfExists));
- }
-
- @Override
- public void onDropTable(Context context, boolean ignoreIfNotExists) {
- Map<String, String> options = context.getCatalogTable().getOptions();
- Path path = FileStoreOptions.path(options,
context.getObjectIdentifier());
- try {
- if (path.getFileSystem().exists(path)) {
- path.getFileSystem().delete(path, true);
- } else if (!ignoreIfNotExists) {
- throw new TableException(
- String.format(
- "Failed to delete file store path. "
- + "Reason: directory %s doesn't exist
for table %s. "
- + "Suggestion: please try `DROP TABLE
IF EXISTS` ddl instead.",
- path,
context.getObjectIdentifier().asSerializableString()));
- }
- } catch (IOException e) {
- throw new UncheckedIOException(e);
- }
- createOptionalLogStoreFactory(context)
- .ifPresent(
- factory ->
- factory.onDropTable(createLogContext(context),
ignoreIfNotExists));
- }
-
- @Override
- public Map<String, String> onCompactTable(
- Context context, CatalogPartitionSpec catalogPartitionSpec) {
- throw new UnsupportedOperationException("Not implement yet");
- }
-
- @Override
- public TableStoreSource createDynamicTableSource(Context context) {
- return new TableStoreSource(
- buildTableStore(context),
- context.getConfiguration().get(ExecutionOptions.RUNTIME_MODE)
- == RuntimeExecutionMode.STREAMING,
- createLogContext(context),
- createOptionalLogStoreFactory(context).orElse(null));
- }
-
- @Override
- public TableStoreSink createDynamicTableSink(Context context) {
- return new TableStoreSink(
- buildTableStore(context),
- createLogContext(context),
- createOptionalLogStoreFactory(context).orElse(null));
- }
-
- @Override
- public Set<ConfigOption<?>> requiredOptions() {
- return Collections.emptySet();
- }
-
- @Override
- public Set<ConfigOption<?>> optionalOptions() {
- Set<ConfigOption<?>> options = FileStoreOptions.allOptions();
- options.addAll(MergeTreeOptions.allOptions());
- options.addAll(TableStoreFactoryOptions.allOptions());
- return options;
- }
-
- // ~ Tools
------------------------------------------------------------------
-
- private static Optional<LogStoreTableFactory>
createOptionalLogStoreFactory(Context context) {
- Configuration options = new Configuration();
- context.getCatalogTable().getOptions().forEach(options::setString);
-
- if (options.get(LOG_SYSTEM) == null) {
- // Use file store continuous reading
- validateFileStoreContinuous(options);
- return Optional.empty();
- }
-
- return Optional.of(
- discoverLogStoreFactory(context.getClassLoader(),
options.get(LOG_SYSTEM)));
- }
-
- private static void validateFileStoreContinuous(Configuration options) {
- Configuration logOptions = new DelegatingConfiguration(options,
LOG_PREFIX);
- LogChangelogMode changelogMode = logOptions.get(CHANGELOG_MODE);
- if (changelogMode == LogChangelogMode.UPSERT) {
- throw new ValidationException(
- "File store continuous reading dose not support upsert
changelog mode.");
- }
- LogConsistency consistency = logOptions.get(CONSISTENCY);
- if (consistency == LogConsistency.EVENTUAL) {
- throw new ValidationException(
- "File store continuous reading dose not support eventual
consistency mode.");
- }
- LogStartupMode startupMode = logOptions.get(SCAN);
- if (startupMode == LogStartupMode.FROM_TIMESTAMP) {
- throw new ValidationException(
- "File store continuous reading dose not support
from_timestamp scan mode, "
- + "you can add timestamp filters instead.");
- }
- }
-
- private static Context createLogContext(Context context) {
- return new FactoryUtil.DefaultDynamicTableContext(
- context.getObjectIdentifier(),
- context.getCatalogTable()
-
.copy(filterLogStoreOptions(context.getCatalogTable().getOptions())),
- filterLogStoreOptions(context.getEnrichmentOptions()),
- context.getConfiguration(),
- context.getClassLoader(),
- context.isTemporary());
- }
-
- @VisibleForTesting
- static Map<String, String> filterLogStoreOptions(Map<String, String>
options) {
- return options.entrySet().stream()
- .filter(entry -> !entry.getKey().equals(LOG_SYSTEM.key())) //
exclude log.system
- .filter(entry -> entry.getKey().startsWith(LOG_PREFIX))
- .collect(
- Collectors.toMap(
- entry ->
entry.getKey().substring(LOG_PREFIX.length()),
- Map.Entry::getValue));
- }
-
- @VisibleForTesting
- TableStore buildTableStore(Context context) {
- TableStore store =
- new TableStore(
- context.getObjectIdentifier(),
-
Configuration.fromMap(context.getCatalogTable().getOptions()));
-
- Schema schema = store.schema();
-
- UpdateSchema updateSchema =
UpdateSchema.fromCatalogTable(context.getCatalogTable());
-
- RowType rowType = updateSchema.rowType();
- List<String> partitionKeys = updateSchema.partitionKeys();
- List<String> primaryKeys = updateSchema.primaryKeys();
-
- // compare fields to ignore isNullable for row type
- Preconditions.checkArgument(
-
schema.logicalRowType().getFields().equals(rowType.getFields()),
- "Flink schema and store schema are not the same, "
- + "store schema is %s, Flink schema is %s",
- schema.logicalRowType(),
- rowType);
-
- Preconditions.checkArgument(
- schema.partitionKeys().equals(partitionKeys),
- "Flink partitionKeys and store partitionKeys are not the same,
"
- + "store partitionKeys is %s, Flink partitionKeys is
%s",
- schema.partitionKeys(),
- partitionKeys);
-
- Preconditions.checkArgument(
- schema.primaryKeys().equals(primaryKeys),
- "Flink primaryKeys and store primaryKeys are not the same, "
- + "store primaryKeys is %s, Flink primaryKeys is %s",
- schema.primaryKeys(),
- primaryKeys);
-
- return store;
- }
-}
diff --git
a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/TableStoreFactoryOptions.java
b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/TableStoreFactoryOptions.java
index 829f99c..2f429cc 100644
---
a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/TableStoreFactoryOptions.java
+++
b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/TableStoreFactoryOptions.java
@@ -30,9 +30,15 @@ import java.util.Set;
import static org.apache.flink.table.store.utils.OptionsUtils.formatEnumOption;
-/** Options for {@link TableStoreFactory}. */
+/** Options for {@link TableStoreManagedFactory}. */
public class TableStoreFactoryOptions {
+ public static final ConfigOption<String> ROOT_PATH =
+ ConfigOptions.key("root-path")
+ .stringType()
+ .noDefaultValue()
+ .withDescription("The root file path of the table store in
the filesystem.");
+
public static final ConfigOption<Boolean> COMPACTION_RESCALE_BUCKET =
ConfigOptions.key("compaction.rescale-bucket")
.booleanType()
diff --git
a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/TableStoreManagedFactory.java
b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/TableStoreManagedFactory.java
new file mode 100644
index 0000000..6054753
--- /dev/null
+++
b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/TableStoreManagedFactory.java
@@ -0,0 +1,188 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.connector;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.table.api.TableException;
+import org.apache.flink.table.catalog.CatalogPartitionSpec;
+import org.apache.flink.table.catalog.ObjectIdentifier;
+import org.apache.flink.table.factories.ManagedTableFactory;
+import org.apache.flink.table.store.connector.utils.TableConfigUtils;
+import org.apache.flink.table.store.file.FileStoreOptions;
+import org.apache.flink.table.store.file.WriteMode;
+import org.apache.flink.table.store.file.schema.SchemaManager;
+import org.apache.flink.table.store.file.schema.UpdateSchema;
+import org.apache.flink.table.store.log.LogStoreTableFactory;
+import org.apache.flink.util.Preconditions;
+
+import java.io.IOException;
+import java.io.UncheckedIOException;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+
+import static
org.apache.flink.table.store.connector.TableStoreFactoryOptions.ROOT_PATH;
+import static
org.apache.flink.table.store.connector.TableStoreFactoryOptions.WRITE_MODE;
+import static org.apache.flink.table.store.file.FileStoreOptions.BUCKET;
+import static org.apache.flink.table.store.file.FileStoreOptions.PATH;
+import static
org.apache.flink.table.store.file.FileStoreOptions.TABLE_STORE_PREFIX;
+import static org.apache.flink.table.store.log.LogOptions.LOG_PREFIX;
+
+/** Default implementation of {@link ManagedTableFactory}. */
+public class TableStoreManagedFactory extends AbstractTableStoreFactory
+ implements ManagedTableFactory {
+
+ @Override
+ public Map<String, String> enrichOptions(Context context) {
+ Map<String, String> enrichedOptions = new
HashMap<>(context.getCatalogTable().getOptions());
+ TableConfigUtils.extractConfiguration(context.getConfiguration())
+ .toMap()
+ .forEach(
+ (k, v) -> {
+ if (k.startsWith(TABLE_STORE_PREFIX)) {
+ enrichedOptions.putIfAbsent(
+
k.substring(TABLE_STORE_PREFIX.length()), v);
+ }
+ });
+
+ String rootPath = enrichedOptions.remove(ROOT_PATH.key());
+ Preconditions.checkArgument(
+ rootPath != null,
+ String.format(
+ "Please specify a root path by setting session level
configuration "
+ + "as `SET 'table-store.%s' = '...'`.",
+ ROOT_PATH.key()));
+
+ Preconditions.checkArgument(
+ !enrichedOptions.containsKey(PATH.key()),
+ "Managed table can not contain table path. "
+ + "You need to remove path in table options or session
config.");
+
+ Path path = new Path(rootPath,
relativeTablePath(context.getObjectIdentifier()));
+ enrichedOptions.put(PATH.key(), path.toString());
+
+ Optional<LogStoreTableFactory> logFactory =
+ createOptionalLogStoreFactory(context.getClassLoader(),
enrichedOptions);
+ logFactory.ifPresent(
+ factory ->
+ factory.enrichOptions(createLogContext(context,
enrichedOptions))
+ .forEach((k, v) ->
enrichedOptions.putIfAbsent(LOG_PREFIX + k, v)));
+
+ return enrichedOptions;
+ }
+
+ @VisibleForTesting
+ static String relativeTablePath(ObjectIdentifier tableIdentifier) {
+ return String.format(
+ "%s.catalog/%s.db/%s",
+ tableIdentifier.getCatalogName(),
+ tableIdentifier.getDatabaseName(),
+ tableIdentifier.getObjectName());
+ }
+
+ @Override
+ public void onCreateTable(Context context, boolean ignoreIfExists) {
+ Map<String, String> options = context.getCatalogTable().getOptions();
+ Path path = FileStoreOptions.path(options);
+ try {
+ if (path.getFileSystem().exists(path) && !ignoreIfExists) {
+ throw new TableException(
+ String.format(
+ "Failed to create file store path. "
+ + "Reason: directory %s exists for
table %s. "
+ + "Suggestion: please try `DESCRIBE
TABLE %s` to "
+ + "first check whether table exists in
current catalog. "
+ + "If table exists in catalog, and
data files under current path "
+ + "are valid, please use `CREATE TABLE
IF NOT EXISTS` ddl instead. "
+ + "Otherwise, please choose another
table name "
+ + "or manually delete the current path
and try again.",
+ path,
+
context.getObjectIdentifier().asSerializableString(),
+
context.getObjectIdentifier().asSerializableString()));
+ }
+ path.getFileSystem().mkdirs(path);
+ } catch (IOException e) {
+ throw new UncheckedIOException(e);
+ }
+
+ // Cannot define any primary key in an append-only table.
+ if
(context.getCatalogTable().getResolvedSchema().getPrimaryKey().isPresent()) {
+ if (Objects.equals(
+ WriteMode.APPEND_ONLY.toString(),
+ options.getOrDefault(WRITE_MODE.key(),
WRITE_MODE.defaultValue().toString()))) {
+ throw new TableException(
+ "Cannot define any primary key in an append-only
table. Set 'write-mode'='change-log' if "
+ + "still want to keep the primary key
definition.");
+ }
+ }
+
+ // update schema
+ // TODO pass lock
+ try {
+ new SchemaManager(path)
+
.commitNewVersion(UpdateSchema.fromCatalogTable(context.getCatalogTable()));
+ } catch (IllegalStateException e) {
+ throw e;
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+
+ createOptionalLogStoreFactory(context)
+ .ifPresent(
+ factory ->
+ factory.onCreateTable(
+ createLogContext(context),
+ Integer.parseInt(
+ options.getOrDefault(
+ BUCKET.key(),
+
BUCKET.defaultValue().toString())),
+ ignoreIfExists));
+ }
+
+ @Override
+ public void onDropTable(Context context, boolean ignoreIfNotExists) {
+ Path path =
FileStoreOptions.path(context.getCatalogTable().getOptions());
+ try {
+ if (path.getFileSystem().exists(path)) {
+ path.getFileSystem().delete(path, true);
+ } else if (!ignoreIfNotExists) {
+ throw new TableException(
+ String.format(
+ "Failed to delete file store path. "
+ + "Reason: directory %s doesn't exist
for table %s. "
+ + "Suggestion: please try `DROP TABLE
IF EXISTS` ddl instead.",
+ path,
context.getObjectIdentifier().asSerializableString()));
+ }
+ } catch (IOException e) {
+ throw new UncheckedIOException(e);
+ }
+ createOptionalLogStoreFactory(context)
+ .ifPresent(
+ factory ->
+ factory.onDropTable(createLogContext(context),
ignoreIfNotExists));
+ }
+
+ @Override
+ public Map<String, String> onCompactTable(
+ Context context, CatalogPartitionSpec catalogPartitionSpec) {
+ throw new UnsupportedOperationException("Not implement yet");
+ }
+}
diff --git
a/flink-table-store-connector/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory
b/flink-table-store-connector/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory
index e377345..726415a 100644
---
a/flink-table-store-connector/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory
+++
b/flink-table-store-connector/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory
@@ -13,4 +13,4 @@
# See the License for the specific language governing permissions and
# limitations under the License.
-org.apache.flink.table.store.connector.TableStoreFactory
+org.apache.flink.table.store.connector.TableStoreManagedFactory
diff --git
a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/FileStoreITCase.java
b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/FileStoreITCase.java
index e41f3dc..eb08708 100644
---
a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/FileStoreITCase.java
+++
b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/FileStoreITCase.java
@@ -350,7 +350,7 @@ public class FileStoreITCase extends AbstractTestBase {
throws Exception {
ObjectIdentifier identifier = ObjectIdentifier.of("catalog", "db",
"t");
Configuration options = buildConfiguration(noFail,
temporaryFolder.newFolder());
- Path tablePath = new FileStoreOptions(options).path(identifier);
+ Path tablePath = new FileStoreOptions(options).path();
new SchemaManager(tablePath)
.commitNewVersion(
new UpdateSchema(
diff --git
a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/FileStoreTableITCase.java
b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/FileStoreTableITCase.java
index d10d2a7..59f3d96 100644
---
a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/FileStoreTableITCase.java
+++
b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/FileStoreTableITCase.java
@@ -36,7 +36,7 @@ import java.time.Duration;
import java.util.List;
import static
org.apache.flink.streaming.api.environment.ExecutionCheckpointingOptions.CHECKPOINTING_INTERVAL;
-import static org.apache.flink.table.store.file.FileStoreOptions.PATH;
+import static
org.apache.flink.table.store.connector.TableStoreFactoryOptions.ROOT_PATH;
import static
org.apache.flink.table.store.file.FileStoreOptions.TABLE_STORE_PREFIX;
/** ITCase for file store table api. */
@@ -58,7 +58,7 @@ public abstract class FileStoreTableITCase extends
AbstractTestBase {
private void prepareEnv(TableEnvironment env, String path) {
Configuration config = env.getConfig().getConfiguration();
config.set(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM, 2);
- config.setString(TABLE_STORE_PREFIX + PATH.key(), path);
+ config.setString(TABLE_STORE_PREFIX + ROOT_PATH.key(), path);
ddl().forEach(env::executeSql);
}
diff --git
a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/ReadWriteTableITCase.java
b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/ReadWriteTableITCase.java
index d120223..5c0b913 100644
---
a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/ReadWriteTableITCase.java
+++
b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/ReadWriteTableITCase.java
@@ -35,7 +35,6 @@ import org.apache.flink.table.factories.DynamicTableFactory;
import org.apache.flink.table.factories.FactoryUtil;
import
org.apache.flink.table.runtime.connector.sink.SinkRuntimeProviderContext;
import org.apache.flink.table.store.connector.sink.TableStoreSink;
-import org.apache.flink.table.store.file.FileStoreOptions;
import org.apache.flink.table.store.file.utils.BlockingIterator;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.RowType;
@@ -46,6 +45,7 @@ import org.junit.Test;
import javax.annotation.Nullable;
+import java.io.File;
import java.io.IOException;
import java.util.Arrays;
import java.util.Collections;
@@ -62,6 +62,7 @@ import static
org.apache.flink.table.store.connector.ReadWriteTableTestUtil.dail
import static
org.apache.flink.table.store.connector.ReadWriteTableTestUtil.dailyRatesChangelogWithoutUB;
import static
org.apache.flink.table.store.connector.ReadWriteTableTestUtil.rates;
import static
org.apache.flink.table.store.connector.ShowCreateUtil.buildSimpleSelectQuery;
+import static
org.apache.flink.table.store.connector.TableStoreFactoryOptions.ROOT_PATH;
import static
org.apache.flink.table.store.connector.TableStoreFactoryOptions.SCAN_PARALLELISM;
import static
org.apache.flink.table.store.connector.TableStoreFactoryOptions.SINK_PARALLELISM;
import static
org.apache.flink.table.store.connector.TableStoreTestBase.createResolvedTable;
@@ -1218,7 +1219,7 @@ public class ReadWriteTableITCase extends
ReadWriteTableTestBase {
id));
tEnv.executeSql(
String.format(
- "create table managed_table with ('path' = '%s') "
+ "create table managed_table with ('root-path' = '%s') "
+ "like dummy_source (excluding options)",
rootPath));
tEnv.executeSql("insert into managed_table select * from
dummy_source").await();
@@ -1258,7 +1259,7 @@ public class ReadWriteTableITCase extends
ReadWriteTableTestBase {
+ "'%s' = '%s'"
+ ") like `helper_source_%s` "
+ "(excluding options)",
- randomSinkId, FileStoreOptions.PATH.key(), rootPath,
randomSourceId));
+ randomSinkId, ROOT_PATH.key(), rootPath,
randomSourceId));
// insert multiple times
tEnv.executeSql(
@@ -1374,7 +1375,7 @@ public class ReadWriteTableITCase extends
ReadWriteTableTestBase {
+ "'%s' = '%s'"
+ ") like `helper_source_%s` "
+ "(excluding options)",
- randomSinkId, FileStoreOptions.PATH.key(), rootPath,
randomSourceId));
+ randomSinkId, ROOT_PATH.key(), rootPath,
randomSourceId));
tEnv.executeSql(
String.format(
"insert into `managed_table_%s` select * from
`helper_source_%s`",
@@ -1421,7 +1422,7 @@ public class ReadWriteTableITCase extends
ReadWriteTableTestBase {
+ " rate BIGINT\n"
+ ") WITH (\n"
+ " 'bucket' = '2',\n"
- + " 'path' = '%s'\n"
+ + " 'root-path' = '%s'\n"
+ ")",
rootPath));
tEnv.executeSql("INSERT INTO rates VALUES('US Dollar', 102)").await();
@@ -1578,13 +1579,14 @@ public class ReadWriteTableITCase extends
ReadWriteTableTestBase {
throws IOException {
// 1. create a mock table sink
Map<String, String> options = new HashMap<>();
- options.put(FileStoreOptions.PATH.key(), "/fake/path");
if (configParallelism != null) {
options.put(SINK_PARALLELISM.key(), configParallelism.toString());
}
options.put(
"path",
-
TEMPORARY_FOLDER.newFolder(UUID.randomUUID().toString()).toURI().toString());
+ new File(TEMPORARY_FOLDER.newFolder(),
UUID.randomUUID().toString())
+ .toURI()
+ .toString());
DynamicTableFactory.Context context =
new FactoryUtil.DefaultDynamicTableContext(
@@ -1600,8 +1602,8 @@ public class ReadWriteTableITCase extends
ReadWriteTableTestBase {
new Configuration(),
Thread.currentThread().getContextClassLoader(),
false);
- new TableStoreFactory().onCreateTable(context, false);
- DynamicTableSink tableSink = new
TableStoreFactory().createDynamicTableSink(context);
+ new TableStoreManagedFactory().onCreateTable(context, false);
+ DynamicTableSink tableSink = new
TableStoreManagedFactory().createDynamicTableSink(context);
assertThat(tableSink).isInstanceOf(TableStoreSink.class);
// 2. get sink provider
diff --git
a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/ReadWriteTableTestBase.java
b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/ReadWriteTableTestBase.java
index e4cf375..1e233b7 100644
---
a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/ReadWriteTableTestBase.java
+++
b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/ReadWriteTableTestBase.java
@@ -58,6 +58,7 @@ import static
org.apache.flink.table.store.connector.ShowCreateUtil.buildSelectQ
import static
org.apache.flink.table.store.connector.ShowCreateUtil.buildSimpleSelectQuery;
import static
org.apache.flink.table.store.connector.ShowCreateUtil.createTableLikeDDL;
import static
org.apache.flink.table.store.connector.TableStoreFactoryOptions.LOG_SYSTEM;
+import static
org.apache.flink.table.store.connector.TableStoreFactoryOptions.ROOT_PATH;
import static
org.apache.flink.table.store.kafka.KafkaLogOptions.BOOTSTRAP_SERVERS;
import static org.apache.flink.table.store.log.LogOptions.LOG_PREFIX;
import static org.assertj.core.api.Assertions.assertThat;
@@ -288,7 +289,7 @@ public class ReadWriteTableTestBase extends
KafkaTableTestBase {
throws Exception {
Map<String, String> tableOptions = new HashMap<>();
rootPath = TEMPORARY_FOLDER.newFolder().getPath();
- tableOptions.put(FileStoreOptions.PATH.key(), rootPath);
+ tableOptions.put(ROOT_PATH.key(), rootPath);
if (enableLogStore) {
tableOptions.put(LOG_SYSTEM.key(), "kafka");
tableOptions.put(LOG_PREFIX + BOOTSTRAP_SERVERS.key(),
getBootstrapServers());
diff --git
a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/TableStoreFactoryTest.java
b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/TableStoreManagedFactoryTest.java
similarity index 73%
rename from
flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/TableStoreFactoryTest.java
rename to
flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/TableStoreManagedFactoryTest.java
index 021ac98..ed75624 100644
---
a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/TableStoreFactoryTest.java
+++
b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/TableStoreManagedFactoryTest.java
@@ -49,6 +49,9 @@ import java.util.UUID;
import java.util.function.Predicate;
import java.util.stream.Stream;
+import static java.util.Collections.emptyMap;
+import static java.util.Collections.singletonMap;
+import static
org.apache.flink.table.store.connector.TableStoreFactoryOptions.ROOT_PATH;
import static
org.apache.flink.table.store.connector.TableStoreTestBase.createResolvedTable;
import static org.apache.flink.table.store.file.FileStoreOptions.BUCKET;
import static org.apache.flink.table.store.file.FileStoreOptions.PATH;
@@ -60,13 +63,14 @@ import static
org.apache.flink.table.store.log.LogOptions.LOG_PREFIX;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
-/** Test cases for {@link TableStoreFactory}. */
-public class TableStoreFactoryTest {
+/** Test cases for {@link TableStoreManagedFactory}. */
+public class TableStoreManagedFactoryTest {
private static final ObjectIdentifier TABLE_IDENTIFIER =
ObjectIdentifier.of("catalog", "database", "table");
- private final TableStoreFactory tableStoreFactory = new
TableStoreFactory();
+ private final TableStoreManagedFactory tableStoreManagedFactory =
+ new TableStoreManagedFactory();
@TempDir private static java.nio.file.Path sharedTempDir;
private DynamicTableFactory.Context context;
@@ -78,25 +82,63 @@ public class TableStoreFactoryTest {
Map<String, String> tableOptions,
Map<String, String> expectedEnrichedOptions) {
context = createTableContext(sessionOptions, tableOptions);
- Map<String, String> actualEnrichedOptions =
tableStoreFactory.enrichOptions(context);
+ Map<String, String> actualEnrichedOptions =
tableStoreManagedFactory.enrichOptions(context);
assertThat(actualEnrichedOptions)
.containsExactlyInAnyOrderEntriesOf(expectedEnrichedOptions);
}
+ @Test
+ public void testErrorEnrichOptions() {
+ Map<String, String> sessionMap = new HashMap<>();
+ sessionMap.put("table-store.root-path", "my_path");
+ sessionMap.put("table-store.path", "another_path");
+ context = createTableContext(sessionMap, emptyMap());
+ assertThatThrownBy(() ->
tableStoreManagedFactory.enrichOptions(context))
+ .hasMessage(
+ "Managed table can not contain table path. You need to
remove path in table options or session config.");
+
+ context = createTableContext(emptyMap(), emptyMap());
+ assertThatThrownBy(() ->
tableStoreManagedFactory.enrichOptions(context))
+ .hasMessage(
+ "Please specify a root path by setting session level
configuration as `SET 'table-store.root-path' = '...'`.");
+ }
+
+ @Test
+ public void testEnrichKafkaTopic() {
+ Map<String, String> sessionMap = new HashMap<>();
+ sessionMap.put("table-store.root-path", "my_path");
+ sessionMap.put("table-store.log.system", "kafka");
+ sessionMap.put("table-store.log.topic", "my_topic");
+ context = createTableContext(sessionMap, emptyMap());
+ assertThatThrownBy(() ->
tableStoreManagedFactory.enrichOptions(context))
+ .hasMessage(
+ "Managed table can not contain custom topic. You need
to remove topic in table options or session config.");
+
+ sessionMap.remove("table-store.log.topic");
+ context = createTableContext(sessionMap, emptyMap());
+ Map<String, String> enriched =
tableStoreManagedFactory.enrichOptions(context);
+
+ Map<String, String> expected = new HashMap<>();
+ expected.put("path", "my_path/catalog.catalog/database.db/table");
+ expected.put("log.system", "kafka");
+ expected.put("log.topic", "catalog.database.table");
+ assertThat(enriched).containsExactlyEntriesOf(expected);
+ }
+
@ParameterizedTest
@MethodSource("providingEnrichedOptionsForCreation")
public void testOnCreateTable(Map<String, String> enrichedOptions, boolean
ignoreIfExists) {
- context = createTableContext(Collections.emptyMap(), enrichedOptions);
+ context = enrichContext(createTableContext(emptyMap(),
enrichedOptions));
Path expectedPath =
Paths.get(
sharedTempDir.toAbsolutePath().toString(),
relativeTablePath(TABLE_IDENTIFIER));
boolean exist = expectedPath.toFile().exists();
if (ignoreIfExists || !exist) {
- tableStoreFactory.onCreateTable(context, ignoreIfExists);
+ tableStoreManagedFactory.onCreateTable(context, ignoreIfExists);
assertThat(expectedPath).exists();
} else {
- assertThatThrownBy(() -> tableStoreFactory.onCreateTable(context,
false))
+ assertThatThrownBy(() ->
tableStoreManagedFactory.onCreateTable(context, false))
.isInstanceOf(TableException.class)
.hasMessageContaining(
String.format(
@@ -114,20 +156,32 @@ public class TableStoreFactoryTest {
}
}
+ private DynamicTableFactory.Context
enrichContext(DynamicTableFactory.Context context) {
+ Map<String, String> newOptions =
tableStoreManagedFactory.enrichOptions(context);
+ ResolvedCatalogTable table =
context.getCatalogTable().copy(newOptions);
+ return new FactoryUtil.DefaultDynamicTableContext(
+ context.getObjectIdentifier(),
+ table,
+ emptyMap(),
+ context.getConfiguration(),
+ context.getClassLoader(),
+ context.isTemporary());
+ }
+
@ParameterizedTest
@MethodSource("providingEnrichedOptionsForDrop")
public void testOnDropTable(Map<String, String> enrichedOptions, boolean
ignoreIfNotExists) {
- context = createTableContext(Collections.emptyMap(), enrichedOptions);
+ context = enrichContext(createTableContext(emptyMap(),
enrichedOptions));
Path expectedPath =
Paths.get(
sharedTempDir.toAbsolutePath().toString(),
relativeTablePath(TABLE_IDENTIFIER));
boolean exist = expectedPath.toFile().exists();
if (exist || ignoreIfNotExists) {
- tableStoreFactory.onDropTable(context, ignoreIfNotExists);
+ tableStoreManagedFactory.onDropTable(context, ignoreIfNotExists);
assertThat(expectedPath).doesNotExist();
} else {
- assertThatThrownBy(() -> tableStoreFactory.onDropTable(context,
false))
+ assertThatThrownBy(() ->
tableStoreManagedFactory.onDropTable(context, false))
.isInstanceOf(TableException.class)
.hasMessageContaining(
String.format(
@@ -155,7 +209,7 @@ public class TableStoreFactoryTest {
addPrefix(expectedLogOptions, LOG_PREFIX, (key) -> true);
enrichedOptions.put("foo", "bar");
- assertThat(TableStoreFactory.filterLogStoreOptions(enrichedOptions))
+
assertThat(TableStoreManagedFactory.filterLogStoreOptions(enrichedOptions))
.containsExactlyInAnyOrderEntriesOf(expectedLogOptions);
}
@@ -168,7 +222,7 @@ public class TableStoreFactoryTest {
TableStoreTestBase.ExpectedResult expectedResult) {
ResolvedCatalogTable catalogTable =
createResolvedTable(
- Collections.singletonMap(
+ singletonMap(
"path", sharedTempDir.toAbsolutePath() + "/" +
UUID.randomUUID()),
rowType,
partitions,
@@ -177,13 +231,13 @@ public class TableStoreFactoryTest {
new FactoryUtil.DefaultDynamicTableContext(
TABLE_IDENTIFIER,
catalogTable,
- Collections.emptyMap(),
- Configuration.fromMap(Collections.emptyMap()),
+ emptyMap(),
+ Configuration.fromMap(emptyMap()),
Thread.currentThread().getContextClassLoader(),
false);
if (expectedResult.success) {
- tableStoreFactory.onCreateTable(context, false);
- TableStore tableStore = tableStoreFactory.buildTableStore(context);
+ tableStoreManagedFactory.onCreateTable(context, false);
+ TableStore tableStore =
AbstractTableStoreFactory.buildTableStore(context);
assertThat(tableStore.partitioned()).isEqualTo(catalogTable.isPartitioned());
assertThat(tableStore.valueCountMode())
.isEqualTo(catalogTable.getResolvedSchema().getPrimaryKeyIndexes().length == 0);
@@ -196,7 +250,7 @@ public class TableStoreFactoryTest {
.isTrue();
}
} else {
- assertThatThrownBy(() -> tableStoreFactory.onCreateTable(context,
false))
+ assertThatThrownBy(() ->
tableStoreManagedFactory.onCreateTable(context, false))
.isInstanceOf(expectedResult.expectedType)
.hasMessageContaining(expectedResult.expectedMessage);
}
@@ -209,31 +263,27 @@ public class TableStoreFactoryTest {
of(
BUCKET.key(),
BUCKET.defaultValue().toString(),
- PATH.key(),
+ ROOT_PATH.key(),
sharedTempDir.toString(),
LOG_PREFIX + BOOTSTRAP_SERVERS.key(),
"localhost:9092",
LOG_PREFIX + CONSISTENCY.key(),
CONSISTENCY.defaultValue().name());
- // default
- Arguments arg0 =
- Arguments.of(
- Collections.emptyMap(), Collections.emptyMap(),
Collections.emptyMap());
-
// set configuration under session level
Arguments arg1 =
Arguments.of(
addPrefix(enrichedOptions, TABLE_STORE_PREFIX, (key)
-> true),
- Collections.emptyMap(),
- enrichedOptions);
+ emptyMap(),
+ generateTablePath(enrichedOptions));
// set configuration under table level
- Arguments arg2 = Arguments.of(Collections.emptyMap(), enrichedOptions,
enrichedOptions);
+ Arguments arg2 =
+ Arguments.of(emptyMap(), enrichedOptions,
generateTablePath(enrichedOptions));
// set both session and table level configuration to test options
combination
Map<String, String> tableOptions = new HashMap<>(enrichedOptions);
- tableOptions.remove(PATH.key());
+ tableOptions.remove(ROOT_PATH.key());
tableOptions.remove(CONSISTENCY.key());
Arguments arg3 =
Arguments.of(
@@ -242,19 +292,32 @@ public class TableStoreFactoryTest {
TABLE_STORE_PREFIX,
(key) -> !tableOptions.containsKey(key)),
tableOptions,
- enrichedOptions);
+ generateTablePath(enrichedOptions));
// set same key with different value to test table configuration take
precedence
Map<String, String> sessionOptions = new HashMap<>();
sessionOptions.put(
TABLE_STORE_PREFIX + BUCKET.key(),
String.valueOf(BUCKET.defaultValue() + 1));
- Arguments arg4 = Arguments.of(sessionOptions, enrichedOptions,
enrichedOptions);
- return Stream.of(arg0, arg1, arg2, arg3, arg4);
+
+ Arguments arg4 =
+ Arguments.of(sessionOptions, enrichedOptions,
generateTablePath(enrichedOptions));
+ return Stream.of(arg1, arg2, arg3, arg4);
+ }
+
+ private static Map<String, String> generateTablePath(Map<String, String>
enrichedOptions) {
+ Map<String, String> expected = new HashMap<>(enrichedOptions);
+ String rootPath = expected.remove(ROOT_PATH.key());
+ if (rootPath != null) {
+ String path =
+ rootPath + "/" +
TableStoreManagedFactory.relativeTablePath(TABLE_IDENTIFIER);
+ expected.put(PATH.key(), path);
+ }
+ return expected;
}
private static Stream<Arguments> providingEnrichedOptionsForCreation() {
Map<String, String> enrichedOptions = new HashMap<>();
- enrichedOptions.put(PATH.key(),
sharedTempDir.toAbsolutePath().toString());
+ enrichedOptions.put(ROOT_PATH.key(),
sharedTempDir.toAbsolutePath().toString());
return Stream.of(
Arguments.of(enrichedOptions, false),
Arguments.of(enrichedOptions, true),
@@ -271,7 +334,7 @@ public class TableStoreFactoryTest {
tablePath.mkdirs();
}
Map<String, String> enrichedOptions = new HashMap<>();
- enrichedOptions.put(PATH.key(),
sharedTempDir.toAbsolutePath().toString());
+ enrichedOptions.put(ROOT_PATH.key(),
sharedTempDir.toAbsolutePath().toString());
return Stream.of(
Arguments.of(enrichedOptions, false),
Arguments.of(enrichedOptions, true),
@@ -341,7 +404,7 @@ public class TableStoreFactoryTest {
return new FactoryUtil.DefaultDynamicTableContext(
TABLE_IDENTIFIER,
resolvedCatalogTable,
- Collections.emptyMap(),
+ emptyMap(),
Configuration.fromMap(sessionOptions),
Thread.currentThread().getContextClassLoader(),
false);
diff --git
a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/TableStoreTestBase.java
b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/TableStoreTestBase.java
index c6762da..558f6d2 100644
---
a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/TableStoreTestBase.java
+++
b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/TableStoreTestBase.java
@@ -51,7 +51,7 @@ import java.util.stream.Collectors;
import java.util.stream.IntStream;
import static
org.apache.flink.table.store.connector.TableStoreFactoryOptions.LOG_SYSTEM;
-import static org.apache.flink.table.store.file.FileStoreOptions.PATH;
+import static
org.apache.flink.table.store.connector.TableStoreFactoryOptions.ROOT_PATH;
import static
org.apache.flink.table.store.file.FileStoreOptions.TABLE_STORE_PREFIX;
import static
org.apache.flink.table.store.kafka.KafkaLogOptions.BOOTSTRAP_SERVERS;
import static org.apache.flink.table.store.log.LogOptions.LOG_PREFIX;
@@ -109,7 +109,7 @@ public abstract class TableStoreTestBase extends
KafkaTableTestBase {
protected void prepareSessionContext() {
Configuration configuration = tEnv.getConfig().getConfiguration();
- configuration.setString(TABLE_STORE_PREFIX + PATH.key(), rootPath);
+ configuration.setString(TABLE_STORE_PREFIX + ROOT_PATH.key(),
rootPath);
configuration.setString(
TABLE_STORE_PREFIX + LOG_PREFIX + BOOTSTRAP_SERVERS.key(),
getBootstrapServers());
if (enableLogStore) {
diff --git
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/FileStoreOptions.java
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/FileStoreOptions.java
index df59e66..06dd728 100644
---
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/FileStoreOptions.java
+++
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/FileStoreOptions.java
@@ -56,7 +56,7 @@ public class FileStoreOptions implements Serializable {
ConfigOptions.key("path")
.stringType()
.noDefaultValue()
- .withDescription("The root file path of the table store in
the filesystem.");
+ .withDescription("The file path of this table in the
filesystem.");
public static final ConfigOption<String> FILE_FORMAT =
ConfigOptions.key("file.format")
@@ -167,21 +167,16 @@ public class FileStoreOptions implements Serializable {
return options.get(BUCKET);
}
- public Path path(ObjectIdentifier tableIdentifier) {
- return path(options.toMap(), tableIdentifier);
+ public Path path() {
+ return path(options.toMap());
}
- public static Path path(Map<String, String> options, ObjectIdentifier
tableIdentifier) {
- Preconditions.checkArgument(
- options.containsKey(PATH.key()),
- String.format(
- "Failed to create file store path. "
- + "Please specify a root dir by setting
session level configuration "
- + "as `SET 'table-store.%s' = '...'`. "
- + "Alternatively, you can use a per-table root
dir "
- + "as `CREATE TABLE ${table} (...) WITH ('%s'
= '...')`",
- PATH.key(), PATH.key()));
- return new Path(options.get(PATH.key()),
relativeTablePath(tableIdentifier));
+ public static Path path(Map<String, String> options) {
+ return new Path(options.get(PATH.key()));
+ }
+
+ public static Path path(Configuration options) {
+ return new Path(options.get(PATH));
}
public static String relativeTablePath(ObjectIdentifier tableIdentifier) {
diff --git
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/log/LogStoreTableFactory.java
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/log/LogStoreTableFactory.java
index 5f28a3d..22c4e6c 100644
---
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/log/LogStoreTableFactory.java
+++
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/log/LogStoreTableFactory.java
@@ -38,6 +38,8 @@ import org.apache.flink.types.RowKind;
import javax.annotation.Nullable;
+import java.util.Map;
+
/**
* Base interface for configuring a default log table connector. The log table
is used by managed
* table factory.
@@ -47,6 +49,9 @@ import javax.annotation.Nullable;
*/
public interface LogStoreTableFactory extends DynamicTableFactory {
+ /** Enrich options from catalog and session information. */
+ Map<String, String> enrichOptions(Context context);
+
/** Notifies the listener that a table creation occurred. */
void onCreateTable(Context context, int numBucket, boolean ignoreIfExists);
diff --git
a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/TestFileStore.java
b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/TestFileStore.java
index 6e6bf24..5a1ac1a 100644
---
a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/TestFileStore.java
+++
b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/TestFileStore.java
@@ -21,7 +21,6 @@ package org.apache.flink.table.store.file;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.MemorySize;
import org.apache.flink.core.fs.Path;
-import org.apache.flink.table.catalog.ObjectIdentifier;
import org.apache.flink.table.data.binary.BinaryRowData;
import org.apache.flink.table.runtime.typeutils.RowDataSerializer;
import org.apache.flink.table.store.file.data.DataFileMeta;
@@ -112,7 +111,7 @@ public class TestFileStore extends FileStoreImpl {
RowType valueType,
MergeFunction mergeFunction) {
super(
- options.path(ObjectIdentifier.of("catalog", "database",
"table")).toString(),
+ options.path().toString(),
0L,
options,
WriteMode.CHANGE_LOG,
diff --git
a/flink-table-store-e2e-tests/src/test/java/org/apache/flink/table/store/tests/FileStoreBatchE2eTest.java
b/flink-table-store-e2e-tests/src/test/java/org/apache/flink/table/store/tests/FileStoreBatchE2eTest.java
index 0c6e349..047814c 100644
---
a/flink-table-store-e2e-tests/src/test/java/org/apache/flink/table/store/tests/FileStoreBatchE2eTest.java
+++
b/flink-table-store-e2e-tests/src/test/java/org/apache/flink/table/store/tests/FileStoreBatchE2eTest.java
@@ -77,7 +77,7 @@ public class FileStoreBatchE2eTest extends E2eTestBase {
+ " price INT\n"
+ ") PARTITIONED BY (dt, hr) WITH (\n"
+ " 'bucket' = '3',\n"
- + " 'path' = '%s'\n"
+ + " 'root-path' = '%s'\n"
+ ");";
tableStoreDdl =
String.format(
diff --git
a/flink-table-store-e2e-tests/src/test/java/org/apache/flink/table/store/tests/FileStoreFlinkFormatE2eTest.java
b/flink-table-store-e2e-tests/src/test/java/org/apache/flink/table/store/tests/FileStoreFlinkFormatE2eTest.java
index 8953275..0ceee85 100644
---
a/flink-table-store-e2e-tests/src/test/java/org/apache/flink/table/store/tests/FileStoreFlinkFormatE2eTest.java
+++
b/flink-table-store-e2e-tests/src/test/java/org/apache/flink/table/store/tests/FileStoreFlinkFormatE2eTest.java
@@ -36,7 +36,7 @@ public class FileStoreFlinkFormatE2eTest extends E2eTestBase {
+ " b VARCHAR\n"
+ ") WITH (\n"
+ " 'bucket' = '3',\n"
- + " 'path' = '%s',\n"
+ + " 'root-path' = '%s',\n"
+ " 'file.format' = 'csv'\n"
+ ");";
tableStoreDdl =
diff --git
a/flink-table-store-e2e-tests/src/test/java/org/apache/flink/table/store/tests/FileStoreStreamE2eTest.java
b/flink-table-store-e2e-tests/src/test/java/org/apache/flink/table/store/tests/FileStoreStreamE2eTest.java
index f37a3fd..44009ec 100644
---
a/flink-table-store-e2e-tests/src/test/java/org/apache/flink/table/store/tests/FileStoreStreamE2eTest.java
+++
b/flink-table-store-e2e-tests/src/test/java/org/apache/flink/table/store/tests/FileStoreStreamE2eTest.java
@@ -48,7 +48,7 @@ public class FileStoreStreamE2eTest extends E2eTestBase {
+ " rn BIGINT\n"
+ ") WITH (\n"
+ " 'bucket' = '3',\n"
- + " 'path' = '%s'\n"
+ + " 'root-path' = '%s'\n"
+ ");";
tableStoreDdl =
String.format(
diff --git
a/flink-table-store-e2e-tests/src/test/java/org/apache/flink/table/store/tests/LogStoreE2eTest.java
b/flink-table-store-e2e-tests/src/test/java/org/apache/flink/table/store/tests/LogStoreE2eTest.java
index e601b28..465b638 100644
---
a/flink-table-store-e2e-tests/src/test/java/org/apache/flink/table/store/tests/LogStoreE2eTest.java
+++
b/flink-table-store-e2e-tests/src/test/java/org/apache/flink/table/store/tests/LogStoreE2eTest.java
@@ -38,7 +38,7 @@ public class LogStoreE2eTest extends E2eTestBase {
+ " PRIMARY KEY (k) NOT ENFORCED\n"
+ ") WITH (\n"
+ " 'bucket' = '3',\n"
- + " 'path' = '%s'\n"
+ + " 'root-path' = '%s'\n"
+ ");";
String tableStoreDir = UUID.randomUUID().toString() + ".store";
tableStoreBatchDdl = String.format(tableStoreBatchDdl, TEST_DATA_DIR +
"/" + tableStoreDir);
@@ -71,7 +71,7 @@ public class LogStoreE2eTest extends E2eTestBase {
+ " PRIMARY KEY (k) NOT ENFORCED\n"
+ ") WITH (\n"
+ " 'bucket' = '3',\n"
- + " 'path' = '%s',\n"
+ + " 'root-path' = '%s',\n"
+ " 'log.consistency' = 'eventual',\n"
+ " 'log.system' = 'kafka',\n"
+ " 'log.kafka.bootstrap.servers' = '%s'\n"
diff --git
a/flink-table-store-e2e-tests/src/test/java/org/apache/flink/table/store/tests/TypeE2eTest.java
b/flink-table-store-e2e-tests/src/test/java/org/apache/flink/table/store/tests/TypeE2eTest.java
index 81fd14b..2e514b4 100644
---
a/flink-table-store-e2e-tests/src/test/java/org/apache/flink/table/store/tests/TypeE2eTest.java
+++
b/flink-table-store-e2e-tests/src/test/java/org/apache/flink/table/store/tests/TypeE2eTest.java
@@ -57,7 +57,7 @@ public class TypeE2eTest extends E2eTestBase {
schema,
") WITH (",
" 'bucket' = '1',",
- " 'path' = '%s'",
+ " 'root-path' = '%s'",
");"));
tableStoreDdl =
String.format(
@@ -128,7 +128,7 @@ public class TypeE2eTest extends E2eTestBase {
"PRIMARY KEY (pk) NOT ENFORCED",
") WITH (",
" 'bucket' = '1',",
- " 'path' = '%s'",
+ " 'root-path' = '%s'",
");"));
tableStoreDdl =
String.format(
diff --git
a/flink-table-store-hive/src/test/java/org/apache/flink/table/store/FileStoreTestHelper.java
b/flink-table-store-hive/src/test/java/org/apache/flink/table/store/FileStoreTestHelper.java
index d6c5234..38071e5 100644
---
a/flink-table-store-hive/src/test/java/org/apache/flink/table/store/FileStoreTestHelper.java
+++
b/flink-table-store-hive/src/test/java/org/apache/flink/table/store/FileStoreTestHelper.java
@@ -20,7 +20,6 @@ package org.apache.flink.table.store;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
-import org.apache.flink.table.catalog.ObjectIdentifier;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.binary.BinaryRowData;
import org.apache.flink.table.store.file.FileStoreImpl;
@@ -60,7 +59,6 @@ public class FileStoreTestHelper {
private final ExecutorService compactExecutor;
public FileStoreTestHelper(
- ObjectIdentifier oi,
Configuration conf,
RowType partitionType,
RowType keyType,
@@ -71,7 +69,7 @@ public class FileStoreTestHelper {
FileStoreOptions options = new FileStoreOptions(conf);
this.store =
new FileStoreImpl(
- options.path(oi).toString(),
+ options.path().toString(),
0,
options,
WriteMode.CHANGE_LOG,
diff --git
a/flink-table-store-hive/src/test/java/org/apache/flink/table/store/hive/TableStoreHiveStorageHandlerITCase.java
b/flink-table-store-hive/src/test/java/org/apache/flink/table/store/hive/TableStoreHiveStorageHandlerITCase.java
index 7fe4f30..5f33a37 100644
---
a/flink-table-store-hive/src/test/java/org/apache/flink/table/store/hive/TableStoreHiveStorageHandlerITCase.java
+++
b/flink-table-store-hive/src/test/java/org/apache/flink/table/store/hive/TableStoreHiveStorageHandlerITCase.java
@@ -21,7 +21,6 @@ package org.apache.flink.table.store.hive;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.connectors.hive.FlinkEmbeddedHiveRunner;
import org.apache.flink.table.api.DataTypes;
-import org.apache.flink.table.catalog.ObjectIdentifier;
import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.StringData;
import org.apache.flink.table.data.TimestampData;
@@ -83,14 +82,13 @@ public class TableStoreHiveStorageHandlerITCase {
@Test
public void testReadExternalTableWithPk() throws Exception {
- String root = folder.getRoot().getPath();
+ String path = folder.newFolder().toURI().toString();
Configuration conf = new Configuration();
- conf.setString(FileStoreOptions.PATH, root);
+ conf.setString(FileStoreOptions.PATH, path);
conf.setInteger(FileStoreOptions.BUCKET, 2);
conf.setString(FileStoreOptions.FILE_FORMAT, "avro");
FileStoreTestHelper helper =
new FileStoreTestHelper(
- ObjectIdentifier.of("test_catalog", "test_db",
"test_table"),
conf,
RowType.of(),
RowType.of(
@@ -146,9 +144,7 @@ public class TableStoreHiveStorageHandlerITCase {
" c STRING",
")",
"STORED BY '" +
TableStoreHiveStorageHandler.class.getName() + "'",
- "LOCATION '"
- + root
- +
"/test_catalog.catalog/test_db.db/test_table'",
+ "LOCATION '" + path + "'",
"TBLPROPERTIES (",
" 'table-store.catalog' = 'test_catalog',",
" 'table-store.primary-keys' = 'a,b',",
@@ -162,14 +158,13 @@ public class TableStoreHiveStorageHandlerITCase {
@Test
public void testReadExternalTableWithoutPk() throws Exception {
- String root = folder.getRoot().getPath();
+ String path = folder.newFolder().toURI().toString();
Configuration conf = new Configuration();
- conf.setString(FileStoreOptions.PATH, root);
+ conf.setString(FileStoreOptions.PATH, path);
conf.setInteger(FileStoreOptions.BUCKET, 2);
conf.setString(FileStoreOptions.FILE_FORMAT, "avro");
FileStoreTestHelper helper =
new FileStoreTestHelper(
- ObjectIdentifier.of("test_catalog", "test_db",
"test_table"),
conf,
RowType.of(),
RowType.of(
@@ -222,9 +217,7 @@ public class TableStoreHiveStorageHandlerITCase {
" c STRING",
")",
"STORED BY '" +
TableStoreHiveStorageHandler.class.getName() + "'",
- "LOCATION '"
- + root
- +
"/test_catalog.catalog/test_db.db/test_table'",
+ "LOCATION '" + path + "'",
"TBLPROPERTIES (",
" 'table-store.catalog' = 'test_catalog',",
" 'table-store.bucket' = '2',",
@@ -244,7 +237,6 @@ public class TableStoreHiveStorageHandlerITCase {
conf.setString(FileStoreOptions.FILE_FORMAT, "avro");
FileStoreTestHelper helper =
new FileStoreTestHelper(
- ObjectIdentifier.of("test_catalog", "test_db",
"test_table"),
conf,
RowType.of(),
RowType.of(
@@ -300,9 +292,7 @@ public class TableStoreHiveStorageHandlerITCase {
ddl.toString(),
")",
"STORED BY '" +
TableStoreHiveStorageHandler.class.getName() + "'",
- "LOCATION '"
- + root
- +
"/test_catalog.catalog/test_db.db/test_table'",
+ "LOCATION '" + root + "'",
"TBLPROPERTIES (",
" 'table-store.catalog' = 'test_catalog',",
" 'table-store.primary-keys' = 'f_int',",
@@ -385,13 +375,12 @@ public class TableStoreHiveStorageHandlerITCase {
@Test
public void testPredicatePushDown() throws Exception {
- String root = folder.getRoot().getPath();
+ String path = folder.newFolder().toURI().toString();
Configuration conf = new Configuration();
- conf.setString(FileStoreOptions.PATH, root);
+ conf.setString(FileStoreOptions.PATH, path);
conf.setString(FileStoreOptions.FILE_FORMAT, "avro");
FileStoreTestHelper helper =
new FileStoreTestHelper(
- ObjectIdentifier.of("test_catalog", "test_db",
"test_table"),
conf,
RowType.of(),
RowType.of(
@@ -427,9 +416,7 @@ public class TableStoreHiveStorageHandlerITCase {
" a INT",
")",
"STORED BY '" +
TableStoreHiveStorageHandler.class.getName() + "'",
- "LOCATION '"
- + root
- +
"/test_catalog.catalog/test_db.db/test_table'",
+ "LOCATION '" + path + "'",
"TBLPROPERTIES (",
" 'table-store.catalog' = 'test_catalog',",
" 'table-store.file.format' = 'avro'",
@@ -480,13 +467,12 @@ public class TableStoreHiveStorageHandlerITCase {
@Test
public void testDateAndTimestamp() throws Exception {
- String root = folder.getRoot().getPath();
+ String path = folder.newFolder().toURI().toString();
Configuration conf = new Configuration();
- conf.setString(FileStoreOptions.PATH, root);
+ conf.setString(FileStoreOptions.PATH, path);
conf.setString(FileStoreOptions.FILE_FORMAT, "avro");
FileStoreTestHelper helper =
new FileStoreTestHelper(
- ObjectIdentifier.of("test_catalog", "test_db",
"test_table"),
conf,
RowType.of(),
RowType.of(
@@ -532,9 +518,7 @@ public class TableStoreHiveStorageHandlerITCase {
" ts TIMESTAMP",
")",
"STORED BY '" +
TableStoreHiveStorageHandler.class.getName() + "'",
- "LOCATION '"
- + root
- +
"/test_catalog.catalog/test_db.db/test_table'",
+ "LOCATION '" + path + "'",
"TBLPROPERTIES (",
" 'table-store.catalog' = 'test_catalog',",
" 'table-store.file.format' = 'avro'",
diff --git
a/flink-table-store-hive/src/test/java/org/apache/flink/table/store/mapred/TableStoreRecordReaderTest.java
b/flink-table-store-hive/src/test/java/org/apache/flink/table/store/mapred/TableStoreRecordReaderTest.java
index b8e1105..01c560c 100644
---
a/flink-table-store-hive/src/test/java/org/apache/flink/table/store/mapred/TableStoreRecordReaderTest.java
+++
b/flink-table-store-hive/src/test/java/org/apache/flink/table/store/mapred/TableStoreRecordReaderTest.java
@@ -21,7 +21,6 @@ package org.apache.flink.table.store.mapred;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.table.api.DataTypes;
-import org.apache.flink.table.catalog.ObjectIdentifier;
import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.StringData;
@@ -58,7 +57,6 @@ public class TableStoreRecordReaderTest {
conf.setString(FileStoreOptions.FILE_FORMAT, "avro");
FileStoreTestHelper helper =
new FileStoreTestHelper(
- ObjectIdentifier.of("test_catalog", "test_db",
"test_table"),
conf,
RowType.of(),
RowType.of(
@@ -119,7 +117,6 @@ public class TableStoreRecordReaderTest {
conf.setString(FileStoreOptions.FILE_FORMAT, "avro");
FileStoreTestHelper helper =
new FileStoreTestHelper(
- ObjectIdentifier.of("test_catalog", "test_db",
"test_table"),
conf,
RowType.of(),
RowType.of(
diff --git
a/flink-table-store-kafka/src/main/java/org/apache/flink/table/store/kafka/KafkaLogOptions.java
b/flink-table-store-kafka/src/main/java/org/apache/flink/table/store/kafka/KafkaLogOptions.java
index 8882488..8cdb6d0 100644
---
a/flink-table-store-kafka/src/main/java/org/apache/flink/table/store/kafka/KafkaLogOptions.java
+++
b/flink-table-store-kafka/src/main/java/org/apache/flink/table/store/kafka/KafkaLogOptions.java
@@ -29,4 +29,10 @@ public class KafkaLogOptions {
.stringType()
.noDefaultValue()
.withDescription("Required Kafka server connection
string");
+
+ public static final ConfigOption<String> TOPIC =
+ ConfigOptions.key("topic")
+ .stringType()
+ .noDefaultValue()
+ .withDescription("Topic of this kafka table.");
}
diff --git
a/flink-table-store-kafka/src/main/java/org/apache/flink/table/store/kafka/KafkaLogStoreFactory.java
b/flink-table-store-kafka/src/main/java/org/apache/flink/table/store/kafka/KafkaLogStoreFactory.java
index 9904934..9828daf 100644
---
a/flink-table-store-kafka/src/main/java/org/apache/flink/table/store/kafka/KafkaLogStoreFactory.java
+++
b/flink-table-store-kafka/src/main/java/org/apache/flink/table/store/kafka/KafkaLogStoreFactory.java
@@ -31,6 +31,7 @@ import org.apache.flink.table.factories.FactoryUtil;
import org.apache.flink.table.store.log.LogStoreTableFactory;
import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.utils.DataTypeUtils;
+import org.apache.flink.util.Preconditions;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.NewTopic;
@@ -51,6 +52,7 @@ import java.util.concurrent.ExecutionException;
import static
org.apache.flink.table.factories.FactoryUtil.createTableFactoryHelper;
import static
org.apache.flink.table.store.kafka.KafkaLogOptions.BOOTSTRAP_SERVERS;
+import static org.apache.flink.table.store.kafka.KafkaLogOptions.TOPIC;
import static org.apache.flink.table.store.log.LogOptions.CHANGELOG_MODE;
import static org.apache.flink.table.store.log.LogOptions.CONSISTENCY;
import static org.apache.flink.table.store.log.LogOptions.FORMAT;
@@ -84,6 +86,7 @@ public class KafkaLogStoreFactory implements
LogStoreTableFactory {
public Set<ConfigOption<?>> optionalOptions() {
Set<ConfigOption<?>> options = new HashSet<>();
options.add(SCAN);
+ options.add(TOPIC);
options.add(SCAN_TIMESTAMP_MILLS);
options.add(RETENTION);
options.add(CONSISTENCY);
@@ -93,6 +96,23 @@ public class KafkaLogStoreFactory implements
LogStoreTableFactory {
return options;
}
+ @Override
+ public Map<String, String> enrichOptions(Context context) {
+ Map<String, String> options = new
HashMap<>(context.getCatalogTable().getOptions());
+ Preconditions.checkArgument(
+ !options.containsKey(TOPIC.key()),
+ "Managed table can not contain custom topic. "
+ + "You need to remove topic in table options or
session config.");
+
+ String topic = context.getObjectIdentifier().asSummaryString();
+ options.put(TOPIC.key(), topic);
+ return options;
+ }
+
+ private String topic(Context context) {
+ return context.getCatalogTable().getOptions().get(TOPIC.key());
+ }
+
@Override
public void onCreateTable(Context context, int numBucket, boolean
ignoreIfExists) {
FactoryUtil.TableFactoryHelper helper = createTableFactoryHelper(this,
context);
@@ -217,10 +237,6 @@ public class KafkaLogStoreFactory implements
LogStoreTableFactory {
helper.getOptions().get(CHANGELOG_MODE));
}
- private static String topic(DynamicTableFactory.Context context) {
- return context.getObjectIdentifier().asSummaryString();
- }
-
public static Properties toKafkaProperties(ReadableConfig options) {
Properties properties = new Properties();
Map<String, String> optionMap = ((Configuration) options).toMap();
diff --git
a/flink-table-store-kafka/src/test/java/org/apache/flink/table/store/kafka/KafkaLogTestUtils.java
b/flink-table-store-kafka/src/test/java/org/apache/flink/table/store/kafka/KafkaLogTestUtils.java
index a375d14..4054009 100644
---
a/flink-table-store-kafka/src/test/java/org/apache/flink/table/store/kafka/KafkaLogTestUtils.java
+++
b/flink-table-store-kafka/src/test/java/org/apache/flink/table/store/kafka/KafkaLogTestUtils.java
@@ -49,12 +49,14 @@ import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.UUID;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import static org.apache.flink.table.data.binary.BinaryRowDataUtil.EMPTY_ROW;
import static
org.apache.flink.table.store.file.mergetree.compact.CompactManagerTest.row;
import static
org.apache.flink.table.store.kafka.KafkaLogOptions.BOOTSTRAP_SERVERS;
+import static org.apache.flink.table.store.kafka.KafkaLogOptions.TOPIC;
import static org.apache.flink.table.store.log.LogOptions.CHANGELOG_MODE;
import static org.apache.flink.table.store.log.LogOptions.CONSISTENCY;
import static org.apache.flink.table.store.log.LogOptions.LogChangelogMode;
@@ -189,6 +191,7 @@ public class KafkaLogTestUtils {
options.put(CHANGELOG_MODE.key(), changelogMode.toString());
options.put(CONSISTENCY.key(), consistency.toString());
options.put(BOOTSTRAP_SERVERS.key(), servers);
+ options.put(TOPIC.key(), UUID.randomUUID().toString());
return createContext(name, type, keys, options);
}