This is an automated email from the ASF dual-hosted git repository.
JingsongLi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new c77a58e5a0 [flink] Fix flaky ConsumerActionITCase (#8086)
c77a58e5a0 is described below
commit c77a58e5a0856cc795d877c06977ef5d9e6e9142
Author: Jingsong Lee <[email protected]>
AuthorDate: Wed Jun 3 12:12:58 2026 +0800
[flink] Fix flaky ConsumerActionITCase (#8086)
---
.../paimon/flink/action/ConsumerActionITCase.java | 86 ++--------------------
1 file changed, 5 insertions(+), 81 deletions(-)
diff --git
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/ConsumerActionITCase.java
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/ConsumerActionITCase.java
index 86358055f4..017e496685 100644
---
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/ConsumerActionITCase.java
+++
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/ConsumerActionITCase.java
@@ -26,11 +26,9 @@ import org.apache.paimon.table.sink.StreamWriteBuilder;
import org.apache.paimon.types.DataType;
import org.apache.paimon.types.DataTypes;
import org.apache.paimon.types.RowType;
-import org.apache.paimon.utils.BlockingIterator;
import org.apache.flink.runtime.client.JobExecutionException;
import org.apache.flink.table.api.TableException;
-import org.apache.flink.types.Row;
import org.junit.jupiter.api.Timeout;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;
@@ -40,9 +38,7 @@ import java.util.Collections;
import java.util.List;
import java.util.Optional;
-import static
org.apache.flink.table.planner.factories.TestValuesTableFactory.changelogRow;
import static org.apache.paimon.flink.util.ReadWriteTableTestUtil.init;
-import static
org.apache.paimon.flink.util.ReadWriteTableTestUtil.testStreamingRead;
import static org.assertj.core.api.Assertions.assertThat;
import static org.junit.jupiter.api.Assertions.assertThrows;
@@ -76,22 +72,8 @@ public class ConsumerActionITCase extends ActionITCaseBase {
writeData(rowData(2L, BinaryString.fromString("Hello")));
writeData(rowData(3L, BinaryString.fromString("Paimon")));
- // use consumer streaming read table
- BlockingIterator<Row, Row> iterator =
- testStreamingRead(
- "SELECT * FROM `"
- + tableName
- + "` /*+
OPTIONS('consumer-id'='myid','consumer.expiration-time'='3h') */",
- Arrays.asList(
- changelogRow("+I", 1L, "Hi"),
- changelogRow("+I", 2L, "Hello"),
- changelogRow("+I", 3L, "Paimon")));
-
ConsumerManager consumerManager = new ConsumerManager(table.fileIO(),
table.location());
- while (!consumerManager.consumer("myid").isPresent()) {
- Thread.sleep(1000);
- }
- iterator.close();
+ consumerManager.resetConsumer("myid", new Consumer(4));
Optional<Consumer> consumer1 = consumerManager.consumer("myid");
assertThat(consumer1).isPresent();
@@ -242,23 +224,9 @@ public class ConsumerActionITCase extends ActionITCaseBase
{
table.createBranch("b1", "tag");
String branchTableName = tableName + "$branch_b1";
- // use consumer streaming read table
- BlockingIterator<Row, Row> iterator =
- testStreamingRead(
- "SELECT * FROM `"
- + branchTableName
- + "` /*+
OPTIONS('consumer-id'='myid','consumer.expiration-time'='3h') */",
- Arrays.asList(
- changelogRow("+I", 1L, "Hi"),
- changelogRow("+I", 2L, "Hello"),
- changelogRow("+I", 3L, "Paimon")));
-
ConsumerManager consumerManager =
new ConsumerManager(table.fileIO(), table.location(),
branchName);
- while (!consumerManager.consumer("myid").isPresent()) {
- Thread.sleep(1000);
- }
- iterator.close();
+ consumerManager.resetConsumer("myid", new Consumer(4));
Optional<Consumer> consumer1 = consumerManager.consumer("myid");
assertThat(consumer1).isPresent();
@@ -356,54 +324,10 @@ public class ConsumerActionITCase extends
ActionITCaseBase {
writeData(rowData(2L, BinaryString.fromString("Hello")));
writeData(rowData(3L, BinaryString.fromString("Paimon")));
- // use consumer streaming read table
- BlockingIterator<Row, Row> iterator1 =
- testStreamingRead(
- "SELECT * FROM `"
- + tableName
- + "` /*+
OPTIONS('consumer-id'='myid1_1','consumer.expiration-time'='3h') */",
- Arrays.asList(
- changelogRow("+I", 1L, "Hi"),
- changelogRow("+I", 2L, "Hello"),
- changelogRow("+I", 3L, "Paimon")));
-
ConsumerManager consumerManager = new ConsumerManager(table.fileIO(),
table.location());
- while (!consumerManager.consumer("myid1_1").isPresent()) {
- Thread.sleep(1000);
- }
- iterator1.close();
-
- // use consumer streaming read table
- BlockingIterator<Row, Row> iterator2 =
- testStreamingRead(
- "SELECT * FROM `"
- + tableName
- + "` /*+
OPTIONS('consumer-id'='myid1_2','consumer.expiration-time'='3h') */",
- Arrays.asList(
- changelogRow("+I", 1L, "Hi"),
- changelogRow("+I", 2L, "Hello"),
- changelogRow("+I", 3L, "Paimon")));
-
- while (!consumerManager.consumer("myid1_2").isPresent()) {
- Thread.sleep(1000);
- }
- iterator2.close();
-
- // use consumer streaming read table
- BlockingIterator<Row, Row> iterator3 =
- testStreamingRead(
- "SELECT * FROM `"
- + tableName
- + "` /*+
OPTIONS('consumer-id'='myid2','consumer.expiration-time'='3h') */",
- Arrays.asList(
- changelogRow("+I", 1L, "Hi"),
- changelogRow("+I", 2L, "Hello"),
- changelogRow("+I", 3L, "Paimon")));
-
- while (!consumerManager.consumer("myid2").isPresent()) {
- Thread.sleep(1000);
- }
- iterator3.close();
+ consumerManager.resetConsumer("myid1_1", new Consumer(4));
+ consumerManager.resetConsumer("myid1_2", new Consumer(4));
+ consumerManager.resetConsumer("myid2", new Consumer(4));
Optional<Consumer> consumer1 = consumerManager.consumer("myid1_1");
Optional<Consumer> consumer2 = consumerManager.consumer("myid1_2");