This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch release-0.2
in repository https://gitbox.apache.org/repos/asf/flink-table-store.git
The following commit(s) were added to refs/heads/release-0.2 by this push:
new 7d6be436 [FLINK-29181] log.system can be congiured by dynamic options
7d6be436 is described below
commit 7d6be436f74a70ba7446986c1163b9b3f98c204c
Author: Jingsong Lee <[email protected]>
AuthorDate: Fri Sep 2 15:45:17 2022 +0800
[FLINK-29181] log.system can be congiured by dynamic options
This closes #285
---
.../generated/flink_connector_configuration.html | 4 ++--
.../store/connector/AbstractTableStoreFactory.java | 3 ++-
.../store/connector/FlinkConnectorOptions.java | 23 ++++++++++++++++++++--
.../table/store/connector/TableStoreTestBase.java | 3 +++
4 files changed, 28 insertions(+), 5 deletions(-)
diff --git
a/docs/layouts/shortcodes/generated/flink_connector_configuration.html
b/docs/layouts/shortcodes/generated/flink_connector_configuration.html
index 30a9a69e..d5d40b36 100644
--- a/docs/layouts/shortcodes/generated/flink_connector_configuration.html
+++ b/docs/layouts/shortcodes/generated/flink_connector_configuration.html
@@ -10,9 +10,9 @@
<tbody>
<tr>
<td><h5>log.system</h5></td>
- <td style="word-wrap: break-word;">(none)</td>
+ <td style="word-wrap: break-word;">"none"</td>
<td>String</td>
- <td>The log system used to keep changes of the table.</td>
+ <td>The log system used to keep changes of the table.<br /><br
/>Possible values:<br /><ul><li>"none": No log system, the data is written only
to file store, and the streaming read will be directly read from the file
store.</li></ul><ul><li>"kafka": Kafka log system, the data is double written
to file store and kafka, and the streaming read will be read from
kafka.</li></ul></td>
</tr>
<tr>
<td><h5>scan.parallelism</h5></td>
diff --git
a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/AbstractTableStoreFactory.java
b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/AbstractTableStoreFactory.java
index 566686e4..7c24cc42 100644
---
a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/AbstractTableStoreFactory.java
+++
b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/AbstractTableStoreFactory.java
@@ -54,6 +54,7 @@ import static
org.apache.flink.table.store.CoreOptions.LOG_CHANGELOG_MODE;
import static org.apache.flink.table.store.CoreOptions.LOG_CONSISTENCY;
import static org.apache.flink.table.store.CoreOptions.LOG_SCAN;
import static
org.apache.flink.table.store.connector.FlinkConnectorOptions.LOG_SYSTEM;
+import static
org.apache.flink.table.store.connector.FlinkConnectorOptions.NONE;
import static
org.apache.flink.table.store.log.LogStoreTableFactory.discoverLogStoreFactory;
/** Abstract table store factory to create table source and table sink. */
@@ -106,7 +107,7 @@ public abstract class AbstractTableStoreFactory
Configuration configOptions = new Configuration();
options.forEach(configOptions::setString);
- if (configOptions.get(LOG_SYSTEM) == null) {
+ if (configOptions.get(LOG_SYSTEM).equalsIgnoreCase(NONE)) {
// Use file store continuous reading
validateFileStoreContinuous(configOptions);
return Optional.empty();
diff --git
a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/FlinkConnectorOptions.java
b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/FlinkConnectorOptions.java
index a49a7b27..07b21581 100644
---
a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/FlinkConnectorOptions.java
+++
b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/FlinkConnectorOptions.java
@@ -22,6 +22,8 @@ import org.apache.flink.annotation.Internal;
import org.apache.flink.annotation.docs.Documentation;
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ConfigOptions;
+import org.apache.flink.configuration.description.Description;
+import org.apache.flink.configuration.description.TextElement;
import org.apache.flink.table.catalog.ObjectIdentifier;
import org.apache.flink.table.factories.FactoryUtil;
@@ -34,6 +36,8 @@ public class FlinkConnectorOptions {
public static final String TABLE_STORE_PREFIX = "table-store.";
+ public static final String NONE = "none";
+
@Internal
@Documentation.ExcludeFromDocumentation("Internal use only")
public static final ConfigOption<String> ROOT_PATH =
@@ -63,8 +67,23 @@ public class FlinkConnectorOptions {
public static final ConfigOption<String> LOG_SYSTEM =
ConfigOptions.key("log.system")
.stringType()
- .noDefaultValue()
- .withDescription("The log system used to keep changes of
the table.");
+ .defaultValue(NONE)
+ .withDescription(
+ Description.builder()
+ .text("The log system used to keep changes
of the table.")
+ .linebreak()
+ .linebreak()
+ .text("Possible values:")
+ .linebreak()
+ .list(
+ TextElement.text(
+ "\"none\": No log system,
the data is written only to file store,"
+ + " and the
streaming read will be directly read from the file store."))
+ .list(
+ TextElement.text(
+ "\"kafka\": Kafka log
system, the data is double written to file"
+ + " store and
kafka, and the streaming read will be read from kafka."))
+ .build());
public static final ConfigOption<Integer> SINK_PARALLELISM =
FactoryUtil.SINK_PARALLELISM;
diff --git
a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/TableStoreTestBase.java
b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/TableStoreTestBase.java
index b3284952..f8728183 100644
---
a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/TableStoreTestBase.java
+++
b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/TableStoreTestBase.java
@@ -50,6 +50,7 @@ import java.util.stream.Collectors;
import java.util.stream.IntStream;
import static
org.apache.flink.table.store.connector.FlinkConnectorOptions.LOG_SYSTEM;
+import static
org.apache.flink.table.store.connector.FlinkConnectorOptions.NONE;
import static
org.apache.flink.table.store.connector.FlinkConnectorOptions.ROOT_PATH;
import static
org.apache.flink.table.store.connector.FlinkConnectorOptions.TABLE_STORE_PREFIX;
import static
org.apache.flink.table.store.connector.FlinkConnectorOptions.relativeTablePath;
@@ -113,6 +114,8 @@ public abstract class TableStoreTestBase extends
KafkaTableTestBase {
TABLE_STORE_PREFIX + BOOTSTRAP_SERVERS.key(),
getBootstrapServers());
if (enableLogStore) {
configuration.setString(TABLE_STORE_PREFIX + LOG_SYSTEM.key(),
"kafka");
+ } else {
+ configuration.setString(TABLE_STORE_PREFIX + LOG_SYSTEM.key(),
NONE);
}
}