This is an automated email from the ASF dual-hosted git repository.

tangyun pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 7cf71585a60 [FLINK-28626][tests] Fix unstable RescaleCheckpointManuallyITCase when unaligned checkpoint is enabled
7cf71585a60 is described below

commit 7cf71585a603866822ed0aee3978eea8d8587eee
Author: fredia <[email protected]>
AuthorDate: Tue Aug 2 18:10:27 2022 +0800

    [FLINK-28626][tests] Fix unstable RescaleCheckpointManuallyITCase when unaligned checkpoint is enabled
---
 .../RescaleCheckpointManuallyITCase.java           | 180 ++++++++++-----------
 1 file changed, 86 insertions(+), 94 deletions(-)

diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/RescaleCheckpointManuallyITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/RescaleCheckpointManuallyITCase.java
index 33c0cda6e0e..3419f6c3df7 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/RescaleCheckpointManuallyITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/RescaleCheckpointManuallyITCase.java
@@ -18,19 +18,18 @@
 
 package org.apache.flink.test.checkpointing;
 
+import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.functions.RichFlatMapFunction;
 import org.apache.flink.api.common.restartstrategy.RestartStrategies;
 import org.apache.flink.api.common.state.ValueState;
 import org.apache.flink.api.common.state.ValueStateDescriptor;
-import org.apache.flink.api.common.time.Deadline;
 import org.apache.flink.api.java.functions.KeySelector;
 import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.client.program.ClusterClient;
 import org.apache.flink.configuration.CheckpointingOptions;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.StateBackendOptions;
 import org.apache.flink.runtime.jobgraph.JobGraph;
-import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
+import org.apache.flink.runtime.minicluster.MiniCluster;
 import org.apache.flink.runtime.state.FunctionInitializationContext;
 import org.apache.flink.runtime.state.FunctionSnapshotContext;
 import org.apache.flink.runtime.state.KeyGroupRangeAssignment;
@@ -42,29 +41,31 @@ import 
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.functions.sink.SinkFunction;
 import 
org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
 import org.apache.flink.test.util.MiniClusterWithClientResource;
-import org.apache.flink.test.util.TestUtils;
+import org.apache.flink.testutils.junit.SharedObjects;
+import org.apache.flink.testutils.junit.SharedReference;
 import org.apache.flink.util.Collector;
+import org.apache.flink.util.FlinkRuntimeException;
+import org.apache.flink.util.Preconditions;
 import org.apache.flink.util.TestLogger;
 
 import org.junit.After;
 import org.junit.Before;
 import org.junit.ClassRule;
+import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.TemporaryFolder;
 
-import java.io.File;
-import java.time.Duration;
+import java.io.IOException;
 import java.util.Collections;
 import java.util.HashSet;
+import java.util.Optional;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.TimeUnit;
 
