This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 00a36e35ba [core] fix the issue where streaming reading of overwrite data would fail when retract type data appeared. (#4697)
00a36e35ba is described below

commit 00a36e35bafbe10893725418443ea8fa1cd85c30
Author: liming.1018 <[email protected]>
AuthorDate: Fri Dec 13 09:50:29 2024 +0800

    [core] fix the issue where streaming reading of overwrite data would fail when retract type data appeared. (#4697)
---
 .../IncrementalChangelogReadProvider.java          |  8 ++---
 .../apache/paimon/flink/ReadWriteTableITCase.java  | 37 ++++++++++++++++++++++
 2 files changed, 41 insertions(+), 4 deletions(-)

diff --git a/paimon-core/src/main/java/org/apache/paimon/table/source/splitread/IncrementalChangelogReadProvider.java b/paimon-core/src/main/java/org/apache/paimon/table/source/splitread/IncrementalChangelogReadProvider.java
index 308c09d142..eb41d02669 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/source/splitread/IncrementalChangelogReadProvider.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/source/splitread/IncrementalChangelogReadProvider.java
@@ -60,20 +60,20 @@ public class IncrementalChangelogReadProvider implements SplitReadProvider {
                             ConcatRecordReader.create(
                                     () ->
                                             new ReverseReader(
-                                                    read.createNoMergeReader(
+                                                    read.createMergeReader(
                                                             split.partition(),
                                                             split.bucket(),
                                                             
split.beforeFiles(),
                                                             
split.beforeDeletionFiles()
                                                                     
.orElse(null),
-                                                            true)),
+                                                            false)),
                                     () ->
-                                            read.createNoMergeReader(
+                                            read.createMergeReader(
                                                     split.partition(),
                                                     split.bucket(),
                                                     split.dataFiles(),
                                                     
split.deletionFiles().orElse(null),
-                                                    true));
+                                                    false));
                     return unwrap(reader);
                 };
 
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/ReadWriteTableITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/ReadWriteTableITCase.java
index 10de1ae483..732e964542 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/ReadWriteTableITCase.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/ReadWriteTableITCase.java
@@ -801,6 +801,43 @@ public class ReadWriteTableITCase extends AbstractTestBase {
         streamingItr.close();
     }
 
+    @Test
+    public void testStreamingReadOverwriteWithDeleteRecords() throws Exception {
+        String table =
+                createTable(
+                        Arrays.asList("currency STRING", "rate BIGINT", "dt STRING"),
+                        Collections.singletonList("currency"),
+                        Collections.emptyList(),
+                        Collections.emptyList(),
+                        streamingReadOverwrite);
+
+        insertInto(
+                table,
+                "('US Dollar', 102, '2022-01-01')",
+                "('Yen', 1, '2022-01-02')",
+                "('Euro', 119, '2022-01-02')");
+
+        bEnv.executeSql(String.format("DELETE FROM %s WHERE currency = 'Euro'", table)).await();
+
+        checkFileStorePath(table, Collections.emptyList());
+
+        // test projection and filter
+        BlockingIterator<Row, Row> streamingItr =
+                testStreamingRead(
+                        buildQuery(table, "currency, rate", "WHERE dt = '2022-01-02'"),
+                        Collections.singletonList(changelogRow("+I", "Yen", 1L)));
+
+        insertOverwrite(table, "('US Dollar', 100, '2022-01-02')", "('Yen', 10, '2022-01-01')");
+
+        validateStreamingReadResult(
+                streamingItr,
+                Arrays.asList(
+                        changelogRow("-D", "Yen", 1L), changelogRow("+I", "US Dollar", 100L)));
+        assertNoMoreRecords(streamingItr);
+
+        streamingItr.close();
+    }
+
     @Test
     public void testUnsupportStreamingReadOverwriteWithoutPk() {
         assertThatThrownBy(

Reply via email to