This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 976c605f4 [core] Partial-update streaming read supports input changelog-producer (#2026)
976c605f4 is described below
commit 976c605f43e999ed47e2893a02143ca2041b7ea0
Author: Jingsong Lee <[email protected]>
AuthorDate: Wed Sep 20 21:12:00 2023 +0800
[core] Partial-update streaming read supports input changelog-producer (#2026)
---
docs/content/concepts/primary-key-table.md | 6 ++++--
.../org/apache/paimon/flink/utils/TableScanUtils.java | 14 ++++++--------
.../java/org/apache/paimon/flink/LookupJoinITCase.java | 3 ++-
.../org/apache/paimon/flink/PartialUpdateITCase.java | 18 ++++++++++++++++++
4 files changed, 30 insertions(+), 11 deletions(-)
diff --git a/docs/content/concepts/primary-key-table.md b/docs/content/concepts/primary-key-table.md
index e019ada53..403236b6e 100644
--- a/docs/content/concepts/primary-key-table.md
+++ b/docs/content/concepts/primary-key-table.md
@@ -120,7 +120,8 @@ For example, suppose Paimon receives three records:
Assuming that the first column is the primary key, the final result would be
`<1, 25.2, 10, 'This is a book'>`.
{{< hint info >}}
-For streaming queries, `partial-update` merge engine must be used together with `lookup` or `full-compaction` [changelog producer]({{< ref "concepts/primary-key-table#changelog-producers" >}}).
+For streaming queries, `partial-update` merge engine must be used together with `lookup` or `full-compaction`
+[changelog producer]({{< ref "concepts/primary-key-table#changelog-producers" >}}). ('input' changelog producer is also supported, but only returns input records.)
{{< /hint >}}
{{< hint info >}}
@@ -257,7 +258,8 @@ If you allow some functions to ignore retraction messages, you can configure:
`'fields.${field_name}.ignore-retract'='true'`.
{{< hint info >}}
-For streaming queries, `aggregation` merge engine must be used together with `lookup` or `full-compaction` [changelog producer]({{< ref "concepts/primary-key-table#changelog-producers" >}}).
+For streaming queries, `aggregation` merge engine must be used together with `lookup` or `full-compaction`
+[changelog producer]({{< ref "concepts/primary-key-table#changelog-producers" >}}). ('input' changelog producer is also supported, but only returns input records.)
{{< /hint >}}
### First Row
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/utils/TableScanUtils.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/utils/TableScanUtils.java
index b9c6bb912..05486972f 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/utils/TableScanUtils.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/utils/TableScanUtils.java
@@ -39,14 +39,12 @@ public class TableScanUtils {
}
};
if (table.primaryKeys().size() > 0 &&
mergeEngineDesc.containsKey(mergeEngine)) {
- switch (options.changelogProducer()) {
- case NONE:
- case INPUT:
- throw new RuntimeException(
- mergeEngineDesc.get(mergeEngine)
- + " streaming reading is not supported.
You can use "
- + "'lookup' or 'full-compaction' changelog
producer to support streaming reading.");
- default:
+ if (options.changelogProducer() ==
CoreOptions.ChangelogProducer.NONE) {
+ throw new RuntimeException(
+ mergeEngineDesc.get(mergeEngine)
+ + " streaming reading is not supported. You
can use "
+ + "'lookup' or 'full-compaction' changelog
producer to support streaming reading. "
+ + "('input' changelog producer is also
supported, but only returns input records.)");
}
}
}
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/LookupJoinITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/LookupJoinITCase.java
index 713d4ef2a..70db991e9 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/LookupJoinITCase.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/LookupJoinITCase.java
@@ -469,7 +469,8 @@ public class LookupJoinITCase extends CatalogITCaseBase {
.hasRootCauseMessage(
"Partial update streaming"
+ " reading is not supported. "
- + "You can use 'lookup' or 'full-compaction'
changelog producer to support streaming reading.");
+ + "You can use 'lookup' or 'full-compaction'
changelog producer to support streaming reading. "
+ + "('input' changelog producer is also
supported, but only returns input records.)");
}
@Test
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/PartialUpdateITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/PartialUpdateITCase.java
index 87103c000..887183518 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/PartialUpdateITCase.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/PartialUpdateITCase.java
@@ -18,6 +18,8 @@
package org.apache.paimon.flink;
+import org.apache.paimon.utils.BlockingIterator;
+
import org.apache.flink.table.api.config.ExecutionConfigOptions;
import org.apache.flink.types.Row;
import org.apache.flink.types.RowKind;
@@ -29,6 +31,7 @@ import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
import java.util.stream.Collectors;
import static org.assertj.core.api.Assertions.assertThat;
@@ -149,6 +152,21 @@ public class PartialUpdateITCase extends CatalogITCaseBase {
"Partial update continuous reading is not supported");
}
+ @Test
+ public void testStreamingReadChangelogInput() throws TimeoutException {
+ sql(
+ "CREATE TABLE INPUT_T ("
+ + "a INT, b INT, c INT, PRIMARY KEY (a) NOT ENFORCED)"
+ + " WITH ('merge-engine'='partial-update',
'changelog-producer'='input');");
+ BlockingIterator<Row, Row> iterator =
+ BlockingIterator.of(streamSqlIter("SELECT * FROM INPUT_T"));
+ sql("INSERT INTO INPUT_T VALUES (1, CAST(NULL AS INT), 1)");
+ assertThat(iterator.collect(1)).containsExactlyInAnyOrder(Row.of(1,
null, 1));
+ sql("INSERT INTO INPUT_T VALUES (1, 1, CAST(NULL AS INT)), (2, 2, 2)");
+ assertThat(iterator.collect(2))
+ .containsExactlyInAnyOrder(Row.of(1, 1, null), Row.of(2, 2,
2));
+ }
+
@Test
public void testSequenceGroup() {
sql(