This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 749535769a [tests] Fix @Timeout annotation not working in PrimaryKeyFileStoreTableITCase (#4634)
749535769a is described below
commit 749535769aa730cbc6f474c59f47c0877f35cbc0
Author: tsreaper <[email protected]>
AuthorDate: Wed Dec 4 15:44:47 2024 +0800
[tests] Fix @Timeout annotation not working in PrimaryKeyFileStoreTableITCase (#4634)
---
.../flink/PrimaryKeyFileStoreTableITCase.java | 76 +++++++++++++++-------
1 file changed, 52 insertions(+), 24 deletions(-)
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/PrimaryKeyFileStoreTableITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/PrimaryKeyFileStoreTableITCase.java
index 3fa95edb86..4ee539c4fd 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/PrimaryKeyFileStoreTableITCase.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/PrimaryKeyFileStoreTableITCase.java
@@ -59,6 +59,8 @@ import static org.assertj.core.api.Assertions.assertThatCode;
/** Tests for changelog table with primary keys. */
public class PrimaryKeyFileStoreTableITCase extends AbstractTestBase {
+ private static final int TIMEOUT = 180;
+
// ------------------------------------------------------------------------
// Test Utilities
// ------------------------------------------------------------------------
@@ -95,12 +97,38 @@ public class PrimaryKeyFileStoreTableITCase extends
AbstractTestBase {
catalogName, warehouse, defaultPropertyString);
}
+ private CloseableIterator<Row> collect(TableResult result) {
+ return collect(result, TIMEOUT);
+ }
+
+ private CloseableIterator<Row> collect(TableResult result, int timeout) {
+ JobClient client = result.getJobClient().get();
+ Thread timeoutThread =
+ new Thread(
+ () -> {
+ for (int i = 0; i < timeout; i++) {
+ try {
+ Thread.sleep(1000);
+ if (client.getJobStatus().get().isGloballyTerminalState()) {
+ return;
+ }
+ } catch (Exception e) {
+ client.cancel();
+ throw new RuntimeException(e);
+ }
+ }
+ client.cancel();
+ });
+ timeoutThread.start();
+ return result.collect();
+ }
+
// ------------------------------------------------------------------------
// Constructed Tests
// ------------------------------------------------------------------------
@Test
- @Timeout(180)
+ @Timeout(TIMEOUT)
public void testFullCompactionTriggerInterval() throws Exception {
innerTestChangelogProducing(
Arrays.asList(
@@ -109,7 +137,7 @@ public class PrimaryKeyFileStoreTableITCase extends
AbstractTestBase {
}
@Test
- @Timeout(180)
+ @Timeout(TIMEOUT)
public void testFullCompactionWithLongCheckpointInterval() throws
Exception {
// create table
TableEnvironment bEnv =
tableEnvironmentBuilder().batchMode().parallelism(1).build();
@@ -135,7 +163,7 @@ public class PrimaryKeyFileStoreTableITCase extends
AbstractTestBase {
.build();
sEnv.executeSql(createCatalogSql("testCatalog", path));
sEnv.executeSql("USE CATALOG testCatalog");
- CloseableIterator<Row> it = sEnv.executeSql("SELECT * FROM T").collect();
+ CloseableIterator<Row> it = collect(sEnv.executeSql("SELECT * FROM T"));
// run compact job
StreamExecutionEnvironment env =
@@ -168,7 +196,7 @@ public class PrimaryKeyFileStoreTableITCase extends
AbstractTestBase {
}
@Test
- @Timeout(180)
+ @Timeout(TIMEOUT)
public void testLookupChangelog() throws Exception {
innerTestChangelogProducing(Collections.singletonList("'changelog-producer' = 'lookup'"));
}
@@ -190,7 +218,7 @@ public class PrimaryKeyFileStoreTableITCase extends
AbstractTestBase {
+ "'bucket' = '2'"
+ ")");
- CloseableIterator<Row> it = sEnv.executeSql("SELECT * FROM T2").collect();
+ CloseableIterator<Row> it = collect(sEnv.executeSql("SELECT * FROM T2"));
// insert data
sEnv.executeSql("INSERT INTO T2 VALUES (1, 'A')").await();
@@ -213,7 +241,7 @@ public class PrimaryKeyFileStoreTableITCase extends
AbstractTestBase {
sEnv.executeSql("ALTER TABLE T2 SET ('changelog-producer'='full-compaction')");
CloseableIterator<Row> branchIt =
- sEnv.executeSql("select * from T2 /*+ OPTIONS('branch' = 'branch1') */").collect();
+ collect(sEnv.executeSql("select * from T2 /*+ OPTIONS('branch' = 'branch1') */"));
// insert data to branch
sEnv.executeSql(
"INSERT INTO T2/*+ OPTIONS('branch' = 'branch1') */ VALUES (10, 'v10'),(11, 'v11'),(12, 'v12')")
@@ -261,7 +289,7 @@ public class PrimaryKeyFileStoreTableITCase extends
AbstractTestBase {
sEnv.executeSql(
"INSERT INTO T SELECT SUM(i) AS k, g AS v FROM `default_catalog`.`default_database`.`S` GROUP BY g");
- CloseableIterator<Row> it = sEnv.executeSql("SELECT * FROM T").collect();
+ CloseableIterator<Row> it = collect(sEnv.executeSql("SELECT * FROM T"));
// write initial data
sEnv.executeSql(
@@ -329,7 +357,7 @@ public class PrimaryKeyFileStoreTableITCase extends
AbstractTestBase {
result1.await();
result2.await();
- try (CloseableIterator<Row> it = tEnv.executeSql("SELECT * FROM t").collect()) {
+ try (CloseableIterator<Row> it = collect(tEnv.executeSql("SELECT * FROM t"))) {
for (int i = 0; i < 3; i++) {
assertThat(it).hasNext();
Row row = it.next();
@@ -338,7 +366,7 @@ public class PrimaryKeyFileStoreTableITCase extends
AbstractTestBase {
}
}
- @Timeout(60)
+ @Timeout(TIMEOUT)
@ParameterizedTest()
@ValueSource(booleans = {false, true})
public void testRecreateTableWithException(boolean isReloadData) throws
Exception {
@@ -361,7 +389,7 @@ public class PrimaryKeyFileStoreTableITCase extends
AbstractTestBase {
.build();
sEnv.executeSql(createCatalogSql("testCatalog", path + "/warehouse"));
sEnv.executeSql("USE CATALOG testCatalog");
- CloseableIterator<Row> it = sEnv.executeSql("SELECT * FROM t").collect();
+ CloseableIterator<Row> it = collect(sEnv.executeSql("SELECT * FROM t"));
// first write
List<String> values = new ArrayList<>();
@@ -414,7 +442,7 @@ public class PrimaryKeyFileStoreTableITCase extends
AbstractTestBase {
}
@Test
- @Timeout(120)
+ @Timeout(TIMEOUT)
public void testChangelogCompactInBatchWrite() throws Exception {
TableEnvironment bEnv = tableEnvironmentBuilder().batchMode().build();
String catalogDdl =
@@ -504,7 +532,7 @@ public class PrimaryKeyFileStoreTableITCase extends
AbstractTestBase {
}
@Test
- @Timeout(120)
+ @Timeout(TIMEOUT)
public void testChangelogCompactInStreamWrite() throws Exception {
TableEnvironment sEnv =
tableEnvironmentBuilder()
@@ -533,7 +561,7 @@ public class PrimaryKeyFileStoreTableITCase extends
AbstractTestBase {
+ "', 'source.monitor-interval' = '500ms' )");
sEnv.executeSql("INSERT INTO t SELECT * FROM `default_catalog`.`default_database`.`s`");
- CloseableIterator<Row> it = sEnv.executeSql("SELECT * FROM t").collect();
+ CloseableIterator<Row> it = collect(sEnv.executeSql("SELECT * FROM t"));
// write initial data
List<String> values = new ArrayList<>();
@@ -589,7 +617,7 @@ public class PrimaryKeyFileStoreTableITCase extends
AbstractTestBase {
private void assertStreamingResult(TableResult result, List<Row> expected)
throws Exception {
List<Row> actual = new ArrayList<>();
- try (CloseableIterator<Row> it = result.collect()) {
+ try (CloseableIterator<Row> it = collect(result)) {
while (actual.size() < expected.size() && it.hasNext()) {
actual.add(it.next());
}
@@ -611,14 +639,14 @@ public class PrimaryKeyFileStoreTableITCase extends
AbstractTestBase {
// ------------------------------------------------------------------------
@Test
- @Timeout(180)
+ @Timeout(TIMEOUT)
public void testNoChangelogProducerBatchRandom() throws Exception {
TableEnvironment bEnv = tableEnvironmentBuilder().batchMode().build();
testNoChangelogProducerRandom(bEnv, 1, false);
}
@Test
- @Timeout(180)
+ @Timeout(TIMEOUT)
public void testNoChangelogProducerStreamingRandom() throws Exception {
ThreadLocalRandom random = ThreadLocalRandom.current();
TableEnvironment sEnv =
@@ -631,14 +659,14 @@ public class PrimaryKeyFileStoreTableITCase extends
AbstractTestBase {
}
@Test
- @Timeout(180)
+ @Timeout(TIMEOUT)
public void testFullCompactionChangelogProducerBatchRandom() throws
Exception {
TableEnvironment bEnv = tableEnvironmentBuilder().batchMode().build();
testFullCompactionChangelogProducerRandom(bEnv, 1, false);
}
@Test
- @Timeout(180)
+ @Timeout(TIMEOUT)
public void testFullCompactionChangelogProducerStreamingRandom() throws
Exception {
ThreadLocalRandom random = ThreadLocalRandom.current();
TableEnvironment sEnv =
@@ -651,7 +679,7 @@ public class PrimaryKeyFileStoreTableITCase extends
AbstractTestBase {
}
@Test
- @Timeout(180)
+ @Timeout(TIMEOUT)
public void testStandAloneFullCompactJobRandom() throws Exception {
ThreadLocalRandom random = ThreadLocalRandom.current();
TableEnvironment sEnv =
@@ -664,14 +692,14 @@ public class PrimaryKeyFileStoreTableITCase extends
AbstractTestBase {
}
@Test
- @Timeout(180)
+ @Timeout(TIMEOUT)
public void testLookupChangelogProducerBatchRandom() throws Exception {
TableEnvironment bEnv = tableEnvironmentBuilder().batchMode().build();
testLookupChangelogProducerRandom(bEnv, 1, false);
}
@Test
- @Timeout(180)
+ @Timeout(TIMEOUT)
public void testLookupChangelogProducerStreamingRandom() throws Exception {
ThreadLocalRandom random = ThreadLocalRandom.current();
TableEnvironment sEnv =
@@ -684,7 +712,7 @@ public class PrimaryKeyFileStoreTableITCase extends
AbstractTestBase {
}
@Test
- @Timeout(180)
+ @Timeout(TIMEOUT)
public void testStandAloneLookupJobRandom() throws Exception {
ThreadLocalRandom random = ThreadLocalRandom.current();
TableEnvironment sEnv =
@@ -868,7 +896,7 @@ public class PrimaryKeyFileStoreTableITCase extends
AbstractTestBase {
ResultChecker checker = new ResultChecker();
int endCnt = 0;
- try (CloseableIterator<Row> it = sEnv.executeSql("SELECT * FROM T").collect()) {
+ try (CloseableIterator<Row> it = collect(sEnv.executeSql("SELECT * FROM T"))) {
while (it.hasNext()) {
Row row = it.next();
checker.addChangelog(row);
@@ -986,7 +1014,7 @@ public class PrimaryKeyFileStoreTableITCase extends
AbstractTestBase {
bEnv.executeSql("USE CATALOG testCatalog");
ResultChecker checker = new ResultChecker();
- try (CloseableIterator<Row> it = bEnv.executeSql("SELECT * FROM T").collect()) {
+ try (CloseableIterator<Row> it = collect(bEnv.executeSql("SELECT * FROM T"))) {
while (it.hasNext()) {
checker.addChangelog(it.next());
}