This is an automated email from the ASF dual-hosted git repository.

JingsongLi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new c77a58e5a0 [flink] Fix flaky ConsumerActionITCase (#8086)
c77a58e5a0 is described below

commit c77a58e5a0856cc795d877c06977ef5d9e6e9142
Author: Jingsong Lee <[email protected]>
AuthorDate: Wed Jun 3 12:12:58 2026 +0800

    [flink] Fix flaky ConsumerActionITCase (#8086)
---
 .../paimon/flink/action/ConsumerActionITCase.java  | 86 ++--------------------
 1 file changed, 5 insertions(+), 81 deletions(-)

diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/ConsumerActionITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/ConsumerActionITCase.java
index 86358055f4..017e496685 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/ConsumerActionITCase.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/ConsumerActionITCase.java
@@ -26,11 +26,9 @@ import org.apache.paimon.table.sink.StreamWriteBuilder;
 import org.apache.paimon.types.DataType;
 import org.apache.paimon.types.DataTypes;
 import org.apache.paimon.types.RowType;
-import org.apache.paimon.utils.BlockingIterator;
 
 import org.apache.flink.runtime.client.JobExecutionException;
 import org.apache.flink.table.api.TableException;
-import org.apache.flink.types.Row;
 import org.junit.jupiter.api.Timeout;
 import org.junit.jupiter.params.ParameterizedTest;
 import org.junit.jupiter.params.provider.ValueSource;
@@ -40,9 +38,7 @@ import java.util.Collections;
 import java.util.List;
 import java.util.Optional;
 
-import static org.apache.flink.table.planner.factories.TestValuesTableFactory.changelogRow;
 import static org.apache.paimon.flink.util.ReadWriteTableTestUtil.init;
-import static org.apache.paimon.flink.util.ReadWriteTableTestUtil.testStreamingRead;
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.junit.jupiter.api.Assertions.assertThrows;
 
@@ -76,22 +72,8 @@ public class ConsumerActionITCase extends ActionITCaseBase {
         writeData(rowData(2L, BinaryString.fromString("Hello")));
         writeData(rowData(3L, BinaryString.fromString("Paimon")));
 
-        // use consumer streaming read table
-        BlockingIterator<Row, Row> iterator =
-                testStreamingRead(
-                        "SELECT * FROM `"
-                                + tableName
-                                + "` /*+ OPTIONS('consumer-id'='myid','consumer.expiration-time'='3h') */",
-                        Arrays.asList(
-                                changelogRow("+I", 1L, "Hi"),
-                                changelogRow("+I", 2L, "Hello"),
-                                changelogRow("+I", 3L, "Paimon")));
-
         ConsumerManager consumerManager = new ConsumerManager(table.fileIO(), table.location());
-        while (!consumerManager.consumer("myid").isPresent()) {
-            Thread.sleep(1000);
-        }
-        iterator.close();
+        consumerManager.resetConsumer("myid", new Consumer(4));
 
         Optional<Consumer> consumer1 = consumerManager.consumer("myid");
         assertThat(consumer1).isPresent();
@@ -242,23 +224,9 @@ public class ConsumerActionITCase extends ActionITCaseBase {
         table.createBranch("b1", "tag");
         String branchTableName = tableName + "$branch_b1";
 
-        // use consumer streaming read table
-        BlockingIterator<Row, Row> iterator =
-                testStreamingRead(
-                        "SELECT * FROM `"
-                                + branchTableName
-                                + "` /*+ OPTIONS('consumer-id'='myid','consumer.expiration-time'='3h') */",
-                        Arrays.asList(
-                                changelogRow("+I", 1L, "Hi"),
-                                changelogRow("+I", 2L, "Hello"),
-                                changelogRow("+I", 3L, "Paimon")));
-
         ConsumerManager consumerManager =
                 new ConsumerManager(table.fileIO(), table.location(), branchName);
-        while (!consumerManager.consumer("myid").isPresent()) {
-            Thread.sleep(1000);
-        }
-        iterator.close();
+        consumerManager.resetConsumer("myid", new Consumer(4));
 
         Optional<Consumer> consumer1 = consumerManager.consumer("myid");
         assertThat(consumer1).isPresent();
@@ -356,54 +324,10 @@ public class ConsumerActionITCase extends ActionITCaseBase {
         writeData(rowData(2L, BinaryString.fromString("Hello")));
         writeData(rowData(3L, BinaryString.fromString("Paimon")));
 
-        // use consumer streaming read table
-        BlockingIterator<Row, Row> iterator1 =
-                testStreamingRead(
-                        "SELECT * FROM `"
-                                + tableName
-                                + "` /*+ OPTIONS('consumer-id'='myid1_1','consumer.expiration-time'='3h') */",
-                        Arrays.asList(
-                                changelogRow("+I", 1L, "Hi"),
-                                changelogRow("+I", 2L, "Hello"),
-                                changelogRow("+I", 3L, "Paimon")));
-
         ConsumerManager consumerManager = new ConsumerManager(table.fileIO(), table.location());
-        while (!consumerManager.consumer("myid1_1").isPresent()) {
-            Thread.sleep(1000);
-        }
-        iterator1.close();
-
-        // use consumer streaming read table
-        BlockingIterator<Row, Row> iterator2 =
-                testStreamingRead(
-                        "SELECT * FROM `"
-                                + tableName
-                                + "` /*+ OPTIONS('consumer-id'='myid1_2','consumer.expiration-time'='3h') */",
-                        Arrays.asList(
-                                changelogRow("+I", 1L, "Hi"),
-                                changelogRow("+I", 2L, "Hello"),
-                                changelogRow("+I", 3L, "Paimon")));
-
-        while (!consumerManager.consumer("myid1_2").isPresent()) {
-            Thread.sleep(1000);
-        }
-        iterator2.close();
-
-        // use consumer streaming read table
-        BlockingIterator<Row, Row> iterator3 =
-                testStreamingRead(
-                        "SELECT * FROM `"
-                                + tableName
-                                + "` /*+ OPTIONS('consumer-id'='myid2','consumer.expiration-time'='3h') */",
-                        Arrays.asList(
-                                changelogRow("+I", 1L, "Hi"),
-                                changelogRow("+I", 2L, "Hello"),
-                                changelogRow("+I", 3L, "Paimon")));
-
-        while (!consumerManager.consumer("myid2").isPresent()) {
-            Thread.sleep(1000);
-        }
-        iterator3.close();
+        consumerManager.resetConsumer("myid1_1", new Consumer(4));
+        consumerManager.resetConsumer("myid1_2", new Consumer(4));
+        consumerManager.resetConsumer("myid2", new Consumer(4));
 
         Optional<Consumer> consumer1 = consumerManager.consumer("myid1_1");
         Optional<Consumer> consumer2 = consumerManager.consumer("myid1_2");

Reply via email to