This is an automated email from the ASF dual-hosted git repository.
czweng pushed a commit to branch release-0.3
in repository https://gitbox.apache.org/repos/asf/flink-table-store.git
The following commit(s) were added to refs/heads/release-0.3 by this push:
new ad164c32 [FLINK-30603] Fix unstable test
CompactActionITCase#testStreamingCompact
ad164c32 is described below
commit ad164c32fca33c365192b2025eaea61980d961eb
Author: tsreaper <[email protected]>
AuthorDate: Tue Jan 10 15:34:17 2023 +0800
[FLINK-30603] Fix unstable test CompactActionITCase#testStreamingCompact
This closes #473.
(cherry picked from commit b7188bcc46989c66e44f1fb04cd45972e1a6fe50)
---
.../store/connector/action/CompactActionITCase.java | 21 ++++++++++++++++-----
1 file changed, 16 insertions(+), 5 deletions(-)
diff --git
a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/action/CompactActionITCase.java
b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/action/CompactActionITCase.java
index 643880c5..7160529e 100644
---
a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/action/CompactActionITCase.java
+++
b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/action/CompactActionITCase.java
@@ -60,6 +60,7 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;
+import java.util.concurrent.ThreadLocalRandom;
/** IT cases for {@link CompactAction}. */
public class CompactActionITCase extends AbstractTestBase {
@@ -112,6 +113,7 @@ public class CompactActionITCase extends AbstractTestBase {
StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
env.setRuntimeMode(RuntimeExecutionMode.BATCH);
+ env.setParallelism(ThreadLocalRandom.current().nextInt(4) + 1);
new
CompactAction(tablePath).withPartitions(getSpecifiedPartitions()).build(env);
env.execute();
@@ -169,6 +171,7 @@ public class CompactActionITCase extends AbstractTestBase {
env.setRuntimeMode(RuntimeExecutionMode.STREAMING);
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
env.getCheckpointConfig().setCheckpointInterval(500);
+ env.setParallelism(ThreadLocalRandom.current().nextInt(4) + 1);
new
CompactAction(tablePath).withPartitions(getSpecifiedPartitions()).build(env);
env.executeAsync();
@@ -181,8 +184,11 @@ public class CompactActionITCase extends AbstractTestBase {
}
// first full compaction
- Assert.assertEquals(2, (long) plan.snapshotId);
- List<String> actual = getResult(table.newRead(), plan.splits());
+ List<String> actual = new ArrayList<>();
+ while (plan != null) {
+ actual.addAll(getResult(table.newRead(), plan.splits()));
+ plan = snapshotEnumerator.enumerate();
+ }
actual.sort(String::compareTo);
Assert.assertEquals(Arrays.asList("+I 1|100|15|20221208", "+I
1|100|15|20221209"), actual);
@@ -204,8 +210,11 @@ public class CompactActionITCase extends AbstractTestBase {
}
// second full compaction
- Assert.assertEquals(4, (long) plan.snapshotId);
- actual = getResult(table.newRead(), plan.splits());
+ actual = new ArrayList<>();
+ while (plan != null) {
+ actual.addAll(getResult(table.newRead(), plan.splits()));
+ plan = snapshotEnumerator.enumerate();
+ }
actual.sort(String::compareTo);
Assert.assertEquals(
Arrays.asList(
@@ -216,7 +225,9 @@ public class CompactActionITCase extends AbstractTestBase {
actual);
// assert dedicated compact job will expire snapshots
- Assert.assertEquals(2L, (long) snapshotManager.earliestSnapshotId());
+ Assert.assertEquals(
+ snapshotManager.latestSnapshotId() - 2,
+ (long) snapshotManager.earliestSnapshotId());
}
private List<Map<String, String>> getSpecifiedPartitions() {