This is an automated email from the ASF dual-hosted git repository.
tangyun pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new 7cf71585a60 [FLINK-28626][tests] Fix unstable RescaleCheckpointManuallyITCase when unaligned checkpoint is enabled
7cf71585a60 is described below
commit 7cf71585a603866822ed0aee3978eea8d8587eee
Author: fredia <[email protected]>
AuthorDate: Tue Aug 2 18:10:27 2022 +0800
[FLINK-28626][tests] Fix unstable RescaleCheckpointManuallyITCase when unaligned checkpoint is enabled
---
.../RescaleCheckpointManuallyITCase.java | 180 ++++++++++-----------
1 file changed, 86 insertions(+), 94 deletions(-)
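For readers skimming the diff below: the fix replaces the wait on a static CountDownLatch (plus a filesystem scan for the newest checkpoint directory) with polling the MiniCluster for the job's latest completed checkpoint, and the first job now fails artificially so that its externalized checkpoint is retained for the rescaled restore. A minimal sketch of the polling step follows; CheckpointWaitSketch and waitForCompletedCheckpoint are placeholder names for illustration, only CommonTestUtils.getLatestCompletedCheckpointPath is taken from the patch itself.

    import java.util.Optional;

    import org.apache.flink.api.common.JobID;
    import org.apache.flink.runtime.minicluster.MiniCluster;

    import static org.apache.flink.runtime.testutils.CommonTestUtils.getLatestCompletedCheckpointPath;

    final class CheckpointWaitSketch {
        private CheckpointWaitSketch() {}

        // Block until the MiniCluster reports at least one completed checkpoint for the job,
        // then return its path; this mirrors waitCheckpointCompleted() in the patched source.
        static String waitForCompletedCheckpoint(JobID jobId, MiniCluster miniCluster)
                throws Exception {
            Optional<String> path = getLatestCompletedCheckpointPath(jobId, miniCluster);
            while (!path.isPresent()) {
                Thread.sleep(50); // poll until the first checkpoint completes
                path = getLatestCompletedCheckpointPath(jobId, miniCluster);
            }
            return path.get();
        }
    }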
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/RescaleCheckpointManuallyITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/RescaleCheckpointManuallyITCase.java
index 33c0cda6e0e..3419f6c3df7 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/RescaleCheckpointManuallyITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/RescaleCheckpointManuallyITCase.java
@@ -18,19 +18,18 @@
package org.apache.flink.test.checkpointing;
+import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
-import org.apache.flink.api.common.time.Deadline;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.client.program.ClusterClient;
import org.apache.flink.configuration.CheckpointingOptions;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.StateBackendOptions;
import org.apache.flink.runtime.jobgraph.JobGraph;
-import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
+import org.apache.flink.runtime.minicluster.MiniCluster;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.runtime.state.KeyGroupRangeAssignment;
@@ -42,29 +41,31 @@ import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
import org.apache.flink.test.util.MiniClusterWithClientResource;
-import org.apache.flink.test.util.TestUtils;
+import org.apache.flink.testutils.junit.SharedObjects;
+import org.apache.flink.testutils.junit.SharedReference;
import org.apache.flink.util.Collector;
+import org.apache.flink.util.FlinkRuntimeException;
+import org.apache.flink.util.Preconditions;
import org.apache.flink.util.TestLogger;
import org.junit.After;
import org.junit.Before;
import org.junit.ClassRule;
+import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
-import java.io.File;
-import java.time.Duration;
+import java.io.IOException;
import java.util.Collections;
import java.util.HashSet;
+import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.TimeUnit;
-import static org.apache.flink.test.util.TestUtils.submitJobAndWaitForResult;
+import static org.apache.flink.runtime.jobgraph.SavepointRestoreSettings.forPath;
+import static org.apache.flink.runtime.testutils.CommonTestUtils.getLatestCompletedCheckpointPath;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertTrue;
/**
* Test checkpoint rescaling for incremental rocksdb. The implementations of
@@ -77,19 +78,14 @@ public class RescaleCheckpointManuallyITCase extends TestLogger {
private static final int SLOTS_PER_TASK_MANAGER = 2;
private static MiniClusterWithClientResource cluster;
- private File checkpointDir;
+ @Rule public final SharedObjects sharedObjects = SharedObjects.create();
@ClassRule public static TemporaryFolder temporaryFolder = new TemporaryFolder();
@Before
public void setup() throws Exception {
Configuration config = new Configuration();
-
- checkpointDir = temporaryFolder.newFolder();
-
config.setString(StateBackendOptions.STATE_BACKEND, "rocksdb");
- config.setString(
- CheckpointingOptions.CHECKPOINTS_DIRECTORY, checkpointDir.toURI().toString());
config.setBoolean(CheckpointingOptions.INCREMENTAL_CHECKPOINTS, true);
cluster =
@@ -132,15 +128,10 @@ public class RescaleCheckpointManuallyITCase extends TestLogger {
final int parallelism2 = scaleOut ? 4 : 3;
final int maxParallelism = 13;
- ClusterClient<?> client = cluster.getClusterClient();
+ MiniCluster miniCluster = cluster.getMiniCluster();
String checkpointPath =
runJobAndGetCheckpoint(
- numberKeys,
- numberElements,
- parallelism,
- maxParallelism,
- client,
- checkpointDir);
+ numberKeys, numberElements, parallelism, maxParallelism, miniCluster);
assertNotNull(checkpointPath);
@@ -150,54 +141,38 @@ public class RescaleCheckpointManuallyITCase extends TestLogger {
numberKeys,
numberElements2,
numberElements + numberElements2,
- client,
+ miniCluster,
checkpointPath);
}
- private static String runJobAndGetCheckpoint(
+ private String runJobAndGetCheckpoint(
int numberKeys,
int numberElements,
int parallelism,
int maxParallelism,
- ClusterClient<?> client,
- File checkpointDir)
+ MiniCluster miniCluster)
throws Exception {
try {
- Duration timeout = Duration.ofMinutes(5);
- Deadline deadline = Deadline.now().plus(timeout);
-
JobGraph jobGraph =
createJobGraphWithKeyedState(
- parallelism, maxParallelism, numberKeys, numberElements, false, 100);
- client.submitJob(jobGraph).get();
-
- assertTrue(
- SubtaskIndexFlatMapper.workCompletedLatch.await(
- deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS));
-
- // verify the current state
- Set<Tuple2<Integer, Integer>> actualResult = CollectionSink.getElementsSet();
-
- Set<Tuple2<Integer, Integer>> expectedResult = new HashSet<>();
-
- for (int key = 0; key < numberKeys; key++) {
- int keyGroupIndex = KeyGroupRangeAssignment.assignToKeyGroup(key, maxParallelism);
- expectedResult.add(
- Tuple2.of(
- KeyGroupRangeAssignment.computeOperatorIndexForKeyGroup(
- maxParallelism, parallelism, keyGroupIndex),
- numberElements * key));
- }
-
- assertEquals(expectedResult, actualResult);
-
- // ensure contents of state within SubtaskIndexFlatMapper are all included
- // in the last checkpoint (refer to FLINK-26882 for more details).
- cluster.getMiniCluster().triggerCheckpoint(jobGraph.getJobID()).get();
-
- client.cancel(jobGraph.getJobID()).get();
- TestUtils.waitUntilJobCanceled(jobGraph.getJobID(), client);
- return TestUtils.getMostRecentCompletedCheckpoint(checkpointDir).getAbsolutePath();
+ parallelism,
+ maxParallelism,
+ numberKeys,
+ numberElements,
+ numberElements,
+ true,
+ 100,
+ miniCluster);
+ miniCluster.submitJob(jobGraph).get();
+ miniCluster.requestJobResult(jobGraph.getJobID()).get();
+ // The elements may not all be sent to sink when unaligned checkpoints enabled(refer to
+ // FLINK-26882 for more details).
+ // Don't verify current state here.
+ return getLatestCompletedCheckpointPath(jobGraph.getJobID(), miniCluster)
+ .orElseThrow(
+ () ->
+ new IllegalStateException(
+ "Cannot get completed checkpoint,
job failed before completing checkpoint"));
} finally {
CollectionSink.clearElementsSet();
}
@@ -209,83 +184,101 @@ public class RescaleCheckpointManuallyITCase extends TestLogger {
int numberKeys,
int numberElements,
int numberElementsExpect,
- ClusterClient<?> client,
+ MiniCluster miniCluster,
String restorePath)
throws Exception {
try {
-
JobGraph scaledJobGraph =
createJobGraphWithKeyedState(
restoreParallelism,
maxParallelism,
numberKeys,
numberElements,
- true,
- 100);
+ numberElementsExpect,
+ false,
+ 100,
+ miniCluster);
- scaledJobGraph.setSavepointRestoreSettings(
- SavepointRestoreSettings.forPath(restorePath));
+ scaledJobGraph.setSavepointRestoreSettings(forPath(restorePath));
- submitJobAndWaitForResult(client, scaledJobGraph, getClass().getClassLoader());
+ miniCluster.submitJob(scaledJobGraph).get();
+ miniCluster.requestJobResult(scaledJobGraph.getJobID()).get();
- Set<Tuple2<Integer, Integer>> actualResult2 = CollectionSink.getElementsSet();
+ Set<Tuple2<Integer, Integer>> actualResult = CollectionSink.getElementsSet();
- Set<Tuple2<Integer, Integer>> expectedResult2 = new HashSet<>();
+ Set<Tuple2<Integer, Integer>> expectedResult = new HashSet<>();
for (int key = 0; key < numberKeys; key++) {
int keyGroupIndex = KeyGroupRangeAssignment.assignToKeyGroup(key, maxParallelism);
- expectedResult2.add(
+ expectedResult.add(
Tuple2.of(
KeyGroupRangeAssignment.computeOperatorIndexForKeyGroup(
maxParallelism, restoreParallelism, keyGroupIndex),
key * numberElementsExpect));
}
- assertEquals(expectedResult2, actualResult2);
+ assertEquals(expectedResult, actualResult);
} finally {
CollectionSink.clearElementsSet();
}
}
- private static JobGraph createJobGraphWithKeyedState(
+ private JobGraph createJobGraphWithKeyedState(
int parallelism,
int maxParallelism,
int numberKeys,
int numberElements,
- boolean terminateAfterEmission,
- int checkpointingInterval) {
+ int numberElementsExpect,
+ boolean failAfterEmission,
+ int checkpointingInterval,
+ MiniCluster miniCluster)
+ throws IOException {
StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(parallelism);
if (0 < maxParallelism) {
env.getConfig().setMaxParallelism(maxParallelism);
}
env.enableCheckpointing(checkpointingInterval);
+ env.getCheckpointConfig().setCheckpointStorage(temporaryFolder.newFolder().toURI());
env.getCheckpointConfig()
.setExternalizedCheckpointCleanup(
CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
env.setRestartStrategy(RestartStrategies.noRestart());
env.getConfig().setUseSnapshotCompression(true);
+ SharedReference<JobID> jobID = sharedObjects.add(new JobID());
+ SharedReference<MiniCluster> miniClusterRef = sharedObjects.add(miniCluster);
DataStream<Integer> input =
env.addSource(
new NotifyingDefiniteKeySource(
- numberKeys, numberElements, terminateAfterEmission))
+ numberKeys, numberElements, failAfterEmission) {
+ @Override
+ public void waitCheckpointCompleted() throws Exception {
+ Optional<String> mostRecentCompletedCheckpointPath =
+ getLatestCompletedCheckpointPath(
+ jobID.get(), miniClusterRef.get());
+ while (!mostRecentCompletedCheckpointPath.isPresent()) {
+ Thread.sleep(50);
+ mostRecentCompletedCheckpointPath =
+ getLatestCompletedCheckpointPath(
+ jobID.get(), miniClusterRef.get());
+ }
+ }
+ })
.keyBy(
new KeySelector<Integer, Integer>() {
private static final long serialVersionUID = 1L;
@Override
- public Integer getKey(Integer value) throws Exception {
+ public Integer getKey(Integer value) {
return value;
}
});
- SubtaskIndexFlatMapper.workCompletedLatch = new CountDownLatch(numberKeys);
-
DataStream<Tuple2<Integer, Integer>> result =
- input.flatMap(new SubtaskIndexFlatMapper(numberElements));
+ input.flatMap(new SubtaskIndexFlatMapper(numberElementsExpect));
result.addSink(new CollectionSink<>());
- return env.getStreamGraph().getJobGraph();
+ return env.getStreamGraph().getJobGraph(jobID.get());
}
private static class NotifyingDefiniteKeySource extends RichParallelSourceFunction<Integer> {
@@ -293,25 +286,25 @@ public class RescaleCheckpointManuallyITCase extends TestLogger {
private static final long serialVersionUID = 1L;
private final int numberKeys;
- private final int numberElements;
- private final boolean terminateAfterEmission;
-
+ protected final int numberElements;
+ private final boolean failAfterEmission;
protected int counter = 0;
-
private boolean running = true;
public NotifyingDefiniteKeySource(
- int numberKeys, int numberElements, boolean terminateAfterEmission) {
+ int numberKeys, int numberElements, boolean failAfterEmission) {
+ Preconditions.checkState(numberElements > 0);
this.numberKeys = numberKeys;
this.numberElements = numberElements;
- this.terminateAfterEmission = terminateAfterEmission;
+ this.failAfterEmission = failAfterEmission;
}
+ public void waitCheckpointCompleted() throws Exception {}
+
@Override
public void run(SourceContext<Integer> ctx) throws Exception {
final int subtaskIndex = getRuntimeContext().getIndexOfThisSubtask();
while (running) {
-
if (counter < numberElements) {
synchronized (ctx.getCheckpointLock()) {
for (int value = subtaskIndex;
@@ -322,10 +315,12 @@ public class RescaleCheckpointManuallyITCase extends TestLogger {
counter++;
}
} else {
- if (terminateAfterEmission) {
- running = false;
+ waitCheckpointCompleted();
+ if (failAfterEmission) {
+ throw new FlinkRuntimeException(
+ "Make job fail artificially, to retain
completed checkpoint.");
} else {
- Thread.sleep(100);
+ running = false;
}
}
}
@@ -343,8 +338,6 @@ public class RescaleCheckpointManuallyITCase extends TestLogger {
private static final long serialVersionUID = 1L;
- public static CountDownLatch workCompletedLatch = new CountDownLatch(1);
-
private transient ValueState<Integer> counter;
private transient ValueState<Integer> sum;
@@ -365,9 +358,8 @@ public class RescaleCheckpointManuallyITCase extends TestLogger {
int s = sumValue == null ? value : sumValue + value;
sum.update(s);
- if (count % numberElements == 0) {
+ if (count == numberElements) {
out.collect(Tuple2.of(getRuntimeContext().getIndexOfThisSubtask(), s));
- workCompletedLatch.countDown();
}
}
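For orientation, the run-then-restore flow the reworked test follows can be condensed as the sketch below. RescaleFlowSketch and runAndRescale are placeholder names, and the two JobGraph arguments stand in for graphs built by createJobGraphWithKeyedState(...); the MiniCluster and checkpoint calls are the same ones used in the diff above.

    import static org.apache.flink.runtime.jobgraph.SavepointRestoreSettings.forPath;
    import static org.apache.flink.runtime.testutils.CommonTestUtils.getLatestCompletedCheckpointPath;

    import org.apache.flink.runtime.jobgraph.JobGraph;
    import org.apache.flink.runtime.minicluster.MiniCluster;

    final class RescaleFlowSketch {
        private RescaleFlowSketch() {}

        static void runAndRescale(MiniCluster miniCluster, JobGraph firstJob, JobGraph rescaledJob)
                throws Exception {
            // First run: the source fails artificially after the first completed checkpoint,
            // so the externalized (RETAIN_ON_CANCELLATION) checkpoint is kept on disk.
            miniCluster.submitJob(firstJob).get();
            miniCluster.requestJobResult(firstJob.getJobID()).get();
            String checkpointPath =
                    getLatestCompletedCheckpointPath(firstJob.getJobID(), miniCluster)
                            .orElseThrow(() -> new IllegalStateException("no completed checkpoint"));

            // Second run: restore the rescaled job graph from that checkpoint and let it finish.
            rescaledJob.setSavepointRestoreSettings(forPath(checkpointPath));
            miniCluster.submitJob(rescaledJob).get();
            miniCluster.requestJobResult(rescaledJob.getJobID()).get();
        }
    }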