curcur commented on a change in pull request #16836:
URL: https://github.com/apache/flink/pull/16836#discussion_r694473966



##########
File path: 
flink-tests/src/test/java/org/apache/flink/test/checkpointing/CheckpointRestoreWithUidHashITCase.java
##########
@@ -18,56 +18,185 @@
 
 package org.apache.flink.test.checkpointing;
 
+import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.restartstrategy.RestartStrategies;
 import org.apache.flink.api.common.state.CheckpointListener;
 import org.apache.flink.api.common.state.ListState;
 import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.RestOptions;
+import org.apache.flink.runtime.OperatorIDPair;
+import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.jobgraph.OperatorID;
+import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
+import org.apache.flink.runtime.minicluster.MiniCluster;
+import org.apache.flink.runtime.minicluster.MiniClusterConfiguration;
 import org.apache.flink.runtime.state.FunctionInitializationContext;
 import org.apache.flink.runtime.state.FunctionSnapshotContext;
 import org.apache.flink.streaming.api.CheckpointingMode;
 import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
+import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.sink.SinkFunction;
 import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
+import org.apache.flink.testutils.junit.SharedObjects;
+import org.apache.flink.testutils.junit.SharedReference;
 
+import org.junit.Before;
+import org.junit.ClassRule;
+import org.junit.Rule;
 import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import javax.annotation.Nullable;
 
 import java.util.ArrayList;
 import java.util.Collections;
-import java.util.Iterator;
 import java.util.List;
 import java.util.stream.IntStream;
 
