This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 63551bbda1 [flink][hotfix] Wait for consumer reset before job close
(#4578)
63551bbda1 is described below
commit 63551bbda17518e28655452034cbd1676729c1b4
Author: yunfengzhou-hub <[email protected]>
AuthorDate: Mon Nov 25 10:58:48 2024 +0800
[flink][hotfix] Wait for consumer reset before job close (#4578)
---
.../org/apache/paimon/flink/BranchSqlITCase.java | 13 ++++++++--
.../apache/paimon/flink/CatalogTableITCase.java | 12 +++++++++-
.../paimon/flink/ContinuousFileStoreITCase.java | 9 ++++++-
.../paimon/flink/action/ConsumerActionITCase.java | 28 ++++++++++++++++------
4 files changed, 51 insertions(+), 11 deletions(-)
diff --git
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BranchSqlITCase.java
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BranchSqlITCase.java
index c25d99cb44..2566fbe92e 100644
---
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BranchSqlITCase.java
+++
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BranchSqlITCase.java
@@ -27,6 +27,7 @@ import org.apache.commons.lang3.StringUtils;
import org.apache.flink.types.Row;
import org.apache.flink.util.CloseableIterator;
import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.Timeout;
import java.io.IOException;
import java.util.ArrayList;
@@ -440,6 +441,7 @@ public class BranchSqlITCase extends CatalogITCaseBase {
}
@Test
+ @Timeout(60)
public void testBranchConsumersTable() throws Exception {
sql("CREATE TABLE t (a INT, b INT)");
sql("INSERT INTO t VALUES (1, 2), (3,4)");
@@ -451,11 +453,18 @@ public class BranchSqlITCase extends CatalogITCaseBase {
"SELECT * FROM t$branch_b1 /*+
OPTIONS('consumer-id'='id1','consumer.expiration-time'='3h') */"));
sql("INSERT INTO t$branch_b1 VALUES (5, 6), (7, 8)");
assertThat(iterator.collect(2)).containsExactlyInAnyOrder(Row.of(5,
6), Row.of(7, 8));
+ List<String> branchResult;
+ do {
+ branchResult = collectResult("SELECT * FROM
t$branch_b1$consumers");
+ if (!branchResult.isEmpty()) {
+ break;
+ }
+ Thread.sleep(1000);
+ } while (true);
iterator.close();
assertThat(collectResult("SELECT * FROM t$consumers")).isEmpty();
- assertThat(collectResult("SELECT * FROM t$branch_b1$consumers"))
- .containsExactlyInAnyOrder("+I[id1, 2]");
+ assertThat(branchResult).containsExactlyInAnyOrder("+I[id1, 2]");
assertThat(collectResult("SELECT * FROM t$consumers /*+
OPTIONS('branch'='b1') */"))
.containsExactlyInAnyOrder("+I[id1, 2]");
}
diff --git
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/CatalogTableITCase.java
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/CatalogTableITCase.java
index 8a3e068a72..2a855796d8 100644
---
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/CatalogTableITCase.java
+++
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/CatalogTableITCase.java
@@ -33,6 +33,7 @@ import
org.apache.flink.table.catalog.exceptions.PartitionNotExistException;
import org.apache.flink.table.catalog.exceptions.TableNotPartitionedException;
import org.apache.flink.types.Row;
import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.Timeout;
import javax.annotation.Nonnull;
@@ -940,6 +941,7 @@ public class CatalogTableITCase extends CatalogITCaseBase {
}
@Test
+ @Timeout(60)
public void testConsumersTable() throws Exception {
batchSql("CREATE TABLE T (a INT, b INT)");
batchSql("INSERT INTO T VALUES (1, 2)");
@@ -952,9 +954,17 @@ public class CatalogTableITCase extends CatalogITCaseBase {
batchSql("INSERT INTO T VALUES (5, 6), (7, 8)");
assertThat(iterator.collect(2)).containsExactlyInAnyOrder(Row.of(1,
2), Row.of(3, 4));
+
+ List<Row> result;
+ do {
+ result = sql("SELECT * FROM T$consumers");
+ if (!result.isEmpty()) {
+ break;
+ }
+ Thread.sleep(1000);
+ } while (true);
iterator.close();
- List<Row> result = sql("SELECT * FROM T$consumers");
assertThat(result).hasSize(1);
assertThat(result.get(0).getField(0)).isEqualTo("my1");
assertThat((Long) result.get(0).getField(1)).isGreaterThanOrEqualTo(3);
diff --git
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/ContinuousFileStoreITCase.java
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/ContinuousFileStoreITCase.java
index 2e15697511..b448858328 100644
---
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/ContinuousFileStoreITCase.java
+++
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/ContinuousFileStoreITCase.java
@@ -120,7 +120,14 @@ public class ContinuousFileStoreITCase extends
CatalogITCaseBase {
assertThat(iterator.collect(2))
.containsExactlyInAnyOrder(Row.of("1", "2", "3"), Row.of("4",
"5", "6"));
- Thread.sleep(1000);
+ List<Row> result;
+ do {
+ result = sql("SELECT * FROM %s$consumers", table);
+ if (!result.isEmpty()) {
+ break;
+ }
+ Thread.sleep(1000);
+ } while (true);
iterator.close();
iterator =
diff --git
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/ConsumerActionITCase.java
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/ConsumerActionITCase.java
index e2243ddf26..6fb8c81eb7 100644
---
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/ConsumerActionITCase.java
+++
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/ConsumerActionITCase.java
@@ -26,8 +26,11 @@ import org.apache.paimon.table.sink.StreamWriteBuilder;
import org.apache.paimon.types.DataType;
import org.apache.paimon.types.DataTypes;
import org.apache.paimon.types.RowType;
+import org.apache.paimon.utils.BlockingIterator;
import org.apache.flink.table.api.TableException;
+import org.apache.flink.types.Row;
+import org.junit.jupiter.api.Timeout;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;
@@ -46,6 +49,7 @@ import static org.junit.jupiter.api.Assertions.assertThrows;
public class ConsumerActionITCase extends ActionITCaseBase {
@ParameterizedTest
+ @Timeout(60)
@ValueSource(strings = {"action", "procedure_indexed", "procedure_named"})
public void testResetConsumer(String invoker) throws Exception {
init(warehouse);
@@ -72,18 +76,22 @@ public class ConsumerActionITCase extends ActionITCaseBase {
writeData(rowData(3L, BinaryString.fromString("Paimon")));
// use consumer streaming read table
- testStreamingRead(
+ BlockingIterator<Row, Row> iterator =
+ testStreamingRead(
"SELECT * FROM `"
+ tableName
+ "` /*+
OPTIONS('consumer-id'='myid','consumer.expiration-time'='3h') */",
Arrays.asList(
changelogRow("+I", 1L, "Hi"),
changelogRow("+I", 2L, "Hello"),
- changelogRow("+I", 3L, "Paimon")))
- .close();
+ changelogRow("+I", 3L, "Paimon")));
- Thread.sleep(1000);
ConsumerManager consumerManager = new ConsumerManager(table.fileIO(),
table.location());
+ while (!consumerManager.consumer("myid").isPresent()) {
+ Thread.sleep(1000);
+ }
+ iterator.close();
+
Optional<Consumer> consumer1 = consumerManager.consumer("myid");
assertThat(consumer1).isPresent();
assertThat(consumer1.get().nextSnapshot()).isEqualTo(4);
@@ -191,6 +199,7 @@ public class ConsumerActionITCase extends ActionITCaseBase {
}
@ParameterizedTest
+ @Timeout(60)
@ValueSource(strings = {"action", "procedure_indexed", "procedure_named"})
public void testResetBranchConsumer(String invoker) throws Exception {
init(warehouse);
@@ -222,18 +231,23 @@ public class ConsumerActionITCase extends
ActionITCaseBase {
String branchTableName = tableName + "$branch_b1";
// use consumer streaming read table
- testStreamingRead(
+ BlockingIterator<Row, Row> iterator =
+ testStreamingRead(
"SELECT * FROM `"
+ branchTableName
+ "` /*+
OPTIONS('consumer-id'='myid','consumer.expiration-time'='3h') */",
Arrays.asList(
changelogRow("+I", 1L, "Hi"),
changelogRow("+I", 2L, "Hello"),
- changelogRow("+I", 3L, "Paimon")))
- .close();
+ changelogRow("+I", 3L, "Paimon")));
ConsumerManager consumerManager =
new ConsumerManager(table.fileIO(), table.location(),
branchName);
+ while (!consumerManager.consumer("myid").isPresent()) {
+ Thread.sleep(1000);
+ }
+ iterator.close();
+
Optional<Consumer> consumer1 = consumerManager.consumer("myid");
assertThat(consumer1).isPresent();
assertThat(consumer1.get().nextSnapshot()).isEqualTo(4);