This is an automated email from the ASF dual-hosted git repository.

nicholasjiang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 6ebddb0cd [core] Add streaming read from option (#778) (#812)
6ebddb0cd is described below

commit 6ebddb0cdb1657cbe9249b89124a28627b360ff1
Author: HZY <[email protected]>
AuthorDate: Thu Apr 13 11:13:38 2023 +0800

    [core] Add streaming read from option (#778) (#812)
---
 .../shortcodes/generated/core_configuration.html   |   6 ++
 .../generated/flink_connector_configuration.html   |   2 +-
 .../main/java/org/apache/paimon/CoreOptions.java   |  69 ++++++++++++
 .../java/org/apache/paimon/mergetree/Levels.java   |   2 +-
 .../paimon/flink/ContinuousFileStoreITCase.java    |   8 +-
 .../org/apache/paimon/flink/WatermarkITCase.java   |   2 +-
 .../paimon/flink/ContinuousFileStoreITCase.java    |   8 +-
 .../paimon/flink/AbstractFlinkTableFactory.java    |  12 ++-
 .../apache/paimon/flink/FlinkConnectorOptions.java |   8 +-
 .../paimon/flink/source/DataTableSource.java       |   2 +-
 .../paimon/flink/source/FlinkSourceBuilder.java    |  10 +-
 .../paimon/flink/ContinuousFileStoreITCase.java    |   4 +-
 .../org/apache/paimon/flink/LogSystemITCase.java   | 119 +++++++++++++++++++++
 13 files changed, 232 insertions(+), 20 deletions(-)

diff --git a/docs/layouts/shortcodes/generated/core_configuration.html 
b/docs/layouts/shortcodes/generated/core_configuration.html
index 61a430146..b55585981 100644
--- a/docs/layouts/shortcodes/generated/core_configuration.html
+++ b/docs/layouts/shortcodes/generated/core_configuration.html
@@ -338,6 +338,12 @@
             <td>MemorySize</td>
             <td>Target size of a source split when scanning a bucket.</td>
         </tr>
+        <tr>
+            <td><h5>streaming-read-mode</h5></td>
+            <td style="word-wrap: break-word;">(none)</td>
+            <td><p>Enum</p></td>
+            <td>The mode of streaming read that specifies to read the data of 
table file or log<br /><br />Possible values:<br /><ul><li>file: Reads from the 
data of table file store.</li></ul><ul><li>log: Read from the data of table log 
store.</li></ul><br /><br />Possible values:<ul><li>"log": Reads from the log 
store.</li><li>"file": Reads from the file store.</li></ul></td>
+        </tr>
         <tr>
             <td><h5>streaming-read-overwrite</h5></td>
             <td style="word-wrap: break-word;">false</td>
diff --git 
a/docs/layouts/shortcodes/generated/flink_connector_configuration.html 
b/docs/layouts/shortcodes/generated/flink_connector_configuration.html
index 8a5951dab..85d268eb6 100644
--- a/docs/layouts/shortcodes/generated/flink_connector_configuration.html
+++ b/docs/layouts/shortcodes/generated/flink_connector_configuration.html
@@ -18,7 +18,7 @@
             <td><h5>log.system</h5></td>
             <td style="word-wrap: break-word;">"none"</td>
             <td>String</td>
-            <td>The log system used to keep changes of the table.<br /><br 
/>Possible values:<br /><ul><li>"none": No log system, the data is written only 
to file store, and the streaming read will be directly read from the file 
store.</li></ul><ul><li>"kafka": Kafka log system, the data is double written 
to file store and kafka, and the streaming read will be read from 
kafka.</li></ul></td>
+            <td>The log system used to keep changes of the table.<br /><br 
/>Possible values:<br /><ul><li>"none": No log system, the data is written only 
to file store, and the streaming read will be directly read from the file 
store.</li></ul><ul><li>"kafka": Kafka log system, the data is double written 
to file store and kafka, and the streaming read will be read from kafka. If 
streaming read from file, configures streaming-read-mode to file.</li></ul></td>
         </tr>
         <tr>
             <td><h5>scan.infer-parallelism</h5></td>
diff --git a/paimon-core/src/main/java/org/apache/paimon/CoreOptions.java 
b/paimon-core/src/main/java/org/apache/paimon/CoreOptions.java
index ae188173b..40ac103e3 100644
--- a/paimon-core/src/main/java/org/apache/paimon/CoreOptions.java
+++ b/paimon-core/src/main/java/org/apache/paimon/CoreOptions.java
@@ -568,6 +568,28 @@ public class CoreOptions implements Serializable {
                             "Only used to force TableScan to construct 
'ContinuousCompactorStartingScanner' and "
                                     + "'ContinuousCompactorFollowUpScanner' 
for dedicated streaming compaction job.");
 
+    public static final ConfigOption<StreamingReadMode> STREAMING_READ_MODE =
+            key("streaming-read-mode")
+                    .enumType(StreamingReadMode.class)
+                    .noDefaultValue()
+                    .withDescription(
+                            Description.builder()
+                                    .text(
+                                            "The mode of streaming read that 
specifies to read the data of table file or log")
+                                    .linebreak()
+                                    .linebreak()
+                                    .text("Possible values:")
+                                    .linebreak()
+                                    .list(
+                                            text(
+                                                    
StreamingReadMode.FILE.getValue()
+                                                            + ": Reads from 
the data of table file store."))
+                                    .list(
+                                            text(
+                                                    
StreamingReadMode.LOG.getValue()
+                                                            + ": Read from the 
data of table log store."))
+                                    .build());
+
     private final Options options;
 
     public CoreOptions(Map<String, String> options) {
@@ -822,6 +844,10 @@ public class CoreOptions implements Serializable {
         return options.get(READ_BATCH_SIZE);
     }
 
+    public static StreamingReadMode streamReadType(Options options) {
+        return options.get(STREAMING_READ_MODE);
+    }
+
     /** Specifies the merge engine for table with primary key. */
     public enum MergeEngine implements DescribedEnum {
         DEDUPLICATE("deduplicate", "De-duplicate and keep the last row."),
@@ -1054,6 +1080,49 @@ public class CoreOptions implements Serializable {
         }
     }
 
+    /** Specifies whether streaming reads consume the log store or the file store. */
+    public enum StreamingReadMode implements DescribedEnum {
+        LOG("log", "Reads from the log store."),
+        FILE("file", "Reads from the file store.");
+
+        private final String value;
+        private final String description;
+
+        StreamingReadMode(String value, String description) {
+            this.value = value;
+            this.description = description;
+        }
+
+        @Override
+        public String toString() {
+            return value;
+        }
+
+        @Override
+        public InlineElement getDescription() {
+            return text(description);
+        }
+
+        public String getValue() {
+            return value;
+        }
+
+        @VisibleForTesting
+        public static StreamingReadMode fromValue(String value) {
+            for (StreamingReadMode mode : StreamingReadMode.values()) {
+                if (mode.value.equals(value)) {
+                    return mode;
+                }
+            }
+            throw new IllegalArgumentException(
+                    String.format(
+                            "Invalid streaming read mode %s, only support [%s]",
+                            value,
+                            StringUtils.join(
+                                    
Arrays.stream(StreamingReadMode.values()).iterator(), ",")));
+        }
+    }
+
     /**
      * Set the default values of the {@link CoreOptions} via the given {@link 
Options}.
      *
diff --git a/paimon-core/src/main/java/org/apache/paimon/mergetree/Levels.java 
b/paimon-core/src/main/java/org/apache/paimon/mergetree/Levels.java
index ca71272bf..b4c16ed67 100644
--- a/paimon-core/src/main/java/org/apache/paimon/mergetree/Levels.java
+++ b/paimon-core/src/main/java/org/apache/paimon/mergetree/Levels.java
@@ -96,7 +96,7 @@ public class Levels {
     }
 
     public SortedRun runOfLevel(int level) {
-        checkArgument(level > 0, "Level0 dose not have one single sorted 
run.");
+        checkArgument(level > 0, "Level0 does not have one single sorted 
run.");
         return levels.get(level - 1);
     }
 
diff --git 
a/paimon-flink/paimon-flink-1.14/src/test/java/org/apache/paimon/flink/ContinuousFileStoreITCase.java
 
b/paimon-flink/paimon-flink-1.14/src/test/java/org/apache/paimon/flink/ContinuousFileStoreITCase.java
index 40f520683..8314feaa3 100644
--- 
a/paimon-flink/paimon-flink-1.14/src/test/java/org/apache/paimon/flink/ContinuousFileStoreITCase.java
+++ 
b/paimon-flink/paimon-flink-1.14/src/test/java/org/apache/paimon/flink/ContinuousFileStoreITCase.java
@@ -469,7 +469,7 @@ public class ContinuousFileStoreITCase extends 
CatalogITCaseBase {
                 () ->
                         streamSqlIter(
                                 "SELECT * FROM T1 /*+ 
OPTIONS('log.changelog-mode'='upsert') */"),
-                "File store continuous reading dose not support upsert 
changelog mode");
+                "File store continuous reading does not support upsert 
changelog mode");
     }
 
     @Test
@@ -479,7 +479,7 @@ public class ContinuousFileStoreITCase extends 
CatalogITCaseBase {
                 () ->
                         streamSqlIter(
                                 "SELECT * FROM T1 /*+ 
OPTIONS('log.changelog-mode'='upsert') */"),
-                "File store continuous reading dose not support upsert 
changelog mode");
+                "File store continuous reading does not support upsert 
changelog mode");
     }
 
     @Test
@@ -489,7 +489,7 @@ public class ContinuousFileStoreITCase extends 
CatalogITCaseBase {
                 () ->
                         streamSqlIter(
                                 "SELECT * FROM T1 /*+ 
OPTIONS('log.consistency'='eventual') */"),
-                "File store continuous reading dose not support eventual 
consistency mode");
+                "File store continuous reading does not support eventual 
consistency mode");
     }
 
     @Test
@@ -499,6 +499,6 @@ public class ContinuousFileStoreITCase extends 
CatalogITCaseBase {
                 () ->
                         streamSqlIter(
                                 "SELECT * FROM T1 /*+ 
OPTIONS('log.consistency'='eventual') */"),
-                "File store continuous reading dose not support eventual 
consistency mode");
+                "File store continuous reading does not support eventual 
consistency mode");
     }
 }
diff --git 
a/paimon-flink/paimon-flink-1.14/src/test/java/org/apache/paimon/flink/WatermarkITCase.java
 
b/paimon-flink/paimon-flink-1.14/src/test/java/org/apache/paimon/flink/WatermarkITCase.java
index 8247d6dd2..5de32147d 100644
--- 
a/paimon-flink/paimon-flink-1.14/src/test/java/org/apache/paimon/flink/WatermarkITCase.java
+++ 
b/paimon-flink/paimon-flink-1.14/src/test/java/org/apache/paimon/flink/WatermarkITCase.java
@@ -51,7 +51,7 @@ public class WatermarkITCase extends CatalogITCaseBase {
                                         
"'scan.watermark.alignment.update-interval'='2s'",
                                         
"'scan.watermark.alignment.max-drift'='1s',"))
                 .hasMessageContaining(
-                        "Flink 1.14 dose not support watermark alignment, 
please check your Flink version");
+                        "Flink 1.14 does not support watermark alignment, 
please check your Flink version");
     }
 
     private void innerTestWatermark(String... options) throws Exception {
diff --git 
a/paimon-flink/paimon-flink-1.15/src/test/java/org/apache/paimon/flink/ContinuousFileStoreITCase.java
 
b/paimon-flink/paimon-flink-1.15/src/test/java/org/apache/paimon/flink/ContinuousFileStoreITCase.java
index 40f520683..8314feaa3 100644
--- 
a/paimon-flink/paimon-flink-1.15/src/test/java/org/apache/paimon/flink/ContinuousFileStoreITCase.java
+++ 
b/paimon-flink/paimon-flink-1.15/src/test/java/org/apache/paimon/flink/ContinuousFileStoreITCase.java
@@ -469,7 +469,7 @@ public class ContinuousFileStoreITCase extends 
CatalogITCaseBase {
                 () ->
                         streamSqlIter(
                                 "SELECT * FROM T1 /*+ 
OPTIONS('log.changelog-mode'='upsert') */"),
-                "File store continuous reading dose not support upsert 
changelog mode");
+                "File store continuous reading does not support upsert 
changelog mode");
     }
 
     @Test
@@ -479,7 +479,7 @@ public class ContinuousFileStoreITCase extends 
CatalogITCaseBase {
                 () ->
                         streamSqlIter(
                                 "SELECT * FROM T1 /*+ 
OPTIONS('log.changelog-mode'='upsert') */"),
-                "File store continuous reading dose not support upsert 
changelog mode");
+                "File store continuous reading does not support upsert 
changelog mode");
     }
 
     @Test
@@ -489,7 +489,7 @@ public class ContinuousFileStoreITCase extends 
CatalogITCaseBase {
                 () ->
                         streamSqlIter(
                                 "SELECT * FROM T1 /*+ 
OPTIONS('log.consistency'='eventual') */"),
-                "File store continuous reading dose not support eventual 
consistency mode");
+                "File store continuous reading does not support eventual 
consistency mode");
     }
 
     @Test
@@ -499,6 +499,6 @@ public class ContinuousFileStoreITCase extends 
CatalogITCaseBase {
                 () ->
                         streamSqlIter(
                                 "SELECT * FROM T1 /*+ 
OPTIONS('log.consistency'='eventual') */"),
-                "File store continuous reading dose not support eventual 
consistency mode");
+                "File store continuous reading does not support eventual 
consistency mode");
     }
 }
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/AbstractFlinkTableFactory.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/AbstractFlinkTableFactory.java
index 046289eb9..73a9c0f7c 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/AbstractFlinkTableFactory.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/AbstractFlinkTableFactory.java
@@ -20,6 +20,7 @@ package org.apache.paimon.flink;
 
 import org.apache.paimon.CoreOptions.LogChangelogMode;
 import org.apache.paimon.CoreOptions.LogConsistency;
+import org.apache.paimon.CoreOptions.StreamingReadMode;
 import org.apache.paimon.annotation.VisibleForTesting;
 import org.apache.paimon.catalog.CatalogContext;
 import org.apache.paimon.flink.log.LogStoreTableFactory;
@@ -53,6 +54,7 @@ import java.util.Set;
 
 import static org.apache.paimon.CoreOptions.LOG_CHANGELOG_MODE;
 import static org.apache.paimon.CoreOptions.LOG_CONSISTENCY;
+import static org.apache.paimon.CoreOptions.STREAMING_READ_MODE;
 import static org.apache.paimon.flink.FlinkConnectorOptions.LOG_SYSTEM;
 import static org.apache.paimon.flink.FlinkConnectorOptions.NONE;
 import static org.apache.paimon.flink.LogicalTypeConversion.toLogicalType;
@@ -123,14 +125,19 @@ public abstract class AbstractFlinkTableFactory
 
     private static void validateFileStoreContinuous(Options options) {
         LogChangelogMode changelogMode = options.get(LOG_CHANGELOG_MODE);
+        StreamingReadMode streamingReadMode = options.get(STREAMING_READ_MODE);
         if (changelogMode == LogChangelogMode.UPSERT) {
             throw new ValidationException(
-                    "File store continuous reading dose not support upsert 
changelog mode.");
+                    "File store continuous reading does not support upsert 
changelog mode.");
         }
         LogConsistency consistency = options.get(LOG_CONSISTENCY);
         if (consistency == LogConsistency.EVENTUAL) {
             throw new ValidationException(
-                    "File store continuous reading dose not support eventual 
consistency mode.");
+                    "File store continuous reading does not support eventual 
consistency mode.");
+        }
+        if (streamingReadMode == StreamingReadMode.LOG) {
+            throw new ValidationException(
+                    "File store continuous reading does not support the log 
streaming read mode.");
         }
     }
 
@@ -142,6 +149,7 @@ public abstract class AbstractFlinkTableFactory
     static Table buildPaimonTable(DynamicTableFactory.Context context) {
         CatalogTable origin = context.getCatalogTable().getOrigin();
         Table table;
+
         if (origin instanceof DataCatalogTable) {
             table = ((DataCatalogTable) 
origin).table().copy(origin.getOptions());
         } else {
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkConnectorOptions.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkConnectorOptions.java
index f6b0f1110..ddf85c424 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkConnectorOptions.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkConnectorOptions.java
@@ -19,6 +19,7 @@
 package org.apache.paimon.flink;
 
 import org.apache.paimon.CoreOptions;
+import org.apache.paimon.CoreOptions.StreamingReadMode;
 import org.apache.paimon.annotation.Documentation.ExcludeFromDocumentation;
 import org.apache.paimon.options.ConfigOption;
 import org.apache.paimon.options.ConfigOptions;
@@ -32,6 +33,7 @@ import java.time.Duration;
 import java.util.ArrayList;
 import java.util.List;
 
+import static org.apache.paimon.CoreOptions.STREAMING_READ_MODE;
 import static org.apache.paimon.options.ConfigOptions.key;
 import static org.apache.paimon.options.description.TextElement.text;
 
@@ -58,7 +60,11 @@ public class FlinkConnectorOptions {
                                     .list(
                                             TextElement.text(
                                                     "\"kafka\": Kafka log 
system, the data is double written to file"
-                                                            + " store and 
kafka, and the streaming read will be read from kafka."))
+                                                            + " store and 
kafka, and the streaming read will be read from kafka. If streaming read from 
file, configures "
+                                                            + 
STREAMING_READ_MODE.key()
+                                                            + " to "
+                                                            + 
StreamingReadMode.FILE.getValue()
+                                                            + "."))
                                     .build());
 
     public static final ConfigOption<Integer> SINK_PARALLELISM =
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/DataTableSource.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/DataTableSource.java
index e4b00e7cf..b24967965 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/DataTableSource.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/DataTableSource.java
@@ -189,7 +189,7 @@ public class DataTableSource extends FlinkTableSource
                                     
options.get(SCAN_WATERMARK_ALIGNMENT_UPDATE_INTERVAL));
                 } catch (NoSuchMethodError error) {
                     throw new RuntimeException(
-                            "Flink 1.14 dose not support watermark alignment, 
please check your Flink version.",
+                            "Flink 1.14 does not support watermark alignment, 
please check your Flink version.",
                             error);
                 }
             }
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/FlinkSourceBuilder.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/FlinkSourceBuilder.java
index 17aa346d9..0895c7389 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/FlinkSourceBuilder.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/FlinkSourceBuilder.java
@@ -20,6 +20,7 @@ package org.apache.paimon.flink.source;
 
 import org.apache.paimon.CoreOptions;
 import org.apache.paimon.CoreOptions.StartupMode;
+import org.apache.paimon.CoreOptions.StreamingReadMode;
 import org.apache.paimon.flink.FlinkConnectorOptions;
 import org.apache.paimon.flink.Projection;
 import org.apache.paimon.flink.log.LogSourceProvider;
@@ -46,6 +47,7 @@ import javax.annotation.Nullable;
 import java.util.List;
 import java.util.Optional;
 
+import static org.apache.paimon.CoreOptions.StreamingReadMode.FILE;
 import static org.apache.paimon.flink.LogicalTypeConversion.toLogicalType;
 
 /**
@@ -142,9 +144,9 @@ public class FlinkSourceBuilder {
 
             // TODO visit all options through CoreOptions
             StartupMode startupMode = CoreOptions.startupMode(conf);
-            if (logSourceProvider == null) {
-                return buildContinuousFileSource();
-            } else {
+            StreamingReadMode streamingReadMode = 
CoreOptions.streamReadType(conf);
+
+            if (logSourceProvider != null && streamingReadMode != FILE) {
                 if (startupMode != StartupMode.LATEST_FULL) {
                     return logSourceProvider.createSource(null);
                 }
@@ -155,6 +157,8 @@ public class FlinkSourceBuilder {
                                 new LogHybridSourceFactory(logSourceProvider),
                                 Boundedness.CONTINUOUS_UNBOUNDED)
                         .build();
+            } else {
+                return buildContinuousFileSource();
             }
         } else {
             return buildStaticFileSource();
diff --git 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/ContinuousFileStoreITCase.java
 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/ContinuousFileStoreITCase.java
index a9eee15b2..163344217 100644
--- 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/ContinuousFileStoreITCase.java
+++ 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/ContinuousFileStoreITCase.java
@@ -286,7 +286,7 @@ public class ContinuousFileStoreITCase extends 
CatalogITCaseBase {
                                         "SELECT * FROM T1 /*+ 
OPTIONS('log.changelog-mode'='upsert') */"))
                 .hasCauseInstanceOf(ValidationException.class)
                 .hasRootCauseMessage(
-                        "File store continuous reading dose not support upsert 
changelog mode.");
+                        "File store continuous reading does not support upsert 
changelog mode.");
     }
 
     @TestTemplate
@@ -297,6 +297,6 @@ public class ContinuousFileStoreITCase extends 
CatalogITCaseBase {
                                         "SELECT * FROM T1 /*+ 
OPTIONS('log.consistency'='eventual') */"))
                 .hasCauseInstanceOf(ValidationException.class)
                 .hasRootCauseMessage(
-                        "File store continuous reading dose not support 
eventual consistency mode.");
+                        "File store continuous reading does not support 
eventual consistency mode.");
     }
 }
diff --git 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/LogSystemITCase.java
 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/LogSystemITCase.java
index e58c779a9..829d6744d 100644
--- 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/LogSystemITCase.java
+++ 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/LogSystemITCase.java
@@ -22,6 +22,7 @@ import org.apache.paimon.flink.kafka.KafkaTableTestBase;
 import org.apache.paimon.utils.BlockingIterator;
 
 import org.apache.flink.table.api.TableResult;
+import org.apache.flink.table.api.ValidationException;
 import org.apache.flink.types.Row;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
@@ -30,6 +31,7 @@ import java.io.IOException;
 import java.util.List;
 
 import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
 
 /** ITCase for table with log system. */
 public class LogSystemITCase extends KafkaTableTestBase {
@@ -67,4 +69,121 @@ public class LogSystemITCase extends KafkaTableTestBase {
         write.getJobClient().get().cancel();
         read.close();
     }
+
+    @Test
+    public void testReadFromFile() throws Exception {
+        createTopicIfNotExists("test-double-sink", 1);
+        env.getCheckpointConfig().setCheckpointInterval(3 * 1000);
+        env.setParallelism(1);
+        tEnv.executeSql(
+                String.format(
+                        "CREATE TABLE kafka_file_double_sink (\n"
+                                + " word STRING ,\n"
+                                + "    cnt BIGINT,\n"
+                                + "      PRIMARY KEY (word) NOT ENFORCED\n"
+                                + ")\n"
+                                + "WITH (\n"
+                                + " 'merge-engine' = 'aggregation',\n"
+                                + "  'changelog-producer' = 
'full-compaction',\n"
+                                + "    'log.system' = 'kafka',\n"
+                                + "    'streaming-read-mode'='file',\n"
+                                + "    'fields.cnt.aggregate-function' = 
'sum',\n"
+                                + "    'kafka.bootstrap.servers' = '%s',\n"
+                                + "    'kafka.topic' = 'test-double-sink',\n"
+                                + "    
'kafka.transaction.timeout.ms'='30000'\n"
+                                + "\n"
+                                + ");",
+                        getBootstrapServers()));
+        TableResult write =
+                tEnv.executeSql(
+                        "INSERT INTO kafka_file_double_sink 
values('a',1),('b',2),('c',3);");
+        BlockingIterator<Row, Row> read =
+                BlockingIterator.of(
+                        tEnv.executeSql("SELECT * FROM 
kafka_file_double_sink").collect());
+        assertThat(read.collect(3))
+                .containsExactlyInAnyOrder(Row.of("a", 1L), Row.of("b", 2L), 
Row.of("c", 3L));
+        write.getJobClient().get().cancel();
+        read.close();
+    }
+
+    @Test
+    public void testReadFromLog() throws Exception {
+        createTopicIfNotExists("test-single-sink", 1);
+        // Disable checkpointing to test 'log.consistency' = 'eventual'.
+        env.getCheckpointConfig().disableCheckpointing();
+        env.setParallelism(1);
+        // 'fields.cnt.aggregate-function' = 'sum' is missing on purpose: writing to the file
+        // store would throw java.lang.UnsupportedOperationException: Aggregate function
+        // 'last_non_null_value' does not support retraction.
+        // Data will only be written to Kafka.
+        tEnv.executeSql(
+                String.format(
+                        "CREATE TABLE kafka_file_single_sink (\n"
+                                + " word STRING ,\n"
+                                + "    cnt BIGINT,\n"
+                                + "      PRIMARY KEY (word) NOT ENFORCED\n"
+                                + ")\n"
+                                + "WITH (\n"
+                                + " 'merge-engine' = 'aggregation',\n"
+                                + "    'changelog-producer' = 
'full-compaction',\n"
+                                + "    'log.consistency' = 'eventual',\n"
+                                + "    'log.system' = 'kafka',\n"
+                                + "    'streaming-read-mode'='log',\n"
+                                + "    'kafka.bootstrap.servers' = '%s',\n"
+                                + "    'kafka.topic' = 'test-single-sink',\n"
+                                + "    
'kafka.transaction.timeout.ms'='30000'\n"
+                                + "\n"
+                                + ");",
+                        getBootstrapServers()));
+        tEnv.executeSql(
+                "CREATE TEMPORARY TABLE word_table (\n"
+                        + "    word STRING\n"
+                        + ") WITH (\n"
+                        + "    'connector' = 'datagen',\n"
+                        + "    'fields.word.length' = '1'\n"
+                        + ");");
+        TableResult write =
+                tEnv.executeSql(
+                        "INSERT INTO kafka_file_single_sink SELECT word, 
COUNT(*) FROM word_table GROUP BY word;");
+        BlockingIterator<Row, Row> read =
+                BlockingIterator.of(
+                        tEnv.executeSql("SELECT * FROM 
kafka_file_single_sink").collect());
+        List<Row> collect = read.collect(10);
+        assertThat(collect).hasSize(10);
+        write.getJobClient().get().cancel();
+        read.close();
+    }
+
+    @Test
+    public void testLogStreamingReadModeWithoutLogSystem() throws Exception {
+        // The table sets 'streaming-read-mode' = 'log' but no 'log.system'; this must be rejected.
+        env.setParallelism(1);
+
+        tEnv.executeSql(
+                "CREATE TABLE kafka_file_single_sink (\n"
+                        + " word STRING ,\n"
+                        + "    cnt BIGINT,\n"
+                        + "      PRIMARY KEY (word) NOT ENFORCED\n"
+                        + ")\n"
+                        + "WITH (\n"
+                        + " 'merge-engine' = 'aggregation',\n"
+                        + "    'changelog-producer' = 'full-compaction',\n"
+                        + "    'streaming-read-mode'='log'\n"
+                        + ");");
+        tEnv.executeSql(
+                "CREATE TEMPORARY TABLE word_table (\n"
+                        + "    word STRING\n"
+                        + ") WITH (\n"
+                        + "    'connector' = 'datagen',\n"
+                        + "    'fields.word.length' = '1'\n"
+                        + ");");
+        assertThatThrownBy(
+                        () ->
+                                tEnv.executeSql(
+                                        "INSERT INTO kafka_file_single_sink 
SELECT word, COUNT(*) FROM word_table GROUP BY word;"))
+                .getRootCause()
+                .isInstanceOf(ValidationException.class)
+                .hasMessage(
+                        "File store continuous reading does not support the 
log streaming read mode.");
+    }
 }

Reply via email to