-import static org.apache.flink.test.util.TestUtils.submitJobAndWaitForResult;
+import static 
org.apache.flink.runtime.jobgraph.SavepointRestoreSettings.forPath;
+import static 
org.apache.flink.runtime.testutils.CommonTestUtils.getLatestCompletedCheckpointPath;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertTrue;
 
 /**
  * Test checkpoint rescaling for incremental rocksdb. The implementations of
@@ -77,19 +78,14 @@ public class RescaleCheckpointManuallyITCase extends 
TestLogger {
     private static final int SLOTS_PER_TASK_MANAGER = 2;
 
     private static MiniClusterWithClientResource cluster;
-    private File checkpointDir;
+    @Rule public final SharedObjects sharedObjects = SharedObjects.create();
 
     @ClassRule public static TemporaryFolder temporaryFolder = new 
TemporaryFolder();
 
     @Before
     public void setup() throws Exception {
         Configuration config = new Configuration();
-
-        checkpointDir = temporaryFolder.newFolder();
-
         config.setString(StateBackendOptions.STATE_BACKEND, "rocksdb");
-        config.setString(
-                CheckpointingOptions.CHECKPOINTS_DIRECTORY, 
checkpointDir.toURI().toString());
         config.setBoolean(CheckpointingOptions.INCREMENTAL_CHECKPOINTS, true);
 
         cluster =
@@ -132,15 +128,10 @@ public class RescaleCheckpointManuallyITCase extends 
TestLogger {
         final int parallelism2 = scaleOut ? 4 : 3;
         final int maxParallelism = 13;
 
-        ClusterClient<?> client = cluster.getClusterClient();
+        MiniCluster miniCluster = cluster.getMiniCluster();
         String checkpointPath =
                 runJobAndGetCheckpoint(
-                        numberKeys,
-                        numberElements,
-                        parallelism,
-                        maxParallelism,
-                        client,
-                        checkpointDir);
+                        numberKeys, numberElements, parallelism, 
maxParallelism, miniCluster);
 
         assertNotNull(checkpointPath);
 
@@ -150,54 +141,38 @@ public class RescaleCheckpointManuallyITCase extends 
TestLogger {
                 numberKeys,
                 numberElements2,
                 numberElements + numberElements2,
-                client,
+                miniCluster,
                 checkpointPath);
     }
 
-    private static String runJobAndGetCheckpoint(
+    private String runJobAndGetCheckpoint(
             int numberKeys,
             int numberElements,
             int parallelism,
             int maxParallelism,
-            ClusterClient<?> client,
-            File checkpointDir)
+            MiniCluster miniCluster)
             throws Exception {
         try {
-            Duration timeout = Duration.ofMinutes(5);
-            Deadline deadline = Deadline.now().plus(timeout);
-
             JobGraph jobGraph =
                     createJobGraphWithKeyedState(
-                            parallelism, maxParallelism, numberKeys, 
numberElements, false, 100);
-            client.submitJob(jobGraph).get();
-
-            assertTrue(
-                    SubtaskIndexFlatMapper.workCompletedLatch.await(
-                            deadline.timeLeft().toMillis(), 
TimeUnit.MILLISECONDS));
-
-            // verify the current state
-            Set<Tuple2<Integer, Integer>> actualResult = 
CollectionSink.getElementsSet();
-
-            Set<Tuple2<Integer, Integer>> expectedResult = new HashSet<>();
-
-            for (int key = 0; key < numberKeys; key++) {
-                int keyGroupIndex = 
KeyGroupRangeAssignment.assignToKeyGroup(key, maxParallelism);
-                expectedResult.add(
-                        Tuple2.of(
-                                
KeyGroupRangeAssignment.computeOperatorIndexForKeyGroup(
-                                        maxParallelism, parallelism, 
keyGroupIndex),
-                                numberElements * key));
-            }
-
-            assertEquals(expectedResult, actualResult);
-
-            // ensure contents of state within SubtaskIndexFlatMapper are all 
included
-            // in the last checkpoint (refer to FLINK-26882 for more details).
-            
cluster.getMiniCluster().triggerCheckpoint(jobGraph.getJobID()).get();
-
-            client.cancel(jobGraph.getJobID()).get();
-            TestUtils.waitUntilJobCanceled(jobGraph.getJobID(), client);
-            return 
TestUtils.getMostRecentCompletedCheckpoint(checkpointDir).getAbsolutePath();
+                            parallelism,
+                            maxParallelism,
+                            numberKeys,
+                            numberElements,
+                            numberElements,
+                            true,
+                            100,
+                            miniCluster);
+            miniCluster.submitJob(jobGraph).get();
+            miniCluster.requestJobResult(jobGraph.getJobID()).get();
+            // The elements may not all be sent to sink when unaligned 
checkpoints enabled(refer to
+            // FLINK-26882 for more details).
+            // Don't verify current state here.
+            return getLatestCompletedCheckpointPath(jobGraph.getJobID(), 
miniCluster)
+                    .orElseThrow(
+                            () ->
+                                    new IllegalStateException(
+                                            "Cannot get completed checkpoint, 
job failed before completing checkpoint"));
         } finally {
             CollectionSink.clearElementsSet();
         }
@@ -209,83 +184,101 @@ public class RescaleCheckpointManuallyITCase extends 
TestLogger {
             int numberKeys,
             int numberElements,
             int numberElementsExpect,
-            ClusterClient<?> client,
+            MiniCluster miniCluster,
             String restorePath)
             throws Exception {
         try {
-
             JobGraph scaledJobGraph =
                     createJobGraphWithKeyedState(
                             restoreParallelism,
                             maxParallelism,
                             numberKeys,
                             numberElements,
-                            true,
-                            100);
+                            numberElementsExpect,
+                            false,
+                            100,
+                            miniCluster);
 
-            scaledJobGraph.setSavepointRestoreSettings(
-                    SavepointRestoreSettings.forPath(restorePath));
+            scaledJobGraph.setSavepointRestoreSettings(forPath(restorePath));
 
-            submitJobAndWaitForResult(client, scaledJobGraph, 
getClass().getClassLoader());
+            miniCluster.submitJob(scaledJobGraph).get();
+            miniCluster.requestJobResult(scaledJobGraph.getJobID()).get();
 
-            Set<Tuple2<Integer, Integer>> actualResult2 = 
CollectionSink.getElementsSet();
+            Set<Tuple2<Integer, Integer>> actualResult = 
CollectionSink.getElementsSet();
 
-            Set<Tuple2<Integer, Integer>> expectedResult2 = new HashSet<>();
+            Set<Tuple2<Integer, Integer>> expectedResult = new HashSet<>();
 
             for (int key = 0; key < numberKeys; key++) {
                 int keyGroupIndex = 
KeyGroupRangeAssignment.assignToKeyGroup(key, maxParallelism);
-                expectedResult2.add(
+                expectedResult.add(
                         Tuple2.of(
                                 
KeyGroupRangeAssignment.computeOperatorIndexForKeyGroup(
                                         maxParallelism, restoreParallelism, 
keyGroupIndex),
                                 key * numberElementsExpect));
             }
-            assertEquals(expectedResult2, actualResult2);
+            assertEquals(expectedResult, actualResult);
         } finally {
             CollectionSink.clearElementsSet();
         }
     }
 
-    private static JobGraph createJobGraphWithKeyedState(
+    private JobGraph createJobGraphWithKeyedState(
             int parallelism,
             int maxParallelism,
             int numberKeys,
             int numberElements,
-            boolean terminateAfterEmission,
-            int checkpointingInterval) {
+            int numberElementsExpect,
+            boolean failAfterEmission,
+            int checkpointingInterval,
+            MiniCluster miniCluster)
+            throws IOException {
         StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
         env.setParallelism(parallelism);
         if (0 < maxParallelism) {
             env.getConfig().setMaxParallelism(maxParallelism);
         }
         env.enableCheckpointing(checkpointingInterval);
+        
env.getCheckpointConfig().setCheckpointStorage(temporaryFolder.newFolder().toURI());
         env.getCheckpointConfig()
                 .setExternalizedCheckpointCleanup(
                         
CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
         env.setRestartStrategy(RestartStrategies.noRestart());
         env.getConfig().setUseSnapshotCompression(true);
 
+        SharedReference<JobID> jobID = sharedObjects.add(new JobID());
+        SharedReference<MiniCluster> miniClusterRef = 
sharedObjects.add(miniCluster);
         DataStream<Integer> input =
                 env.addSource(
                                 new NotifyingDefiniteKeySource(
-                                        numberKeys, numberElements, 
terminateAfterEmission))
+                                        numberKeys, numberElements, 
failAfterEmission) {
+                                    @Override
+                                    public void waitCheckpointCompleted() 
throws Exception {
+                                        Optional<String> 
mostRecentCompletedCheckpointPath =
+                                                
getLatestCompletedCheckpointPath(
+                                                        jobID.get(), 
miniClusterRef.get());
+                                        while 
(!mostRecentCompletedCheckpointPath.isPresent()) {
+                                            Thread.sleep(50);
+                                            mostRecentCompletedCheckpointPath =
+                                                    
getLatestCompletedCheckpointPath(
+                                                            jobID.get(), 
miniClusterRef.get());
+                                        }
+                                    }
+                                })
                         .keyBy(
                                 new KeySelector<Integer, Integer>() {
                                     private static final long serialVersionUID 
= 1L;
 
                                     @Override
-                                    public Integer getKey(Integer value) 
throws Exception {
+                                    public Integer getKey(Integer value) {
                                         return value;
                                     }
                                 });
-        SubtaskIndexFlatMapper.workCompletedLatch = new 
CountDownLatch(numberKeys);
-
         DataStream<Tuple2<Integer, Integer>> result =
-                input.flatMap(new SubtaskIndexFlatMapper(numberElements));
+                input.flatMap(new 
SubtaskIndexFlatMapper(numberElementsExpect));
 
         result.addSink(new CollectionSink<>());
 
-        return env.getStreamGraph().getJobGraph();
+        return env.getStreamGraph().getJobGraph(jobID.get());
     }
 
     private static class NotifyingDefiniteKeySource extends 
RichParallelSourceFunction<Integer> {
@@ -293,25 +286,25 @@ public class RescaleCheckpointManuallyITCase extends 
TestLogger {
         private static final long serialVersionUID = 1L;
 
         private final int numberKeys;
-        private final int numberElements;
-        private final boolean terminateAfterEmission;
-
+        protected final int numberElements;
+        private final boolean failAfterEmission;
         protected int counter = 0;
-
         private boolean running = true;
 
         public NotifyingDefiniteKeySource(
-                int numberKeys, int numberElements, boolean 
terminateAfterEmission) {
+                int numberKeys, int numberElements, boolean failAfterEmission) 
{
+            Preconditions.checkState(numberElements > 0);
             this.numberKeys = numberKeys;
             this.numberElements = numberElements;
-            this.terminateAfterEmission = terminateAfterEmission;
+            this.failAfterEmission = failAfterEmission;
         }
 
+        public void waitCheckpointCompleted() throws Exception {}
+
         @Override
         public void run(SourceContext<Integer> ctx) throws Exception {
             final int subtaskIndex = 
getRuntimeContext().getIndexOfThisSubtask();
             while (running) {
-
                 if (counter < numberElements) {
                     synchronized (ctx.getCheckpointLock()) {
                         for (int value = subtaskIndex;
@@ -322,10 +315,12 @@ public class RescaleCheckpointManuallyITCase extends 
TestLogger {
                         counter++;
                     }
                 } else {
-                    if (terminateAfterEmission) {
-                        running = false;
+                    waitCheckpointCompleted();
+                    if (failAfterEmission) {
+                        throw new FlinkRuntimeException(
+                                "Make job fail artificially, to retain 
completed checkpoint.");
                     } else {
-                        Thread.sleep(100);
+                        running = false;
                     }
                 }
             }
@@ -343,8 +338,6 @@ public class RescaleCheckpointManuallyITCase extends 
TestLogger {
 
         private static final long serialVersionUID = 1L;
 
-        public static CountDownLatch workCompletedLatch = new 
CountDownLatch(1);
-
         private transient ValueState<Integer> counter;
         private transient ValueState<Integer> sum;
 
@@ -365,9 +358,8 @@ public class RescaleCheckpointManuallyITCase extends 
TestLogger {
             int s = sumValue == null ? value : sumValue + value;
             sum.update(s);
 
-            if (count % numberElements == 0) {
+            if (count == numberElements) {
                 
out.collect(Tuple2.of(getRuntimeContext().getIndexOfThisSubtask(), s));
-                workCompletedLatch.countDown();
             }
         }
 

Reply via email to