This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 8902b163a [bug] Ignore the consumer-id in batch job (#2856)
8902b163a is described below
commit 8902b163af7652b72c0a9ab9c0d4f212b9c15938
Author: Aitozi <[email protected]>
AuthorDate: Fri Feb 23 11:24:48 2024 +0800
[bug] Ignore the consumer-id in batch job (#2856)
---
.../table/source/AbstractInnerTableScan.java | 2 +-
.../paimon/flink/ContinuousFileStoreITCase.java | 23 ++++++++++++++++++++++
2 files changed, 24 insertions(+), 1 deletion(-)
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/source/AbstractInnerTableScan.java b/paimon-core/src/main/java/org/apache/paimon/table/source/AbstractInnerTableScan.java
index 16246c113..b597ec70b 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/source/AbstractInnerTableScan.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/source/AbstractInnerTableScan.java
@@ -114,7 +114,7 @@ public abstract class AbstractInnerTableScan implements InnerTableScan {
// read from consumer id
String consumerId = options.consumerId();
- if (consumerId != null && !options.consumerIgnoreProgress()) {
+ if (isStreaming && consumerId != null && !options.consumerIgnoreProgress()) {
ConsumerManager consumerManager = snapshotReader.consumerManager();
Optional<Consumer> consumer = consumerManager.consumer(consumerId);
if (consumer.isPresent()) {
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/ContinuousFileStoreITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/ContinuousFileStoreITCase.java
index 8ba3221d3..d5db6a588 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/ContinuousFileStoreITCase.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/ContinuousFileStoreITCase.java
@@ -153,6 +153,29 @@ public class ContinuousFileStoreITCase extends CatalogITCaseBase {
iterator.close();
}
+ @Test
+ public void testConsumerIdInBatch() throws Exception {
+ String table = "T2";
+
+ batchSql("INSERT INTO %s VALUES ('1', '2', '3'), ('4', '5', '6')", table);
+ BlockingIterator<Row, Row> iterator =
+ BlockingIterator.of(
+ streamSqlIter(
+ "SELECT * FROM %s /*+ OPTIONS('consumer-id'='me') */", table));
+
+ assertThat(iterator.collect(2))
+ .containsExactlyInAnyOrder(Row.of("1", "2", "3"), Row.of("4", "5", "6"));
+
+ Thread.sleep(1000);
+ iterator.close();
+
+ batchSql("INSERT INTO %s VALUES ('7', '8', '9')", table);
+ // ignore the consumer id in batch mode
+ assertThat(sql("SELECT * FROM %s /*+ OPTIONS('consumer-id'='me') */", table))
+ .containsExactlyInAnyOrder(
+ Row.of("1", "2", "3"), Row.of("4", "5", "6"), Row.of("7", "8", "9"));
+ }
+
@Test
@Timeout(120)
public void testSnapshotWatermark() throws Exception {