This is an automated email from the ASF dual-hosted git repository.
czweng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new ebb05fc28 [flink] Consumer expiration time must be specified when
using consumer-id (#3736)
ebb05fc28 is described below
commit ebb05fc281d5b6395361f2f3e4db3bc7cdf37b5c
Author: HeavenZH <[email protected]>
AuthorDate: Mon Jul 15 13:52:24 2024 +0800
[flink] Consumer expiration time must be specified when using consumer-id
(#3736)
This closes #3736.
---
.../paimon/flink/source/FlinkSourceBuilder.java | 6 ++++-
.../apache/paimon/flink/CatalogTableITCase.java | 31 +++++++++++++++++++++-
.../paimon/flink/ContinuousFileStoreITCase.java | 17 ++++++++----
.../paimon/flink/action/ConsumerActionITCase.java | 4 ++-
.../paimon/flink/source/SourceMetricsITCase.java | 2 +-
5 files changed, 51 insertions(+), 9 deletions(-)
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/FlinkSourceBuilder.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/FlinkSourceBuilder.java
index 32e61903c..4fe6b3ec1 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/FlinkSourceBuilder.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/FlinkSourceBuilder.java
@@ -250,11 +250,15 @@ public class FlinkSourceBuilder {
if (env == null) {
throw new IllegalArgumentException("StreamExecutionEnvironment
should not be null.");
}
+ if (conf.contains(CoreOptions.CONSUMER_ID)
+ && !conf.contains(CoreOptions.CONSUMER_EXPIRATION_TIME)) {
+ throw new IllegalArgumentException(
+ "consumer.expiration-time should be specified when using
consumer-id.");
+ }
if (sourceBounded) {
return buildStaticFileSource();
}
-
TableScanUtils.streamingReadingValidate(table);
// TODO visit all options through CoreOptions
diff --git
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/CatalogTableITCase.java
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/CatalogTableITCase.java
index 83a89a6bb..a9ce8135b 100644
---
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/CatalogTableITCase.java
+++
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/CatalogTableITCase.java
@@ -742,7 +742,8 @@ public class CatalogTableITCase extends CatalogITCaseBase {
BlockingIterator<Row, Row> iterator =
BlockingIterator.of(
- streamSqlIter("SELECT * FROM T /*+
OPTIONS('consumer-id'='my1') */"));
+ streamSqlIter(
+ "SELECT * FROM T /*+
OPTIONS('consumer-id'='my1','consumer.expiration-time'='3h') */"));
batchSql("INSERT INTO T VALUES (5, 6), (7, 8)");
assertThat(iterator.collect(2)).containsExactlyInAnyOrder(Row.of(1,
2), Row.of(3, 4));
@@ -754,6 +755,34 @@ public class CatalogTableITCase extends CatalogITCaseBase {
assertThat((Long) result.get(0).getField(1)).isGreaterThanOrEqualTo(3);
}
+ @Test
+ public void testConsumerIdExpInBatchMode() {
+ batchSql("CREATE TABLE T (a INT, b INT)");
+ batchSql("INSERT INTO T VALUES (1, 2)");
+ batchSql("INSERT INTO T VALUES (3, 4)");
+ batchSql("INSERT INTO T VALUES (5, 6), (7, 8)");
+ assertThatThrownBy(
+ () ->
+ sql(
+ "SELECT * FROM T /*+
OPTIONS('consumer-id' = 'test-id') */ WHERE a = 1"))
+ .rootCause()
+ .isInstanceOf(IllegalArgumentException.class)
+ .hasMessage("consumer.expiration-time should be specified.");
+ }
+
+ @Test
+ public void testConsumerIdExpInStreamingMode() {
+ batchSql("CREATE TABLE T (a INT, b INT)");
+ batchSql("INSERT INTO T VALUES (1, 2)");
+ batchSql("INSERT INTO T VALUES (3, 4)");
+ assertThatThrownBy(
+ () ->
+ streamSqlIter(
+ "SELECT * FROM T /*+
OPTIONS('consumer-id'='test-id') */"))
+ .isInstanceOf(IllegalArgumentException.class)
+ .hasMessage("consumer.expiration-time should be specified.");
+ }
+
@Test
public void testPartitionsTable() {
String table = "PARTITIONS_TABLE";
diff --git
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/ContinuousFileStoreITCase.java
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/ContinuousFileStoreITCase.java
index 54c193801..a0ce8be3a 100644
---
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/ContinuousFileStoreITCase.java
+++
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/ContinuousFileStoreITCase.java
@@ -138,7 +138,8 @@ public class ContinuousFileStoreITCase extends
CatalogITCaseBase {
BlockingIterator<Row, Row> iterator =
BlockingIterator.of(
streamSqlIter(
- "SELECT * FROM %s /*+
OPTIONS('consumer-id'='me') */", table));
+ "SELECT * FROM %s /*+
OPTIONS('consumer-id'='me','consumer.expiration-time'='3h') */",
+ table));
batchSql("INSERT INTO %s VALUES ('1', '2', '3'), ('4', '5', '6')",
table);
assertThat(iterator.collect(2))
@@ -150,7 +151,8 @@ public class ContinuousFileStoreITCase extends
CatalogITCaseBase {
iterator =
BlockingIterator.of(
streamSqlIter(
- "SELECT * FROM %s /*+
OPTIONS('consumer-id'='me') */", table));
+ "SELECT * FROM %s /*+
OPTIONS('consumer-id'='me','consumer.expiration-time'='3h') */",
+ table));
batchSql("INSERT INTO %s VALUES ('7', '8', '9')", table);
assertThat(iterator.collect(1)).containsExactlyInAnyOrder(Row.of("7",
"8", "9"));
iterator.close();
@@ -164,7 +166,8 @@ public class ContinuousFileStoreITCase extends
CatalogITCaseBase {
BlockingIterator<Row, Row> iterator =
BlockingIterator.of(
streamSqlIter(
- "SELECT * FROM %s /*+
OPTIONS('consumer-id'='me') */", table));
+ "SELECT * FROM %s /*+
OPTIONS('consumer-id'='me','consumer.expiration-time'='3h') */",
+ table));
assertThat(iterator.collect(2))
.containsExactlyInAnyOrder(Row.of("1", "2", "3"), Row.of("4",
"5", "6"));
@@ -174,7 +177,10 @@ public class ContinuousFileStoreITCase extends
CatalogITCaseBase {
batchSql("INSERT INTO %s VALUES ('7', '8', '9')", table);
// ignore the consumer id in batch mode
- assertThat(sql("SELECT * FROM %s /*+ OPTIONS('consumer-id'='me') */",
table))
+ assertThat(
+ sql(
+ "SELECT * FROM %s /*+
OPTIONS('consumer-id'='me','consumer.expiration-time'='3h') */",
+ table))
.containsExactlyInAnyOrder(
Row.of("1", "2", "3"), Row.of("4", "5", "6"),
Row.of("7", "8", "9"));
}
@@ -188,7 +194,8 @@ public class ContinuousFileStoreITCase extends
CatalogITCaseBase {
CloseableIterator<Row> insert1 = streamSqlIter("INSERT INTO T2 SELECT
a, b, c FROM gen");
sql("CREATE TABLE WT (a STRING, b STRING, c STRING, PRIMARY KEY (a)
NOT ENFORCED)");
CloseableIterator<Row> insert2 =
- streamSqlIter("INSERT INTO WT SELECT * FROM T2 /*+
OPTIONS('consumer-id'='me') */");
+ streamSqlIter(
+ "INSERT INTO WT SELECT * FROM T2 /*+
OPTIONS('consumer-id'='me','consumer.expiration-time'='3h') */");
while (true) {
Set<Long> watermarks =
sql("SELECT `watermark` FROM WT$snapshots").stream()
diff --git
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/ConsumerActionITCase.java
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/ConsumerActionITCase.java
index 82503e5c3..4818c97e6 100644
---
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/ConsumerActionITCase.java
+++
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/ConsumerActionITCase.java
@@ -70,7 +70,9 @@ public class ConsumerActionITCase extends ActionITCaseBase {
// use consumer streaming read table
testStreamingRead(
- "SELECT * FROM `" + tableName + "` /*+
OPTIONS('consumer-id'='myid') */",
+ "SELECT * FROM `"
+ + tableName
+ + "` /*+
OPTIONS('consumer-id'='myid','consumer.expiration-time'='3h') */",
Arrays.asList(
changelogRow("+I", 1L, "Hi"),
changelogRow("+I", 2L, "Hello"),
diff --git
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/SourceMetricsITCase.java
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/SourceMetricsITCase.java
index e7d05404a..5bdc27dd9 100644
---
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/SourceMetricsITCase.java
+++
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/SourceMetricsITCase.java
@@ -111,7 +111,7 @@ public class SourceMetricsITCase {
"CREATE TEMPORARY TABLE B ( k INT, v INT ) WITH ( 'connector'
= 'blackhole' )");
TableResult tableResult =
tEnv.executeSql(
- "INSERT INTO B SELECT * FROM T /*+
OPTIONS('consumer-id' = 'test') */");
+ "INSERT INTO B SELECT * FROM T /*+
OPTIONS('consumer-id' = 'test','consumer.expiration-time'='3h') */");
JobClient client = tableResult.getJobClient().get();
JobID jobId = client.getJobID();