This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 976c605f4 [core] Partial-update streaming read supports input 
changelog-producer (#2026)
976c605f4 is described below

commit 976c605f43e999ed47e2893a02143ca2041b7ea0
Author: Jingsong Lee <[email protected]>
AuthorDate: Wed Sep 20 21:12:00 2023 +0800

    [core] Partial-update streaming read supports input changelog-producer 
(#2026)
---
 docs/content/concepts/primary-key-table.md             |  6 ++++--
 .../org/apache/paimon/flink/utils/TableScanUtils.java  | 14 ++++++--------
 .../java/org/apache/paimon/flink/LookupJoinITCase.java |  3 ++-
 .../org/apache/paimon/flink/PartialUpdateITCase.java   | 18 ++++++++++++++++++
 4 files changed, 30 insertions(+), 11 deletions(-)

diff --git a/docs/content/concepts/primary-key-table.md 
b/docs/content/concepts/primary-key-table.md
index e019ada53..403236b6e 100644
--- a/docs/content/concepts/primary-key-table.md
+++ b/docs/content/concepts/primary-key-table.md
@@ -120,7 +120,8 @@ For example, suppose Paimon receives three records:
 Assuming that the first column is the primary key, the final result would be 
`<1, 25.2, 10, 'This is a book'>`.
 
 {{< hint info >}}
-For streaming queries, `partial-update` merge engine must be used together 
with `lookup` or `full-compaction` [changelog producer]({{< ref 
"concepts/primary-key-table#changelog-producers" >}}).
+For streaming queries, `partial-update` merge engine must be used together 
with `lookup` or `full-compaction`
+[changelog producer]({{< ref "concepts/primary-key-table#changelog-producers" 
>}}). ('input' changelog producer is also supported, but only returns input 
records.)
 {{< /hint >}}
 
 {{< hint info >}}
@@ -257,7 +258,8 @@ If you allow some functions to ignore retraction messages, 
you can configure:
 `'fields.${field_name}.ignore-retract'='true'`.
 
 {{< hint info >}}
-For streaming queries, `aggregation` merge engine must be used together with 
`lookup` or `full-compaction` [changelog producer]({{< ref 
"concepts/primary-key-table#changelog-producers" >}}).
+For streaming queries, `aggregation` merge engine must be used together with 
`lookup` or `full-compaction`
+[changelog producer]({{< ref "concepts/primary-key-table#changelog-producers" 
>}}). ('input' changelog producer is also supported, but only returns input 
records.)
 {{< /hint >}}
 
 ### First Row
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/utils/TableScanUtils.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/utils/TableScanUtils.java
index b9c6bb912..05486972f 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/utils/TableScanUtils.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/utils/TableScanUtils.java
@@ -39,14 +39,12 @@ public class TableScanUtils {
                     }
                 };
         if (table.primaryKeys().size() > 0 && 
mergeEngineDesc.containsKey(mergeEngine)) {
-            switch (options.changelogProducer()) {
-                case NONE:
-                case INPUT:
-                    throw new RuntimeException(
-                            mergeEngineDesc.get(mergeEngine)
-                                    + " streaming reading is not supported. 
You can use "
-                                    + "'lookup' or 'full-compaction' changelog 
producer to support streaming reading.");
-                default:
+            if (options.changelogProducer() == 
CoreOptions.ChangelogProducer.NONE) {
+                throw new RuntimeException(
+                        mergeEngineDesc.get(mergeEngine)
+                                + " streaming reading is not supported. You 
can use "
+                                + "'lookup' or 'full-compaction' changelog 
producer to support streaming reading. "
+                                + "('input' changelog producer is also 
supported, but only returns input records.)");
             }
         }
     }
diff --git 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/LookupJoinITCase.java
 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/LookupJoinITCase.java
index 713d4ef2a..70db991e9 100644
--- 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/LookupJoinITCase.java
+++ 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/LookupJoinITCase.java
@@ -469,7 +469,8 @@ public class LookupJoinITCase extends CatalogITCaseBase {
                 .hasRootCauseMessage(
                         "Partial update streaming"
                                 + " reading is not supported. "
-                                + "You can use 'lookup' or 'full-compaction' 
changelog producer to support streaming reading.");
+                                + "You can use 'lookup' or 'full-compaction' 
changelog producer to support streaming reading. "
+                                + "('input' changelog producer is also 
supported, but only returns input records.)");
     }
 
     @Test
diff --git 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/PartialUpdateITCase.java
 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/PartialUpdateITCase.java
index 87103c000..887183518 100644
--- 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/PartialUpdateITCase.java
+++ 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/PartialUpdateITCase.java
@@ -18,6 +18,8 @@
 
 package org.apache.paimon.flink;
 
+import org.apache.paimon.utils.BlockingIterator;
+
 import org.apache.flink.table.api.config.ExecutionConfigOptions;
 import org.apache.flink.types.Row;
 import org.apache.flink.types.RowKind;
@@ -29,6 +31,7 @@ import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
 import java.util.stream.Collectors;
 
 import static org.assertj.core.api.Assertions.assertThat;
@@ -149,6 +152,21 @@ public class PartialUpdateITCase extends CatalogITCaseBase 
{
                 "Partial update continuous reading is not supported");
     }
 
+    @Test
+    public void testStreamingReadChangelogInput() throws TimeoutException {
+        sql(
+                "CREATE TABLE INPUT_T ("
+                        + "a INT, b INT, c INT, PRIMARY KEY (a) NOT ENFORCED)"
+                        + " WITH ('merge-engine'='partial-update', 
'changelog-producer'='input');");
+        BlockingIterator<Row, Row> iterator =
+                BlockingIterator.of(streamSqlIter("SELECT * FROM INPUT_T"));
+        sql("INSERT INTO INPUT_T VALUES (1, CAST(NULL AS INT), 1)");
+        assertThat(iterator.collect(1)).containsExactlyInAnyOrder(Row.of(1, 
null, 1));
+        sql("INSERT INTO INPUT_T VALUES (1, 1, CAST(NULL AS INT)), (2, 2, 2)");
+        assertThat(iterator.collect(2))
+                .containsExactlyInAnyOrder(Row.of(1, 1, null), Row.of(2, 2, 
2));
+    }
+
     @Test
     public void testSequenceGroup() {
         sql(

Reply via email to