This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 63551bbda1 [flink][hotfix] Wait for consumer reset before job close 
(#4578)
63551bbda1 is described below

commit 63551bbda17518e28655452034cbd1676729c1b4
Author: yunfengzhou-hub <[email protected]>
AuthorDate: Mon Nov 25 10:58:48 2024 +0800

    [flink][hotfix] Wait for consumer reset before job close (#4578)
---
 .../org/apache/paimon/flink/BranchSqlITCase.java   | 13 ++++++++--
 .../apache/paimon/flink/CatalogTableITCase.java    | 12 +++++++++-
 .../paimon/flink/ContinuousFileStoreITCase.java    |  9 ++++++-
 .../paimon/flink/action/ConsumerActionITCase.java  | 28 ++++++++++++++++------
 4 files changed, 51 insertions(+), 11 deletions(-)

diff --git 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BranchSqlITCase.java
 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BranchSqlITCase.java
index c25d99cb44..2566fbe92e 100644
--- 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BranchSqlITCase.java
+++ 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BranchSqlITCase.java
@@ -27,6 +27,7 @@ import org.apache.commons.lang3.StringUtils;
 import org.apache.flink.types.Row;
 import org.apache.flink.util.CloseableIterator;
 import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.Timeout;
 
 import java.io.IOException;
 import java.util.ArrayList;
@@ -440,6 +441,7 @@ public class BranchSqlITCase extends CatalogITCaseBase {
     }
 
     @Test
+    @Timeout(60)
     public void testBranchConsumersTable() throws Exception {
         sql("CREATE TABLE t (a INT, b INT)");
         sql("INSERT INTO t VALUES (1, 2), (3,4)");
@@ -451,11 +453,18 @@ public class BranchSqlITCase extends CatalogITCaseBase {
                                 "SELECT * FROM t$branch_b1 /*+ 
OPTIONS('consumer-id'='id1','consumer.expiration-time'='3h') */"));
         sql("INSERT INTO t$branch_b1 VALUES (5, 6), (7, 8)");
         assertThat(iterator.collect(2)).containsExactlyInAnyOrder(Row.of(5, 
6), Row.of(7, 8));
+        List<String> branchResult;
+        do {
+            branchResult = collectResult("SELECT * FROM 
t$branch_b1$consumers");
+            if (!branchResult.isEmpty()) {
+                break;
+            }
+            Thread.sleep(1000);
+        } while (true);
         iterator.close();
 
         assertThat(collectResult("SELECT * FROM t$consumers")).isEmpty();
-        assertThat(collectResult("SELECT * FROM t$branch_b1$consumers"))
-                .containsExactlyInAnyOrder("+I[id1, 2]");
+        assertThat(branchResult).containsExactlyInAnyOrder("+I[id1, 2]");
         assertThat(collectResult("SELECT * FROM t$consumers /*+ 
OPTIONS('branch'='b1') */"))
                 .containsExactlyInAnyOrder("+I[id1, 2]");
     }
diff --git 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/CatalogTableITCase.java
 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/CatalogTableITCase.java
index 8a3e068a72..2a855796d8 100644
--- 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/CatalogTableITCase.java
+++ 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/CatalogTableITCase.java
@@ -33,6 +33,7 @@ import 
org.apache.flink.table.catalog.exceptions.PartitionNotExistException;
 import org.apache.flink.table.catalog.exceptions.TableNotPartitionedException;
 import org.apache.flink.types.Row;
 import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.Timeout;
 
 import javax.annotation.Nonnull;
 
@@ -940,6 +941,7 @@ public class CatalogTableITCase extends CatalogITCaseBase {
     }
 
     @Test
+    @Timeout(60)
     public void testConsumersTable() throws Exception {
         batchSql("CREATE TABLE T (a INT, b INT)");
         batchSql("INSERT INTO T VALUES (1, 2)");
@@ -952,9 +954,17 @@ public class CatalogTableITCase extends CatalogITCaseBase {
 
         batchSql("INSERT INTO T VALUES (5, 6), (7, 8)");
         assertThat(iterator.collect(2)).containsExactlyInAnyOrder(Row.of(1, 
2), Row.of(3, 4));
+
+        List<Row> result;
+        do {
+            result = sql("SELECT * FROM T$consumers");
+            if (!result.isEmpty()) {
+                break;
+            }
+            Thread.sleep(1000);
+        } while (true);
         iterator.close();
 
-        List<Row> result = sql("SELECT * FROM T$consumers");
         assertThat(result).hasSize(1);
         assertThat(result.get(0).getField(0)).isEqualTo("my1");
         assertThat((Long) result.get(0).getField(1)).isGreaterThanOrEqualTo(3);
diff --git 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/ContinuousFileStoreITCase.java
 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/ContinuousFileStoreITCase.java
index 2e15697511..b448858328 100644
--- 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/ContinuousFileStoreITCase.java
+++ 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/ContinuousFileStoreITCase.java
@@ -120,7 +120,14 @@ public class ContinuousFileStoreITCase extends 
CatalogITCaseBase {
         assertThat(iterator.collect(2))
                 .containsExactlyInAnyOrder(Row.of("1", "2", "3"), Row.of("4", 
"5", "6"));
 
-        Thread.sleep(1000);
+        List<Row> result;
+        do {
+            result = sql("SELECT * FROM %s$consumers", table);
+            if (!result.isEmpty()) {
+                break;
+            }
+            Thread.sleep(1000);
+        } while (true);
         iterator.close();
 
         iterator =
diff --git 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/ConsumerActionITCase.java
 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/ConsumerActionITCase.java
index e2243ddf26..6fb8c81eb7 100644
--- 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/ConsumerActionITCase.java
+++ 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/ConsumerActionITCase.java
@@ -26,8 +26,11 @@ import org.apache.paimon.table.sink.StreamWriteBuilder;
 import org.apache.paimon.types.DataType;
 import org.apache.paimon.types.DataTypes;
 import org.apache.paimon.types.RowType;
+import org.apache.paimon.utils.BlockingIterator;
 
 import org.apache.flink.table.api.TableException;
+import org.apache.flink.types.Row;
+import org.junit.jupiter.api.Timeout;
 import org.junit.jupiter.params.ParameterizedTest;
 import org.junit.jupiter.params.provider.ValueSource;
 
@@ -46,6 +49,7 @@ import static org.junit.jupiter.api.Assertions.assertThrows;
 public class ConsumerActionITCase extends ActionITCaseBase {
 
     @ParameterizedTest
+    @Timeout(60)
     @ValueSource(strings = {"action", "procedure_indexed", "procedure_named"})
     public void testResetConsumer(String invoker) throws Exception {
         init(warehouse);
@@ -72,18 +76,22 @@ public class ConsumerActionITCase extends ActionITCaseBase {
         writeData(rowData(3L, BinaryString.fromString("Paimon")));
 
         // use consumer streaming read table
-        testStreamingRead(
+        BlockingIterator<Row, Row> iterator =
+                testStreamingRead(
                         "SELECT * FROM `"
                                 + tableName
                                 + "` /*+ 
OPTIONS('consumer-id'='myid','consumer.expiration-time'='3h') */",
                         Arrays.asList(
                                 changelogRow("+I", 1L, "Hi"),
                                 changelogRow("+I", 2L, "Hello"),
-                                changelogRow("+I", 3L, "Paimon")))
-                .close();
+                                changelogRow("+I", 3L, "Paimon")));
 
-        Thread.sleep(1000);
         ConsumerManager consumerManager = new ConsumerManager(table.fileIO(), 
table.location());
+        while (!consumerManager.consumer("myid").isPresent()) {
+            Thread.sleep(1000);
+        }
+        iterator.close();
+
         Optional<Consumer> consumer1 = consumerManager.consumer("myid");
         assertThat(consumer1).isPresent();
         assertThat(consumer1.get().nextSnapshot()).isEqualTo(4);
@@ -191,6 +199,7 @@ public class ConsumerActionITCase extends ActionITCaseBase {
     }
 
     @ParameterizedTest
+    @Timeout(60)
     @ValueSource(strings = {"action", "procedure_indexed", "procedure_named"})
     public void testResetBranchConsumer(String invoker) throws Exception {
         init(warehouse);
@@ -222,18 +231,23 @@ public class ConsumerActionITCase extends 
ActionITCaseBase {
         String branchTableName = tableName + "$branch_b1";
 
         // use consumer streaming read table
-        testStreamingRead(
+        BlockingIterator<Row, Row> iterator =
+                testStreamingRead(
                         "SELECT * FROM `"
                                 + branchTableName
                                 + "` /*+ 
OPTIONS('consumer-id'='myid','consumer.expiration-time'='3h') */",
                         Arrays.asList(
                                 changelogRow("+I", 1L, "Hi"),
                                 changelogRow("+I", 2L, "Hello"),
-                                changelogRow("+I", 3L, "Paimon")))
-                .close();
+                                changelogRow("+I", 3L, "Paimon")));
 
         ConsumerManager consumerManager =
                 new ConsumerManager(table.fileIO(), table.location(), 
branchName);
+        while (!consumerManager.consumer("myid").isPresent()) {
+            Thread.sleep(1000);
+        }
+        iterator.close();
+
         Optional<Consumer> consumer1 = consumerManager.consumer("myid");
         assertThat(consumer1).isPresent();
         assertThat(consumer1.get().nextSnapshot()).isEqualTo(4);

Reply via email to