This is an automated email from the ASF dual-hosted git repository.
nicholasjiang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 6ebddb0cd [core] Add streaming read from option (#778) (#812)
6ebddb0cd is described below
commit 6ebddb0cdb1657cbe9249b89124a28627b360ff1
Author: HZY <[email protected]>
AuthorDate: Thu Apr 13 11:13:38 2023 +0800
[core] Add streaming read from option (#778) (#812)
---
.../shortcodes/generated/core_configuration.html | 6 ++
.../generated/flink_connector_configuration.html | 2 +-
.../main/java/org/apache/paimon/CoreOptions.java | 69 ++++++++++++
.../java/org/apache/paimon/mergetree/Levels.java | 2 +-
.../paimon/flink/ContinuousFileStoreITCase.java | 8 +-
.../org/apache/paimon/flink/WatermarkITCase.java | 2 +-
.../paimon/flink/ContinuousFileStoreITCase.java | 8 +-
.../paimon/flink/AbstractFlinkTableFactory.java | 12 ++-
.../apache/paimon/flink/FlinkConnectorOptions.java | 8 +-
.../paimon/flink/source/DataTableSource.java | 2 +-
.../paimon/flink/source/FlinkSourceBuilder.java | 10 +-
.../paimon/flink/ContinuousFileStoreITCase.java | 4 +-
.../org/apache/paimon/flink/LogSystemITCase.java | 119 +++++++++++++++++++++
13 files changed, 232 insertions(+), 20 deletions(-)
diff --git a/docs/layouts/shortcodes/generated/core_configuration.html
b/docs/layouts/shortcodes/generated/core_configuration.html
index 61a430146..b55585981 100644
--- a/docs/layouts/shortcodes/generated/core_configuration.html
+++ b/docs/layouts/shortcodes/generated/core_configuration.html
@@ -338,6 +338,12 @@
<td>MemorySize</td>
<td>Target size of a source split when scanning a bucket.</td>
</tr>
+ <tr>
+ <td><h5>streaming-read-mode</h5></td>
+ <td style="word-wrap: break-word;">(none)</td>
+ <td><p>Enum</p></td>
+ <td>The mode of streaming read that specifies to read the data of
table file or log<br /><br />Possible values:<br /><ul><li>file: Reads from the
data of table file store.</li></ul><ul><li>log: Read from the data of table log
store.</li></ul><br /><br />Possible values:<ul><li>"log": Reads from the log
store.</li><li>"file": Reads from the file store.</li></ul></td>
+ </tr>
<tr>
<td><h5>streaming-read-overwrite</h5></td>
<td style="word-wrap: break-word;">false</td>
diff --git
a/docs/layouts/shortcodes/generated/flink_connector_configuration.html
b/docs/layouts/shortcodes/generated/flink_connector_configuration.html
index 8a5951dab..85d268eb6 100644
--- a/docs/layouts/shortcodes/generated/flink_connector_configuration.html
+++ b/docs/layouts/shortcodes/generated/flink_connector_configuration.html
@@ -18,7 +18,7 @@
<td><h5>log.system</h5></td>
<td style="word-wrap: break-word;">"none"</td>
<td>String</td>
- <td>The log system used to keep changes of the table.<br /><br
/>Possible values:<br /><ul><li>"none": No log system, the data is written only
to file store, and the streaming read will be directly read from the file
store.</li></ul><ul><li>"kafka": Kafka log system, the data is double written
to file store and kafka, and the streaming read will be read from
kafka.</li></ul></td>
+ <td>The log system used to keep changes of the table.<br /><br
/>Possible values:<br /><ul><li>"none": No log system, the data is written only
to file store, and the streaming read will be directly read from the file
store.</li></ul><ul><li>"kafka": Kafka log system, the data is double written
to file store and kafka, and the streaming read will be read from kafka. If
streaming read from file, configures streaming-read-mode to file.</li></ul></td>
</tr>
<tr>
<td><h5>scan.infer-parallelism</h5></td>
diff --git a/paimon-core/src/main/java/org/apache/paimon/CoreOptions.java
b/paimon-core/src/main/java/org/apache/paimon/CoreOptions.java
index ae188173b..40ac103e3 100644
--- a/paimon-core/src/main/java/org/apache/paimon/CoreOptions.java
+++ b/paimon-core/src/main/java/org/apache/paimon/CoreOptions.java
@@ -568,6 +568,28 @@ public class CoreOptions implements Serializable {
"Only used to force TableScan to construct
'ContinuousCompactorStartingScanner' and "
+ "'ContinuousCompactorFollowUpScanner'
for dedicated streaming compaction job.");
+ public static final ConfigOption<StreamingReadMode> STREAMING_READ_MODE =
+ key("streaming-read-mode")
+ .enumType(StreamingReadMode.class)
+ .noDefaultValue()
+ .withDescription(
+ Description.builder()
+ .text(
+ "The mode of streaming read that
specifies to read the data of table file or log")
+ .linebreak()
+ .linebreak()
+ .text("Possible values:")
+ .linebreak()
+ .list(
+ text(
+
StreamingReadMode.FILE.getValue()
+ + ": Reads from
the data of table file store."))
+ .list(
+ text(
+
StreamingReadMode.LOG.getValue()
+ + ": Read from the
data of table log store."))
+ .build());
+
private final Options options;
public CoreOptions(Map<String, String> options) {
@@ -822,6 +844,10 @@ public class CoreOptions implements Serializable {
return options.get(READ_BATCH_SIZE);
}
+ public static StreamingReadMode streamReadType(Options options) {
+ return options.get(STREAMING_READ_MODE);
+ }
+
/** Specifies the merge engine for table with primary key. */
public enum MergeEngine implements DescribedEnum {
DEDUPLICATE("deduplicate", "De-duplicate and keep the last row."),
@@ -1054,6 +1080,49 @@ public class CoreOptions implements Serializable {
}
}
+ /** Specifies the type for streaming read. */
+ public enum StreamingReadMode implements DescribedEnum {
+ LOG("log", "Reads from the log store."),
+ FILE("file", "Reads from the file store.");
+
+ private final String value;
+ private final String description;
+
+ StreamingReadMode(String value, String description) {
+ this.value = value;
+ this.description = description;
+ }
+
+ @Override
+ public String toString() {
+ return value;
+ }
+
+ @Override
+ public InlineElement getDescription() {
+ return text(description);
+ }
+
+ public String getValue() {
+ return value;
+ }
+
+ @VisibleForTesting
+ public static StreamingReadMode fromValue(String value) {
+ for (StreamingReadMode formatType : StreamingReadMode.values()) {
+ if (formatType.value.equals(value)) {
+ return formatType;
+ }
+ }
+ throw new IllegalArgumentException(
+ String.format(
+ "Invalid format type %s, only support [%s]",
+ value,
+ StringUtils.join(
+
Arrays.stream(StreamingReadMode.values()).iterator(), ",")));
+ }
+ }
+
/**
* Set the default values of the {@link CoreOptions} via the given {@link
Options}.
*
diff --git a/paimon-core/src/main/java/org/apache/paimon/mergetree/Levels.java
b/paimon-core/src/main/java/org/apache/paimon/mergetree/Levels.java
index ca71272bf..b4c16ed67 100644
--- a/paimon-core/src/main/java/org/apache/paimon/mergetree/Levels.java
+++ b/paimon-core/src/main/java/org/apache/paimon/mergetree/Levels.java
@@ -96,7 +96,7 @@ public class Levels {
}
public SortedRun runOfLevel(int level) {
- checkArgument(level > 0, "Level0 dose not have one single sorted
run.");
+ checkArgument(level > 0, "Level0 does not have one single sorted
run.");
return levels.get(level - 1);
}
diff --git
a/paimon-flink/paimon-flink-1.14/src/test/java/org/apache/paimon/flink/ContinuousFileStoreITCase.java
b/paimon-flink/paimon-flink-1.14/src/test/java/org/apache/paimon/flink/ContinuousFileStoreITCase.java
index 40f520683..8314feaa3 100644
---
a/paimon-flink/paimon-flink-1.14/src/test/java/org/apache/paimon/flink/ContinuousFileStoreITCase.java
+++
b/paimon-flink/paimon-flink-1.14/src/test/java/org/apache/paimon/flink/ContinuousFileStoreITCase.java
@@ -469,7 +469,7 @@ public class ContinuousFileStoreITCase extends
CatalogITCaseBase {
() ->
streamSqlIter(
"SELECT * FROM T1 /*+
OPTIONS('log.changelog-mode'='upsert') */"),
- "File store continuous reading dose not support upsert
changelog mode");
+ "File store continuous reading does not support upsert
changelog mode");
}
@Test
@@ -479,7 +479,7 @@ public class ContinuousFileStoreITCase extends
CatalogITCaseBase {
() ->
streamSqlIter(
"SELECT * FROM T1 /*+
OPTIONS('log.changelog-mode'='upsert') */"),
- "File store continuous reading dose not support upsert
changelog mode");
+ "File store continuous reading does not support upsert
changelog mode");
}
@Test
@@ -489,7 +489,7 @@ public class ContinuousFileStoreITCase extends
CatalogITCaseBase {
() ->
streamSqlIter(
"SELECT * FROM T1 /*+
OPTIONS('log.consistency'='eventual') */"),
- "File store continuous reading dose not support eventual
consistency mode");
+ "File store continuous reading does not support eventual
consistency mode");
}
@Test
@@ -499,6 +499,6 @@ public class ContinuousFileStoreITCase extends
CatalogITCaseBase {
() ->
streamSqlIter(
"SELECT * FROM T1 /*+
OPTIONS('log.consistency'='eventual') */"),
- "File store continuous reading dose not support eventual
consistency mode");
+ "File store continuous reading does not support eventual
consistency mode");
}
}
diff --git
a/paimon-flink/paimon-flink-1.14/src/test/java/org/apache/paimon/flink/WatermarkITCase.java
b/paimon-flink/paimon-flink-1.14/src/test/java/org/apache/paimon/flink/WatermarkITCase.java
index 8247d6dd2..5de32147d 100644
---
a/paimon-flink/paimon-flink-1.14/src/test/java/org/apache/paimon/flink/WatermarkITCase.java
+++
b/paimon-flink/paimon-flink-1.14/src/test/java/org/apache/paimon/flink/WatermarkITCase.java
@@ -51,7 +51,7 @@ public class WatermarkITCase extends CatalogITCaseBase {
"'scan.watermark.alignment.update-interval'='2s'",
"'scan.watermark.alignment.max-drift'='1s',"))
.hasMessageContaining(
- "Flink 1.14 dose not support watermark alignment,
please check your Flink version");
+ "Flink 1.14 does not support watermark alignment,
please check your Flink version");
}
private void innerTestWatermark(String... options) throws Exception {
diff --git
a/paimon-flink/paimon-flink-1.15/src/test/java/org/apache/paimon/flink/ContinuousFileStoreITCase.java
b/paimon-flink/paimon-flink-1.15/src/test/java/org/apache/paimon/flink/ContinuousFileStoreITCase.java
index 40f520683..8314feaa3 100644
---
a/paimon-flink/paimon-flink-1.15/src/test/java/org/apache/paimon/flink/ContinuousFileStoreITCase.java
+++
b/paimon-flink/paimon-flink-1.15/src/test/java/org/apache/paimon/flink/ContinuousFileStoreITCase.java
@@ -469,7 +469,7 @@ public class ContinuousFileStoreITCase extends
CatalogITCaseBase {
() ->
streamSqlIter(
"SELECT * FROM T1 /*+
OPTIONS('log.changelog-mode'='upsert') */"),
- "File store continuous reading dose not support upsert
changelog mode");
+ "File store continuous reading does not support upsert
changelog mode");
}
@Test
@@ -479,7 +479,7 @@ public class ContinuousFileStoreITCase extends
CatalogITCaseBase {
() ->
streamSqlIter(
"SELECT * FROM T1 /*+
OPTIONS('log.changelog-mode'='upsert') */"),
- "File store continuous reading dose not support upsert
changelog mode");
+ "File store continuous reading does not support upsert
changelog mode");
}
@Test
@@ -489,7 +489,7 @@ public class ContinuousFileStoreITCase extends
CatalogITCaseBase {
() ->
streamSqlIter(
"SELECT * FROM T1 /*+
OPTIONS('log.consistency'='eventual') */"),
- "File store continuous reading dose not support eventual
consistency mode");
+ "File store continuous reading does not support eventual
consistency mode");
}
@Test
@@ -499,6 +499,6 @@ public class ContinuousFileStoreITCase extends
CatalogITCaseBase {
() ->
streamSqlIter(
"SELECT * FROM T1 /*+
OPTIONS('log.consistency'='eventual') */"),
- "File store continuous reading dose not support eventual
consistency mode");
+ "File store continuous reading does not support eventual
consistency mode");
}
}
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/AbstractFlinkTableFactory.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/AbstractFlinkTableFactory.java
index 046289eb9..73a9c0f7c 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/AbstractFlinkTableFactory.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/AbstractFlinkTableFactory.java
@@ -20,6 +20,7 @@ package org.apache.paimon.flink;
import org.apache.paimon.CoreOptions.LogChangelogMode;
import org.apache.paimon.CoreOptions.LogConsistency;
+import org.apache.paimon.CoreOptions.StreamingReadMode;
import org.apache.paimon.annotation.VisibleForTesting;
import org.apache.paimon.catalog.CatalogContext;
import org.apache.paimon.flink.log.LogStoreTableFactory;
@@ -53,6 +54,7 @@ import java.util.Set;
import static org.apache.paimon.CoreOptions.LOG_CHANGELOG_MODE;
import static org.apache.paimon.CoreOptions.LOG_CONSISTENCY;
+import static org.apache.paimon.CoreOptions.STREAMING_READ_MODE;
import static org.apache.paimon.flink.FlinkConnectorOptions.LOG_SYSTEM;
import static org.apache.paimon.flink.FlinkConnectorOptions.NONE;
import static org.apache.paimon.flink.LogicalTypeConversion.toLogicalType;
@@ -123,14 +125,19 @@ public abstract class AbstractFlinkTableFactory
private static void validateFileStoreContinuous(Options options) {
LogChangelogMode changelogMode = options.get(LOG_CHANGELOG_MODE);
+ StreamingReadMode streamingReadMode = options.get(STREAMING_READ_MODE);
if (changelogMode == LogChangelogMode.UPSERT) {
throw new ValidationException(
- "File store continuous reading dose not support upsert
changelog mode.");
+ "File store continuous reading does not support upsert
changelog mode.");
}
LogConsistency consistency = options.get(LOG_CONSISTENCY);
if (consistency == LogConsistency.EVENTUAL) {
throw new ValidationException(
- "File store continuous reading dose not support eventual
consistency mode.");
+ "File store continuous reading does not support eventual
consistency mode.");
+ }
+ if (streamingReadMode == StreamingReadMode.LOG) {
+ throw new ValidationException(
+ "File store continuous reading does not support the log
streaming read mode.");
}
}
@@ -142,6 +149,7 @@ public abstract class AbstractFlinkTableFactory
static Table buildPaimonTable(DynamicTableFactory.Context context) {
CatalogTable origin = context.getCatalogTable().getOrigin();
Table table;
+
if (origin instanceof DataCatalogTable) {
table = ((DataCatalogTable)
origin).table().copy(origin.getOptions());
} else {
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkConnectorOptions.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkConnectorOptions.java
index f6b0f1110..ddf85c424 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkConnectorOptions.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkConnectorOptions.java
@@ -19,6 +19,7 @@
package org.apache.paimon.flink;
import org.apache.paimon.CoreOptions;
+import org.apache.paimon.CoreOptions.StreamingReadMode;
import org.apache.paimon.annotation.Documentation.ExcludeFromDocumentation;
import org.apache.paimon.options.ConfigOption;
import org.apache.paimon.options.ConfigOptions;
@@ -32,6 +33,7 @@ import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
+import static org.apache.paimon.CoreOptions.STREAMING_READ_MODE;
import static org.apache.paimon.options.ConfigOptions.key;
import static org.apache.paimon.options.description.TextElement.text;
@@ -58,7 +60,11 @@ public class FlinkConnectorOptions {
.list(
TextElement.text(
"\"kafka\": Kafka log
system, the data is double written to file"
- + " store and
kafka, and the streaming read will be read from kafka."))
+ + " store and
kafka, and the streaming read will be read from kafka. If streaming read from
file, configures "
+ +
STREAMING_READ_MODE.key()
+ + " to "
+ +
StreamingReadMode.FILE.getValue()
+ + "."))
.build());
public static final ConfigOption<Integer> SINK_PARALLELISM =
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/DataTableSource.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/DataTableSource.java
index e4b00e7cf..b24967965 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/DataTableSource.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/DataTableSource.java
@@ -189,7 +189,7 @@ public class DataTableSource extends FlinkTableSource
options.get(SCAN_WATERMARK_ALIGNMENT_UPDATE_INTERVAL));
} catch (NoSuchMethodError error) {
throw new RuntimeException(
- "Flink 1.14 dose not support watermark alignment,
please check your Flink version.",
+ "Flink 1.14 does not support watermark alignment,
please check your Flink version.",
error);
}
}
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/FlinkSourceBuilder.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/FlinkSourceBuilder.java
index 17aa346d9..0895c7389 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/FlinkSourceBuilder.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/FlinkSourceBuilder.java
@@ -20,6 +20,7 @@ package org.apache.paimon.flink.source;
import org.apache.paimon.CoreOptions;
import org.apache.paimon.CoreOptions.StartupMode;
+import org.apache.paimon.CoreOptions.StreamingReadMode;
import org.apache.paimon.flink.FlinkConnectorOptions;
import org.apache.paimon.flink.Projection;
import org.apache.paimon.flink.log.LogSourceProvider;
@@ -46,6 +47,7 @@ import javax.annotation.Nullable;
import java.util.List;
import java.util.Optional;
+import static org.apache.paimon.CoreOptions.StreamingReadMode.FILE;
import static org.apache.paimon.flink.LogicalTypeConversion.toLogicalType;
/**
@@ -142,9 +144,9 @@ public class FlinkSourceBuilder {
// TODO visit all options through CoreOptions
StartupMode startupMode = CoreOptions.startupMode(conf);
- if (logSourceProvider == null) {
- return buildContinuousFileSource();
- } else {
+ StreamingReadMode streamingReadMode =
CoreOptions.streamReadType(conf);
+
+ if (logSourceProvider != null && streamingReadMode != FILE) {
if (startupMode != StartupMode.LATEST_FULL) {
return logSourceProvider.createSource(null);
}
@@ -155,6 +157,8 @@ public class FlinkSourceBuilder {
new LogHybridSourceFactory(logSourceProvider),
Boundedness.CONTINUOUS_UNBOUNDED)
.build();
+ } else {
+ return buildContinuousFileSource();
}
} else {
return buildStaticFileSource();
diff --git
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/ContinuousFileStoreITCase.java
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/ContinuousFileStoreITCase.java
index a9eee15b2..163344217 100644
---
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/ContinuousFileStoreITCase.java
+++
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/ContinuousFileStoreITCase.java
@@ -286,7 +286,7 @@ public class ContinuousFileStoreITCase extends
CatalogITCaseBase {
"SELECT * FROM T1 /*+
OPTIONS('log.changelog-mode'='upsert') */"))
.hasCauseInstanceOf(ValidationException.class)
.hasRootCauseMessage(
- "File store continuous reading dose not support upsert
changelog mode.");
+ "File store continuous reading does not support upsert
changelog mode.");
}
@TestTemplate
@@ -297,6 +297,6 @@ public class ContinuousFileStoreITCase extends
CatalogITCaseBase {
"SELECT * FROM T1 /*+
OPTIONS('log.consistency'='eventual') */"))
.hasCauseInstanceOf(ValidationException.class)
.hasRootCauseMessage(
- "File store continuous reading dose not support
eventual consistency mode.");
+ "File store continuous reading does not support
eventual consistency mode.");
}
}
diff --git
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/LogSystemITCase.java
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/LogSystemITCase.java
index e58c779a9..829d6744d 100644
---
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/LogSystemITCase.java
+++
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/LogSystemITCase.java
@@ -22,6 +22,7 @@ import org.apache.paimon.flink.kafka.KafkaTableTestBase;
import org.apache.paimon.utils.BlockingIterator;
import org.apache.flink.table.api.TableResult;
+import org.apache.flink.table.api.ValidationException;
import org.apache.flink.types.Row;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
@@ -30,6 +31,7 @@ import java.io.IOException;
import java.util.List;
import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
/** ITCase for table with log system. */
public class LogSystemITCase extends KafkaTableTestBase {
@@ -67,4 +69,121 @@ public class LogSystemITCase extends KafkaTableTestBase {
write.getJobClient().get().cancel();
read.close();
}
+
+ @Test
+ public void testReadFromFile() throws Exception {
+ createTopicIfNotExists("test-double-sink", 1);
+ env.getCheckpointConfig().setCheckpointInterval(3 * 1000);
+ env.setParallelism(1);
+ tEnv.executeSql(
+ String.format(
+ "CREATE TABLE kafka_file_double_sink (\n"
+ + " word STRING ,\n"
+ + " cnt BIGINT,\n"
+ + " PRIMARY KEY (word) NOT ENFORCED\n"
+ + ")\n"
+ + "WITH (\n"
+ + " 'merge-engine' = 'aggregation',\n"
+ + " 'changelog-producer' =
'full-compaction',\n"
+ + " 'log.system' = 'kafka',\n"
+ + " 'streaming-read-mode'='file',\n"
+ + " 'fields.cnt.aggregate-function' =
'sum',\n"
+ + " 'kafka.bootstrap.servers' = '%s',\n"
+ + " 'kafka.topic' = 'test-double-sink',\n"
+ + "
'kafka.transaction.timeout.ms'='30000'\n"
+ + "\n"
+ + ");",
+ getBootstrapServers()));
+ TableResult write =
+ tEnv.executeSql(
+ "INSERT INTO kafka_file_double_sink
values('a',1),('b',2),('c',3);");
+ BlockingIterator<Row, Row> read =
+ BlockingIterator.of(
+ tEnv.executeSql("SELECT * FROM
kafka_file_double_sink").collect());
+ assertThat(read.collect(3))
+ .containsExactlyInAnyOrder(Row.of("a", 1L), Row.of("b", 2L),
Row.of("c", 3L));
+ write.getJobClient().get().cancel();
+ read.close();
+ }
+
+ @Test
+ public void testReadFromLog() throws Exception {
+ createTopicIfNotExists("test-single-sink", 1);
+ // disable checkpointing to test eventual
+ env.getCheckpointConfig().disableCheckpointing();
+ env.setParallelism(1);
+ // 'fields.cnt.aggregate-function' = 'sum' is miss will throw
+ // java.lang.UnsupportedOperationException: Aggregate function
'last_non_null_value' does
+ // not support retraction
+ // data will only be written to kafka
+ tEnv.executeSql(
+ String.format(
+ "CREATE TABLE kafka_file_single_sink (\n"
+ + " word STRING ,\n"
+ + " cnt BIGINT,\n"
+ + " PRIMARY KEY (word) NOT ENFORCED\n"
+ + ")\n"
+ + "WITH (\n"
+ + " 'merge-engine' = 'aggregation',\n"
+ + " 'changelog-producer' =
'full-compaction',\n"
+ + " 'log.consistency' = 'eventual',\n"
+ + " 'log.system' = 'kafka',\n"
+ + " 'streaming-read-mode'='log',\n"
+ + " 'kafka.bootstrap.servers' = '%s',\n"
+ + " 'kafka.topic' = 'test-single-sink',\n"
+ + "
'kafka.transaction.timeout.ms'='30000'\n"
+ + "\n"
+ + ");",
+ getBootstrapServers()));
+ tEnv.executeSql(
+ "CREATE TEMPORARY TABLE word_table (\n"
+ + " word STRING\n"
+ + ") WITH (\n"
+ + " 'connector' = 'datagen',\n"
+ + " 'fields.word.length' = '1'\n"
+ + ");");
+ TableResult write =
+ tEnv.executeSql(
+ "INSERT INTO kafka_file_single_sink SELECT word,
COUNT(*) FROM word_table GROUP BY word;");
+ BlockingIterator<Row, Row> read =
+ BlockingIterator.of(
+ tEnv.executeSql("SELECT * FROM
kafka_file_single_sink").collect());
+ List<Row> collect = read.collect(10);
+ assertThat(collect).hasSize(10);
+ write.getJobClient().get().cancel();
+ read.close();
+ }
+
+ @Test
+ public void testReadFromLogWithOutSteamingReadMode() throws Exception {
+ createTopicIfNotExists("test-single-sink", 1);
+ env.setParallelism(1);
+
+ tEnv.executeSql(
+ "CREATE TABLE kafka_file_single_sink (\n"
+ + " word STRING ,\n"
+ + " cnt BIGINT,\n"
+ + " PRIMARY KEY (word) NOT ENFORCED\n"
+ + ")\n"
+ + "WITH (\n"
+ + " 'merge-engine' = 'aggregation',\n"
+ + " 'changelog-producer' = 'full-compaction',\n"
+ + " 'streaming-read-mode'='log'\n"
+ + ");");
+ tEnv.executeSql(
+ "CREATE TEMPORARY TABLE word_table (\n"
+ + " word STRING\n"
+ + ") WITH (\n"
+ + " 'connector' = 'datagen',\n"
+ + " 'fields.word.length' = '1'\n"
+ + ");");
+ assertThatThrownBy(
+ () ->
+ tEnv.executeSql(
+ "INSERT INTO kafka_file_single_sink
SELECT word, COUNT(*) FROM word_table GROUP BY word;"))
+ .getRootCause()
+ .isInstanceOf(ValidationException.class)
+ .hasMessage(
+ "File store continuous reading does not support the
log streaming read mode.");
+ }
}