This is an automated email from the ASF dual-hosted git repository.

czweng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new ebb05fc28 [flink] Consumer expiration time must be specified when 
using consumer-id (#3736)
ebb05fc28 is described below

commit ebb05fc281d5b6395361f2f3e4db3bc7cdf37b5c
Author: HeavenZH <[email protected]>
AuthorDate: Mon Jul 15 13:52:24 2024 +0800

    [flink] Consumer expiration time must be specified when using consumer-id 
(#3736)
    
    This closes #3736.
---
 .../paimon/flink/source/FlinkSourceBuilder.java    |  6 ++++-
 .../apache/paimon/flink/CatalogTableITCase.java    | 31 +++++++++++++++++++++-
 .../paimon/flink/ContinuousFileStoreITCase.java    | 17 ++++++++----
 .../paimon/flink/action/ConsumerActionITCase.java  |  4 ++-
 .../paimon/flink/source/SourceMetricsITCase.java   |  2 +-
 5 files changed, 51 insertions(+), 9 deletions(-)

diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/FlinkSourceBuilder.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/FlinkSourceBuilder.java
index 32e61903c..4fe6b3ec1 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/FlinkSourceBuilder.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/FlinkSourceBuilder.java
@@ -250,11 +250,15 @@ public class FlinkSourceBuilder {
         if (env == null) {
             throw new IllegalArgumentException("StreamExecutionEnvironment 
should not be null.");
         }
+        if (conf.contains(CoreOptions.CONSUMER_ID)
+                && !conf.contains(CoreOptions.CONSUMER_EXPIRATION_TIME)) {
+            throw new IllegalArgumentException(
+                    "consumer.expiration-time should be specified when using 
consumer-id.");
+        }
 
         if (sourceBounded) {
             return buildStaticFileSource();
         }
-
         TableScanUtils.streamingReadingValidate(table);
 
         // TODO visit all options through CoreOptions
diff --git 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/CatalogTableITCase.java
 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/CatalogTableITCase.java
index 83a89a6bb..a9ce8135b 100644
--- 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/CatalogTableITCase.java
+++ 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/CatalogTableITCase.java
@@ -742,7 +742,8 @@ public class CatalogTableITCase extends CatalogITCaseBase {
 
         BlockingIterator<Row, Row> iterator =
                 BlockingIterator.of(
-                        streamSqlIter("SELECT * FROM T /*+ 
OPTIONS('consumer-id'='my1') */"));
+                        streamSqlIter(
+                                "SELECT * FROM T /*+ 
OPTIONS('consumer-id'='my1','consumer.expiration-time'='3h') */"));
 
         batchSql("INSERT INTO T VALUES (5, 6), (7, 8)");
         assertThat(iterator.collect(2)).containsExactlyInAnyOrder(Row.of(1, 
2), Row.of(3, 4));
@@ -754,6 +755,34 @@ public class CatalogTableITCase extends CatalogITCaseBase {
         assertThat((Long) result.get(0).getField(1)).isGreaterThanOrEqualTo(3);
     }
 
+    @Test
+    public void testConsumerIdExpInBatchMode() {
+        batchSql("CREATE TABLE T (a INT, b INT)");
+        batchSql("INSERT INTO T VALUES (1, 2)");
+        batchSql("INSERT INTO T VALUES (3, 4)");
+        batchSql("INSERT INTO T VALUES (5, 6), (7, 8)");
+        assertThatThrownBy(
+                        () ->
+                                sql(
+                                        "SELECT * FROM T /*+ 
OPTIONS('consumer-id' = 'test-id') */ WHERE a = 1"))
+                .rootCause()
+                .isInstanceOf(IllegalArgumentException.class)
+                .hasMessage("consumer.expiration-time should be specified.");
+    }
+
+    @Test
+    public void testConsumerIdExpInStreamingMode() {
+        batchSql("CREATE TABLE T (a INT, b INT)");
+        batchSql("INSERT INTO T VALUES (1, 2)");
+        batchSql("INSERT INTO T VALUES (3, 4)");
+        assertThatThrownBy(
+                        () ->
+                                streamSqlIter(
+                                        "SELECT * FROM T /*+ 
OPTIONS('consumer-id'='test-id') */"))
+                .isInstanceOf(IllegalArgumentException.class)
+                .hasMessage("consumer.expiration-time should be specified.");
+    }
+
     @Test
     public void testPartitionsTable() {
         String table = "PARTITIONS_TABLE";
diff --git 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/ContinuousFileStoreITCase.java
 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/ContinuousFileStoreITCase.java
index 54c193801..a0ce8be3a 100644
--- 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/ContinuousFileStoreITCase.java
+++ 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/ContinuousFileStoreITCase.java
@@ -138,7 +138,8 @@ public class ContinuousFileStoreITCase extends 
CatalogITCaseBase {
         BlockingIterator<Row, Row> iterator =
                 BlockingIterator.of(
                         streamSqlIter(
-                                "SELECT * FROM %s /*+ 
OPTIONS('consumer-id'='me') */", table));
+                                "SELECT * FROM %s /*+ 
OPTIONS('consumer-id'='me','consumer.expiration-time'='3h') */",
+                                table));
 
         batchSql("INSERT INTO %s VALUES ('1', '2', '3'), ('4', '5', '6')", 
table);
         assertThat(iterator.collect(2))
@@ -150,7 +151,8 @@ public class ContinuousFileStoreITCase extends 
CatalogITCaseBase {
         iterator =
                 BlockingIterator.of(
                         streamSqlIter(
-                                "SELECT * FROM %s /*+ 
OPTIONS('consumer-id'='me') */", table));
+                                "SELECT * FROM %s /*+ 
OPTIONS('consumer-id'='me','consumer.expiration-time'='3h') */",
+                                table));
         batchSql("INSERT INTO %s VALUES ('7', '8', '9')", table);
         assertThat(iterator.collect(1)).containsExactlyInAnyOrder(Row.of("7", 
"8", "9"));
         iterator.close();
@@ -164,7 +166,8 @@ public class ContinuousFileStoreITCase extends 
CatalogITCaseBase {
         BlockingIterator<Row, Row> iterator =
                 BlockingIterator.of(
                         streamSqlIter(
-                                "SELECT * FROM %s /*+ 
OPTIONS('consumer-id'='me') */", table));
+                                "SELECT * FROM %s /*+ 
OPTIONS('consumer-id'='me','consumer.expiration-time'='3h') */",
+                                table));
 
         assertThat(iterator.collect(2))
                 .containsExactlyInAnyOrder(Row.of("1", "2", "3"), Row.of("4", 
"5", "6"));
@@ -174,7 +177,10 @@ public class ContinuousFileStoreITCase extends 
CatalogITCaseBase {
 
         batchSql("INSERT INTO %s VALUES ('7', '8', '9')", table);
         // ignore the consumer id in batch mode
-        assertThat(sql("SELECT * FROM %s /*+ OPTIONS('consumer-id'='me') */", 
table))
+        assertThat(
+                        sql(
+                                "SELECT * FROM %s /*+ 
OPTIONS('consumer-id'='me','consumer.expiration-time'='3h') */",
+                                table))
                 .containsExactlyInAnyOrder(
                         Row.of("1", "2", "3"), Row.of("4", "5", "6"), 
Row.of("7", "8", "9"));
     }
@@ -188,7 +194,8 @@ public class ContinuousFileStoreITCase extends 
CatalogITCaseBase {
         CloseableIterator<Row> insert1 = streamSqlIter("INSERT INTO T2 SELECT 
a, b, c FROM gen");
         sql("CREATE TABLE WT (a STRING, b STRING, c STRING, PRIMARY KEY (a) 
NOT ENFORCED)");
         CloseableIterator<Row> insert2 =
-                streamSqlIter("INSERT INTO WT SELECT * FROM T2 /*+ 
OPTIONS('consumer-id'='me') */");
+                streamSqlIter(
+                        "INSERT INTO WT SELECT * FROM T2 /*+ 
OPTIONS('consumer-id'='me','consumer.expiration-time'='3h') */");
         while (true) {
             Set<Long> watermarks =
                     sql("SELECT `watermark` FROM WT$snapshots").stream()
diff --git 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/ConsumerActionITCase.java
 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/ConsumerActionITCase.java
index 82503e5c3..4818c97e6 100644
--- 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/ConsumerActionITCase.java
+++ 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/ConsumerActionITCase.java
@@ -70,7 +70,9 @@ public class ConsumerActionITCase extends ActionITCaseBase {
 
         // use consumer streaming read table
         testStreamingRead(
-                        "SELECT * FROM `" + tableName + "` /*+ 
OPTIONS('consumer-id'='myid') */",
+                        "SELECT * FROM `"
+                                + tableName
+                                + "` /*+ 
OPTIONS('consumer-id'='myid','consumer.expiration-time'='3h') */",
                         Arrays.asList(
                                 changelogRow("+I", 1L, "Hi"),
                                 changelogRow("+I", 2L, "Hello"),
diff --git 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/SourceMetricsITCase.java
 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/SourceMetricsITCase.java
index e7d05404a..5bdc27dd9 100644
--- 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/SourceMetricsITCase.java
+++ 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/SourceMetricsITCase.java
@@ -111,7 +111,7 @@ public class SourceMetricsITCase {
                 "CREATE TEMPORARY TABLE B ( k INT, v INT ) WITH ( 'connector' 
= 'blackhole' )");
         TableResult tableResult =
                 tEnv.executeSql(
-                        "INSERT INTO B SELECT * FROM T /*+ 
OPTIONS('consumer-id' = 'test') */");
+                        "INSERT INTO B SELECT * FROM T /*+ 
OPTIONS('consumer-id' = 'test','consumer.expiration-time'='3h') */");
         JobClient client = tableResult.getJobClient().get();
         JobID jobId = client.getJobID();
 

Reply via email to