This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch release-0.2
in repository https://gitbox.apache.org/repos/asf/flink-table-store.git


The following commit(s) were added to refs/heads/release-0.2 by this push:
     new 7d6be436 [FLINK-29181] log.system can be configured by dynamic options
7d6be436 is described below

commit 7d6be436f74a70ba7446986c1163b9b3f98c204c
Author: Jingsong Lee <[email protected]>
AuthorDate: Fri Sep 2 15:45:17 2022 +0800

    [FLINK-29181] log.system can be configured by dynamic options
    
    This closes #285
---
 .../generated/flink_connector_configuration.html   |  4 ++--
 .../store/connector/AbstractTableStoreFactory.java |  3 ++-
 .../store/connector/FlinkConnectorOptions.java     | 23 ++++++++++++++++++++--
 .../table/store/connector/TableStoreTestBase.java  |  3 +++
 4 files changed, 28 insertions(+), 5 deletions(-)

diff --git 
a/docs/layouts/shortcodes/generated/flink_connector_configuration.html 
b/docs/layouts/shortcodes/generated/flink_connector_configuration.html
index 30a9a69e..d5d40b36 100644
--- a/docs/layouts/shortcodes/generated/flink_connector_configuration.html
+++ b/docs/layouts/shortcodes/generated/flink_connector_configuration.html
@@ -10,9 +10,9 @@
     <tbody>
         <tr>
             <td><h5>log.system</h5></td>
-            <td style="word-wrap: break-word;">(none)</td>
+            <td style="word-wrap: break-word;">"none"</td>
             <td>String</td>
-            <td>The log system used to keep changes of the table.</td>
+            <td>The log system used to keep changes of the table.<br /><br 
/>Possible values:<br /><ul><li>"none": No log system, the data is written only 
to file store, and the streaming read will be directly read from the file 
store.</li></ul><ul><li>"kafka": Kafka log system, the data is double written 
to file store and kafka, and the streaming read will be read from 
kafka.</li></ul></td>
         </tr>
         <tr>
             <td><h5>scan.parallelism</h5></td>
diff --git 
a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/AbstractTableStoreFactory.java
 
b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/AbstractTableStoreFactory.java
index 566686e4..7c24cc42 100644
--- 
a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/AbstractTableStoreFactory.java
+++ 
b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/AbstractTableStoreFactory.java
@@ -54,6 +54,7 @@ import static 
org.apache.flink.table.store.CoreOptions.LOG_CHANGELOG_MODE;
 import static org.apache.flink.table.store.CoreOptions.LOG_CONSISTENCY;
 import static org.apache.flink.table.store.CoreOptions.LOG_SCAN;
 import static 
org.apache.flink.table.store.connector.FlinkConnectorOptions.LOG_SYSTEM;
+import static 
org.apache.flink.table.store.connector.FlinkConnectorOptions.NONE;
 import static 
org.apache.flink.table.store.log.LogStoreTableFactory.discoverLogStoreFactory;
 
 /** Abstract table store factory to create table source and table sink. */
@@ -106,7 +107,7 @@ public abstract class AbstractTableStoreFactory
         Configuration configOptions = new Configuration();
         options.forEach(configOptions::setString);
 
-        if (configOptions.get(LOG_SYSTEM) == null) {
+        if (configOptions.get(LOG_SYSTEM).equalsIgnoreCase(NONE)) {
             // Use file store continuous reading
             validateFileStoreContinuous(configOptions);
             return Optional.empty();
diff --git 
a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/FlinkConnectorOptions.java
 
b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/FlinkConnectorOptions.java
index a49a7b27..07b21581 100644
--- 
a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/FlinkConnectorOptions.java
+++ 
b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/FlinkConnectorOptions.java
@@ -22,6 +22,8 @@ import org.apache.flink.annotation.Internal;
 import org.apache.flink.annotation.docs.Documentation;
 import org.apache.flink.configuration.ConfigOption;
 import org.apache.flink.configuration.ConfigOptions;
+import org.apache.flink.configuration.description.Description;
+import org.apache.flink.configuration.description.TextElement;
 import org.apache.flink.table.catalog.ObjectIdentifier;
 import org.apache.flink.table.factories.FactoryUtil;
 
@@ -34,6 +36,8 @@ public class FlinkConnectorOptions {
 
     public static final String TABLE_STORE_PREFIX = "table-store.";
 
+    public static final String NONE = "none";
+
     @Internal
     @Documentation.ExcludeFromDocumentation("Internal use only")
     public static final ConfigOption<String> ROOT_PATH =
@@ -63,8 +67,23 @@ public class FlinkConnectorOptions {
     public static final ConfigOption<String> LOG_SYSTEM =
             ConfigOptions.key("log.system")
                     .stringType()
-                    .noDefaultValue()
-                    .withDescription("The log system used to keep changes of 
the table.");
+                    .defaultValue(NONE)
+                    .withDescription(
+                            Description.builder()
+                                    .text("The log system used to keep changes 
of the table.")
+                                    .linebreak()
+                                    .linebreak()
+                                    .text("Possible values:")
+                                    .linebreak()
+                                    .list(
+                                            TextElement.text(
+                                                    "\"none\": No log system, 
the data is written only to file store,"
+                                                            + " and the 
streaming read will be directly read from the file store."))
+                                    .list(
+                                            TextElement.text(
+                                                    "\"kafka\": Kafka log 
system, the data is double written to file"
+                                                            + " store and 
kafka, and the streaming read will be read from kafka."))
+                                    .build());
 
     public static final ConfigOption<Integer> SINK_PARALLELISM = 
FactoryUtil.SINK_PARALLELISM;
 
diff --git 
a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/TableStoreTestBase.java
 
b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/TableStoreTestBase.java
index b3284952..f8728183 100644
--- 
a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/TableStoreTestBase.java
+++ 
b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/TableStoreTestBase.java
@@ -50,6 +50,7 @@ import java.util.stream.Collectors;
 import java.util.stream.IntStream;
 
 import static 
org.apache.flink.table.store.connector.FlinkConnectorOptions.LOG_SYSTEM;
+import static 
org.apache.flink.table.store.connector.FlinkConnectorOptions.NONE;
 import static 
org.apache.flink.table.store.connector.FlinkConnectorOptions.ROOT_PATH;
 import static 
org.apache.flink.table.store.connector.FlinkConnectorOptions.TABLE_STORE_PREFIX;
 import static 
org.apache.flink.table.store.connector.FlinkConnectorOptions.relativeTablePath;
@@ -113,6 +114,8 @@ public abstract class TableStoreTestBase extends 
KafkaTableTestBase {
                 TABLE_STORE_PREFIX + BOOTSTRAP_SERVERS.key(), 
getBootstrapServers());
         if (enableLogStore) {
             configuration.setString(TABLE_STORE_PREFIX + LOG_SYSTEM.key(), 
"kafka");
+        } else {
+            configuration.setString(TABLE_STORE_PREFIX + LOG_SYSTEM.key(), 
NONE);
         }
     }
 

Reply via email to