+import static 
org.apache.flink.runtime.testutils.CommonTestUtils.waitForAllTaskRunning;
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.hamcrest.Matchers.contains;
 
 /**
- * This case verifies that if {@code uidHash} is set, the job would still be 
able to restored from
- * the checkpoints correctly without losing states due to mismatched operator 
id.
+ * Verifies that with {@code uidHash} a job could restore state from an 
existing savepoint, and the
+ * job would still be able to restored from the checkpoints taken after 
restarted correctly without
+ * losing states due to mismatched operator id.
  */
 public class CheckpointRestoreWithUidHashITCase {
 
+    @ClassRule public static final TemporaryFolder TMP_FOLDER = new 
TemporaryFolder();
+
+    @Rule public final SharedObjects sharedObjects = SharedObjects.create();
+
+    private SharedReference<List<Integer>> result;
+
+    @Before
+    public void setup() {
+        result = sharedObjects.add(new ArrayList<>());
+    }
+
     @Test
-    public void testCheckpointRestoreWithUidHash() throws Exception {
-        final int maxNumber = 1000;
+    public void testRestoreFromSavepointBySetUidHash() throws Exception {
+        final int maxNumber = 100;
+
+        try (MiniCluster miniCluster = new 
MiniCluster(createMiniClusterConfig())) {
+            miniCluster.start();
+
+            StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+            JobGraph firstJob =
+                    createJobGraph(
+                            env,
+                            
StatefulSourceBehavior.HOLD_AFTER_CHECKPOINT_ON_FIRST_RUN,
+                            maxNumber,
+                            "random",

Review comment:
       "random" -> "test-uid"

##########
File path: 
flink-tests/src/test/java/org/apache/flink/test/checkpointing/CheckpointRestoreWithUidHashITCase.java
##########
@@ -18,56 +18,185 @@
 
 package org.apache.flink.test.checkpointing;
 
+import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.restartstrategy.RestartStrategies;
 import org.apache.flink.api.common.state.CheckpointListener;
 import org.apache.flink.api.common.state.ListState;
 import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.RestOptions;
+import org.apache.flink.runtime.OperatorIDPair;
+import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.jobgraph.OperatorID;
+import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
+import org.apache.flink.runtime.minicluster.MiniCluster;
+import org.apache.flink.runtime.minicluster.MiniClusterConfiguration;
 import org.apache.flink.runtime.state.FunctionInitializationContext;
 import org.apache.flink.runtime.state.FunctionSnapshotContext;
 import org.apache.flink.streaming.api.CheckpointingMode;
 import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
+import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.sink.SinkFunction;
 import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
+import org.apache.flink.testutils.junit.SharedObjects;
+import org.apache.flink.testutils.junit.SharedReference;
 
+import org.junit.Before;
+import org.junit.ClassRule;
+import org.junit.Rule;
 import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import javax.annotation.Nullable;
 
 import java.util.ArrayList;
 import java.util.Collections;
-import java.util.Iterator;
 import java.util.List;
 import java.util.stream.IntStream;
 
+import static 
org.apache.flink.runtime.testutils.CommonTestUtils.waitForAllTaskRunning;
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.hamcrest.Matchers.contains;
 
 /**
- * This case verifies that if {@code uidHash} is set, the job would still be 
able to restored from
- * the checkpoints correctly without losing states due to mismatched operator 
id.
+ * Verifies that with {@code uidHash} a job could restore state from an 
existing savepoint, and the
+ * job would still be able to restored from the checkpoints taken after 
restarted correctly without
+ * losing states due to mismatched operator id.
  */
 public class CheckpointRestoreWithUidHashITCase {
 
+    @ClassRule public static final TemporaryFolder TMP_FOLDER = new 
TemporaryFolder();
+
+    @Rule public final SharedObjects sharedObjects = SharedObjects.create();
+
+    private SharedReference<List<Integer>> result;
+
+    @Before
+    public void setup() {
+        result = sharedObjects.add(new ArrayList<>());
+    }
+
     @Test
-    public void testCheckpointRestoreWithUidHash() throws Exception {
-        final int maxNumber = 1000;
+    public void testRestoreFromSavepointBySetUidHash() throws Exception {
+        final int maxNumber = 100;
+
+        try (MiniCluster miniCluster = new 
MiniCluster(createMiniClusterConfig())) {
+            miniCluster.start();
+
+            StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+            JobGraph firstJob =
+                    createJobGraph(
+                            env,
+                            
StatefulSourceBehavior.HOLD_AFTER_CHECKPOINT_ON_FIRST_RUN,
+                            maxNumber,
+                            "random",
+                            null,
+                            null);
+            JobID jobId = miniCluster.submitJob(firstJob).get().getJobID();
+            waitForAllTaskRunning(miniCluster, jobId);
+            String savepointPath =
+                    miniCluster
+                            .triggerSavepoint(jobId, 
TMP_FOLDER.newFolder().getAbsolutePath(), true)
+                            .get();
+
+            // Get the operator id
+            List<OperatorIDPair> operatorIds =
+                    
firstJob.getVerticesSortedTopologicallyFromSources().get(0).getOperatorIDs();
+            OperatorIDPair sourceOperatorIds = 
operatorIds.get(operatorIds.size() - 1);
+
+            JobGraph secondJob =
+                    createJobGraph(
+                            env,
+                            StatefulSourceBehavior.PROCESS_ONLY,
+                            maxNumber,
+                            null,
+                            
sourceOperatorIds.getGeneratedOperatorID().toHexString(),
+                            savepointPath);
+            miniCluster.executeJobBlocking(secondJob);
+        }
+        assertThat(result.get(), contains(IntStream.range(0, 
maxNumber).boxed().toArray()));
+    }
+
+    @Test
+    public void testRestoreCheckpointAfterFailoverWithUidHashSet() throws 
Exception {
+        final int maxNumber = 100;
 
         StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
         env.setRestartStrategy(RestartStrategies.fixedDelayRestart(2, 500));
         env.enableCheckpointing(500, CheckpointingMode.EXACTLY_ONCE);
 
-        String uidHash = new OperatorID().toHexString();
-        Iterator<Integer> sinkIterator =
-                env.addSource(new StatefulSource(maxNumber))
-                        .setUidHash(uidHash)
-                        .executeAndCollect();
-        List<Integer> result = new ArrayList<>();
-        sinkIterator.forEachRemaining(result::add);
-        assertThat(result, contains(IntStream.range(0, 
maxNumber).boxed().toArray()));
+        JobGraph jobGraph =
+                createJobGraph(
+                        env,
+                        
StatefulSourceBehavior.FAIL_AFTER_CHECKPOINT_ON_FIRST_RUN,
+                        maxNumber,
+                        null,
+                        new OperatorID().toHexString(),
+                        null);
+
+        try (MiniCluster miniCluster = new 
MiniCluster(createMiniClusterConfig())) {
+            miniCluster.start();
+            miniCluster.executeJobBlocking(jobGraph);
+        }
+        assertThat(result.get(), contains(IntStream.range(0, 
maxNumber).boxed().toArray()));

Review comment:
       same as above.

##########
File path: 
flink-tests/src/test/java/org/apache/flink/test/checkpointing/CheckpointRestoreWithUidHashITCase.java
##########
@@ -18,56 +18,185 @@
 
 package org.apache.flink.test.checkpointing;
 
+import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.restartstrategy.RestartStrategies;
 import org.apache.flink.api.common.state.CheckpointListener;
 import org.apache.flink.api.common.state.ListState;
 import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.RestOptions;
+import org.apache.flink.runtime.OperatorIDPair;
+import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.jobgraph.OperatorID;
+import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
+import org.apache.flink.runtime.minicluster.MiniCluster;
+import org.apache.flink.runtime.minicluster.MiniClusterConfiguration;
 import org.apache.flink.runtime.state.FunctionInitializationContext;
 import org.apache.flink.runtime.state.FunctionSnapshotContext;
 import org.apache.flink.streaming.api.CheckpointingMode;
 import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
+import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.sink.SinkFunction;
 import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
+import org.apache.flink.testutils.junit.SharedObjects;
+import org.apache.flink.testutils.junit.SharedReference;
 
+import org.junit.Before;
+import org.junit.ClassRule;
+import org.junit.Rule;
 import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import javax.annotation.Nullable;
 
 import java.util.ArrayList;
 import java.util.Collections;
-import java.util.Iterator;
 import java.util.List;
 import java.util.stream.IntStream;
 
+import static 
org.apache.flink.runtime.testutils.CommonTestUtils.waitForAllTaskRunning;
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.hamcrest.Matchers.contains;
 
 /**
- * This case verifies that if {@code uidHash} is set, the job would still be 
able to restored from
- * the checkpoints correctly without losing states due to mismatched operator 
id.
+ * Verifies that with {@code uidHash} a job could restore state from an 
existing savepoint, and the
+ * job would still be able to restored from the checkpoints taken after 
restarted correctly without
+ * losing states due to mismatched operator id.
  */
 public class CheckpointRestoreWithUidHashITCase {
 
+    @ClassRule public static final TemporaryFolder TMP_FOLDER = new 
TemporaryFolder();
+
+    @Rule public final SharedObjects sharedObjects = SharedObjects.create();
+
+    private SharedReference<List<Integer>> result;
+
+    @Before
+    public void setup() {
+        result = sharedObjects.add(new ArrayList<>());
+    }
+
     @Test
-    public void testCheckpointRestoreWithUidHash() throws Exception {
-        final int maxNumber = 1000;
+    public void testRestoreFromSavepointBySetUidHash() throws Exception {
+        final int maxNumber = 100;
+
+        try (MiniCluster miniCluster = new 
MiniCluster(createMiniClusterConfig())) {
+            miniCluster.start();
+
+            StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+            JobGraph firstJob =
+                    createJobGraph(
+                            env,
+                            
StatefulSourceBehavior.HOLD_AFTER_CHECKPOINT_ON_FIRST_RUN,
+                            maxNumber,
+                            "random",
+                            null,
+                            null);
+            JobID jobId = miniCluster.submitJob(firstJob).get().getJobID();
+            waitForAllTaskRunning(miniCluster, jobId);
+            String savepointPath =
+                    miniCluster
+                            .triggerSavepoint(jobId, 
TMP_FOLDER.newFolder().getAbsolutePath(), true)
+                            .get();
+
+            // Get the operator id
+            List<OperatorIDPair> operatorIds =
+                    
firstJob.getVerticesSortedTopologicallyFromSources().get(0).getOperatorIDs();
+            OperatorIDPair sourceOperatorIds = 
operatorIds.get(operatorIds.size() - 1);
+
+            JobGraph secondJob =
+                    createJobGraph(
+                            env,
+                            StatefulSourceBehavior.PROCESS_ONLY,
+                            maxNumber,
+                            null,
+                            
sourceOperatorIds.getGeneratedOperatorID().toHexString(),
+                            savepointPath);
+            miniCluster.executeJobBlocking(secondJob);
+        }
+        assertThat(result.get(), contains(IntStream.range(0, 
maxNumber).boxed().toArray()));

Review comment:
       Do you also need to assert that the lengths are equal, to guarantee exactly-once?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to