This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 8902b163a [bug] Ignore the consumer-id in batch job (#2856)
8902b163a is described below

commit 8902b163af7652b72c0a9ab9c0d4f212b9c15938
Author: Aitozi <[email protected]>
AuthorDate: Fri Feb 23 11:24:48 2024 +0800

    [bug] Ignore the consumer-id in batch job (#2856)
---
 .../table/source/AbstractInnerTableScan.java       |  2 +-
 .../paimon/flink/ContinuousFileStoreITCase.java    | 23 ++++++++++++++++++++++
 2 files changed, 24 insertions(+), 1 deletion(-)

diff --git 
a/paimon-core/src/main/java/org/apache/paimon/table/source/AbstractInnerTableScan.java
 
b/paimon-core/src/main/java/org/apache/paimon/table/source/AbstractInnerTableScan.java
index 16246c113..b597ec70b 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/table/source/AbstractInnerTableScan.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/table/source/AbstractInnerTableScan.java
@@ -114,7 +114,7 @@ public abstract class AbstractInnerTableScan implements 
InnerTableScan {
 
         // read from consumer id
         String consumerId = options.consumerId();
-        if (consumerId != null && !options.consumerIgnoreProgress()) {
+        if (isStreaming && consumerId != null && 
!options.consumerIgnoreProgress()) {
             ConsumerManager consumerManager = snapshotReader.consumerManager();
             Optional<Consumer> consumer = consumerManager.consumer(consumerId);
             if (consumer.isPresent()) {
diff --git 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/ContinuousFileStoreITCase.java
 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/ContinuousFileStoreITCase.java
index 8ba3221d3..d5db6a588 100644
--- 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/ContinuousFileStoreITCase.java
+++ 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/ContinuousFileStoreITCase.java
@@ -153,6 +153,29 @@ public class ContinuousFileStoreITCase extends 
CatalogITCaseBase {
         iterator.close();
     }
 
+    @Test
+    public void testConsumerIdInBatch() throws Exception {
+        String table = "T2";
+
+        batchSql("INSERT INTO %s VALUES ('1', '2', '3'), ('4', '5', '6')", 
table);
+        BlockingIterator<Row, Row> iterator =
+                BlockingIterator.of(
+                        streamSqlIter(
+                                "SELECT * FROM %s /*+ 
OPTIONS('consumer-id'='me') */", table));
+
+        assertThat(iterator.collect(2))
+                .containsExactlyInAnyOrder(Row.of("1", "2", "3"), Row.of("4", 
"5", "6"));
+
+        Thread.sleep(1000);
+        iterator.close();
+
+        batchSql("INSERT INTO %s VALUES ('7', '8', '9')", table);
+        // ignore the consumer id in batch mode
+        assertThat(sql("SELECT * FROM %s /*+ OPTIONS('consumer-id'='me') */", 
table))
+                .containsExactlyInAnyOrder(
+                        Row.of("1", "2", "3"), Row.of("4", "5", "6"), 
Row.of("7", "8", "9"));
+    }
+
     @Test
     @Timeout(120)
     public void testSnapshotWatermark() throws Exception {

Reply via email to