[FLINK-8487] Verify ZooKeeper checkpoint store behaviour with ITCase

Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/b2437f87
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/b2437f87
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/b2437f87

Branch: refs/heads/release-1.3.3-rc1
Commit: b2437f87e361a822adbad6f1c3e6eb14eeeb09fa
Parents: 1432092
Author: Aljoscha Krettek <aljoscha.kret...@gmail.com>
Authored: Sat Mar 3 09:34:56 2018 +0100
Committer: Tzu-Li (Gordon) Tai <tzuli...@apache.org>
Committed: Mon Mar 12 18:40:45 2018 +0800

----------------------------------------------------------------------
 .../ZooKeeperCompletedCheckpointStore.java      |   2 +-
 .../flink/runtime/concurrent/FutureUtils.java   |  77 +++-
 .../ZooKeeperHighAvailabilityITCase.java        | 379 +++++++++++++++++++
 3 files changed, 453 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/b2437f87/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStore.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStore.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStore.java
index 0cbd4fb..f221270 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStore.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStore.java
@@ -209,7 +209,7 @@ public class ZooKeeperCompletedCheckpointStore implements 
CompletedCheckpointSto
 
                if (completedCheckpoints.isEmpty() && 
numberOfInitialCheckpoints > 0) {
                        throw new FlinkException(
-                               "Could not read any of the " + 
numberOfInitialCheckpoints + " from storage.");
+                               "Could not read any of the " + 
numberOfInitialCheckpoints + " checkpoints from storage.");
                } else if (completedCheckpoints.size() != 
numberOfInitialCheckpoints) {
                        LOG.warn(
                                "Could only fetch {} of {} checkpoints from 
storage.",

http://git-wip-us.apache.org/repos/asf/flink/blob/b2437f87/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/FutureUtils.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/FutureUtils.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/FutureUtils.java
index a27af56..a56ed92 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/FutureUtils.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/FutureUtils.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.runtime.concurrent;
 
+import org.apache.flink.api.common.functions.FilterFunction;
 import org.apache.flink.runtime.concurrent.impl.FlinkCompletableFuture;
 import org.apache.flink.util.Preconditions;
 
@@ -28,6 +29,8 @@ import java.util.concurrent.Callable;
 import java.util.concurrent.Executor;
 import java.util.concurrent.atomic.AtomicInteger;
 
+import scala.concurrent.duration.Deadline;
+
 import static org.apache.flink.util.Preconditions.checkNotNull;
 
 /**
@@ -86,6 +89,72 @@ public class FutureUtils {
                });
        }
 
+       /**
+        * Retries the given operation until its result is accepted by the given predicate or the
+        * given deadline has passed.
+        *
+        * <p>A new attempt is started immediately, without any delay, from a callback running on
+        * the given executor whenever the future returned by the operation completes exceptionally
+        * or its result is rejected by the predicate, provided the deadline still has time left.
+        * If calling the operation or evaluating the predicate throws an exception, no further
+        * attempts are made and the returned future fails with a {@link RetryException}.
+        *
+        * @param operation to execute; it is called once per attempt
+        * @param successPredicate decides whether a result is acceptable
+        * @param deadline until which new attempts may be started
+        * @param executor to use to run the futures
+        * @param <T> type of the result
+        * @return Future containing either the first accepted result of the operation or a
+        *         {@link RetryException}
+        */
+       public static <T> Future<T> retrySuccessful(
+               final Callable<Future<T>> operation,
+               final FilterFunction<T> successPredicate,
+               final Deadline deadline,
+               final Executor executor) {
+
+               // fail fast on the caller's thread instead of inside an asynchronous callback
+               checkNotNull(operation);
+               checkNotNull(successPredicate);
+               checkNotNull(deadline);
+               checkNotNull(executor);
+
+               Future<T> operationResultFuture;
+
+               try {
+                       operationResultFuture = operation.call();
+               } catch (Exception e) {
+                       return FlinkCompletableFuture.completedExceptionally(
+                               new RetryException("Could not execute the provided operation.", e));
+               }
+
+               return operationResultFuture.handleAsync(new BiFunction<T, Throwable, Future<T>>() {
+                       @Override
+                       public Future<T> apply(T t, Throwable throwable) {
+                               if (throwable != null) {
+                                       if (deadline.hasTimeLeft()) {
+                                               return retrySuccessful(operation, successPredicate, deadline, executor);
+                                       } else {
+                                               return FlinkCompletableFuture.completedExceptionally(
+                                                       new RetryException("Could not complete the operation. The deadline " +
+                                                               "has passed.", throwable));
+                                       }
+                               }
+
+                               final boolean predicateResult;
+                               try {
+                                       predicateResult = successPredicate.filter(t);
+                               } catch (Exception e) {
+                                       return FlinkCompletableFuture.completedExceptionally(
+                                               new RetryException("Predicate threw an exception.", e));
+                               }
+
+                               if (predicateResult) {
+                                       return FlinkCompletableFuture.completed(t);
+                               } else if (deadline.hasTimeLeft()) {
+                                       return retrySuccessful(operation, successPredicate, deadline, executor);
+                               } else {
+                                       return FlinkCompletableFuture.completedExceptionally(
+                                               new RetryException("No time left and predicate returned false for " + t));
+                               }
+                       }
+               }, executor)
+                       .thenCompose(new ApplyFunction<Future<T>, Future<T>>() {
+                               @Override
+                               public Future<T> apply(Future<T> value) {
+                                       // flatten the nested future produced by the handler above
+                                       return value;
+                               }
+                       });
+       }
+
+
        public static class RetryException extends Exception {
 
                private static final long serialVersionUID = 
3613470781274141862L;
@@ -108,14 +177,14 @@ public class FutureUtils {
        // 
------------------------------------------------------------------------
 
        /**
-        * Creates a future that is complete once multiple other futures 
completed. 
+        * Creates a future that is complete once multiple other futures 
completed.
         * The future fails (completes exceptionally) once one of the futures 
in the
         * conjunction fails. Upon successful completion, the future returns the
         * collection of the futures' results.
         *
         * <p>The ConjunctFuture gives access to how many Futures in the 
conjunction have already
-        * completed successfully, via {@link 
ConjunctFuture#getNumFuturesCompleted()}. 
-        * 
+        * completed successfully, via {@link 
ConjunctFuture#getNumFuturesCompleted()}.
+        *
         * @param futures The futures that make up the conjunction. No null 
entries are allowed.
         * @return The ConjunctFuture that completes once all given futures are 
complete (or one fails).
         */
@@ -157,7 +226,7 @@ public class FutureUtils {
         * A future that is complete once multiple other futures completed. The 
futures are not
         * necessarily of the same type. The ConjunctFuture fails (completes 
exceptionally) once
         * one of the Futures in the conjunction fails.
-        * 
+        *
         * <p>The advantage of using the ConjunctFuture over chaining all the 
futures (such as via
         * {@link Future#thenCombine(Future, BiFunction)}) is that 
ConjunctFuture also tracks how
         * many of the Futures are already complete.

http://git-wip-us.apache.org/repos/asf/flink/blob/b2437f87/flink-tests/src/test/java/org/apache/flink/test/checkpointing/ZooKeeperHighAvailabilityITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/ZooKeeperHighAvailabilityITCase.java
 
b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/ZooKeeperHighAvailabilityITCase.java
new file mode 100644
index 0000000..3905c09
--- /dev/null
+++ 
b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/ZooKeeperHighAvailabilityITCase.java
@@ -0,0 +1,379 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.checkpointing;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.functions.FilterFunction;
+import org.apache.flink.api.common.functions.RichMapFunction;
+import org.apache.flink.api.common.restartstrategy.RestartStrategies;
+import org.apache.flink.api.common.state.ValueStateDescriptor;
+import org.apache.flink.api.common.typeutils.base.StringSerializer;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.HighAvailabilityOptions;
+import org.apache.flink.core.testutils.OneShotLatch;
+import org.apache.flink.runtime.concurrent.ApplyFunction;
+import org.apache.flink.runtime.concurrent.Future;
+import org.apache.flink.runtime.concurrent.FutureUtils;
+import org.apache.flink.runtime.concurrent.impl.FlinkFuture;
+import org.apache.flink.runtime.instance.ActorGateway;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.JobStatus;
+import org.apache.flink.runtime.messages.JobManagerMessages;
+import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster;
+import org.apache.flink.runtime.state.FunctionInitializationContext;
+import org.apache.flink.runtime.state.FunctionSnapshotContext;
+import org.apache.flink.runtime.state.filesystem.FsStateBackend;
+import org.apache.flink.runtime.testingUtils.TestingUtils;
+import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
+import org.apache.flink.streaming.api.datastream.DataStreamSource;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import org.apache.flink.test.util.TestBaseUtils;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.curator.test.TestingServer;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import java.io.File;
+import java.util.UUID;
+import java.util.concurrent.Callable;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import scala.concurrent.Await;
+import scala.concurrent.duration.Deadline;
+import scala.concurrent.duration.FiniteDuration;
+
+import static org.hamcrest.core.Is.is;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertThat;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Integration tests for {@link org.apache.flink.runtime.checkpoint.ZooKeeperCompletedCheckpointStore}.
+ */
+public class ZooKeeperHighAvailabilityITCase extends TestBaseUtils {
+
+       /**
+        * Overall time budget of the test; also used as the ask timeout for job status requests.
+        * Kept below the JUnit timeout of {@link #testRestoreBehaviourWithFaultyStateHandles()} so
+        * that the polling deadline expires first and the test fails with the last observed job
+        * status instead of a bare JUnit timeout.
+        */
+       private static final FiniteDuration TEST_TIMEOUT = new FiniteDuration(100, TimeUnit.SECONDS);
+
+       private static final int NUM_JMS = 1;
+       private static final int NUM_TMS = 1;
+       private static final int NUM_SLOTS_PER_TM = 1;
+
+       @ClassRule
+       public static final TemporaryFolder temporaryFolder = new TemporaryFolder();
+
+       private static File haStorageDir;
+
+       private static TestingServer zkServer;
+
+       private static LocalFlinkMiniCluster cluster = null;
+
+       // Re-created by the test method and read by CheckpointBlockingFunction inside the running
+       // job, i.e. from a different thread, hence volatile.
+       private static volatile OneShotLatch waitForCheckpointLatch = new OneShotLatch();
+       private static volatile OneShotLatch failInCheckpointLatch = new OneShotLatch();
+
+       @BeforeClass
+       public static void setup() throws Exception {
+               zkServer = new TestingServer();
+
+               Configuration config = new Configuration();
+               config.setInteger(ConfigConstants.LOCAL_NUMBER_JOB_MANAGER, NUM_JMS);
+               config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, NUM_TMS);
+               config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, NUM_SLOTS_PER_TM);
+
+               haStorageDir = temporaryFolder.newFolder();
+
+               config.setString(HighAvailabilityOptions.HA_STORAGE_PATH, haStorageDir.toString());
+               config.setString(HighAvailabilityOptions.HA_CLUSTER_ID, UUID.randomUUID().toString());
+               config.setString(HighAvailabilityOptions.HA_ZOOKEEPER_QUORUM, zkServer.getConnectString());
+               config.setString(HighAvailabilityOptions.HA_MODE, "zookeeper");
+
+               cluster = TestBaseUtils.startCluster(config, false);
+       }
+
+       @AfterClass
+       public static void tearDown() throws Exception {
+               // guard against a partially failed setup() and always stop ZooKeeper, even if
+               // stopping the cluster throws
+               try {
+                       if (cluster != null) {
+                               stopCluster(cluster, TestBaseUtils.DEFAULT_TIMEOUT);
+                       }
+               } finally {
+                       if (zkServer != null) {
+                               zkServer.stop();
+                               zkServer.close();
+                       }
+               }
+       }
+
+       /**
+        * Verify that we don't start a job from scratch if we cannot restore any of the
+        * CompletedCheckpoints.
+        *
+        * <p>Synchronization for the different steps and things we want to observe happens via
+        * latches in the test method and the methods of {@link CheckpointBlockingFunction}.
+        *
+        * <p>The test follows these steps:
+        * <ol>
+        *     <li>Start job and block on a latch until we have done some checkpoints
+        *     <li>Block in the special function
+        *     <li>Move the completed checkpoint files out of the HA storage directory to make
+        *       restoring from checkpoints impossible
+        *     <li>Unblock the special function, which now induces a failure
+        *     <li>Make sure that the job does not recover successfully (it is observed in
+        *       RESTARTING and then FAILING)
+        *     <li>Move the completed checkpoint files back into the HA storage directory
+        *     <li>Make sure that the job recovers and reaches FINISHED, and that the static
+        *       counters of {@link CheckpointBlockingFunction} recorded no illegal restores
+        * </ol>
+        */
+       @Test(timeout = 120_000L)
+       public void testRestoreBehaviourWithFaultyStateHandles() throws Exception {
+               CheckpointBlockingFunction.allowedInitializeCallsWithoutRestore.set(1);
+               CheckpointBlockingFunction.successfulRestores.set(0);
+               CheckpointBlockingFunction.illegalRestores.set(0);
+               CheckpointBlockingFunction.afterMessWithZooKeeper.set(false);
+               CheckpointBlockingFunction.failedAlready.set(false);
+
+               waitForCheckpointLatch = new OneShotLatch();
+               failInCheckpointLatch = new OneShotLatch();
+
+               final Deadline deadline = TEST_TIMEOUT.fromNow();
+
+               StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+               env.setParallelism(1);
+               env.setRestartStrategy(RestartStrategies.fixedDelayRestart(Integer.MAX_VALUE, 0));
+               env.enableCheckpointing(10); // Flink doesn't allow lower than 10 ms
+
+               File checkpointLocation = temporaryFolder.newFolder();
+               env.setStateBackend(new FsStateBackend(checkpointLocation.toURI()));
+
+               DataStreamSource<String> source = env.addSource(new UnboundedSource());
+
+               source
+                       .keyBy(new KeySelector<String, String>() {
+                               @Override
+                               public String getKey(String value) {
+                                       return value;
+                               }
+                       })
+                       .map(new CheckpointBlockingFunction());
+
+               JobGraph jobGraph = env.getStreamGraph().getJobGraph();
+               final JobID jobID = Preconditions.checkNotNull(jobGraph.getJobID());
+
+               // Retrieve the job manager
+               final ActorGateway jobManager = Await.result(cluster.leaderGateway().future(), deadline.timeLeft());
+
+               cluster.submitJobDetached(jobGraph);
+
+               // wait until we did some checkpoints
+               waitForCheckpointLatch.await();
+
+               // mess with the HA directory so that the job cannot restore
+               File movedCheckpointLocation = temporaryFolder.newFolder();
+               int numCheckpoints = moveCompletedCheckpointFiles(haStorageDir, movedCheckpointLocation);
+               assertTrue(numCheckpoints > 0);
+
+               failInCheckpointLatch.trigger();
+
+               // Ensure that we see at least one cycle where the job tries to restart and fails.
+               Future<JobStatus> jobStatusFuture =
+                       waitForJobStatus(jobManager, jobID, JobStatus.RESTARTING, deadline);
+               assertEquals(JobStatus.RESTARTING, jobStatusFuture.get());
+
+               jobStatusFuture = waitForJobStatus(jobManager, jobID, JobStatus.FAILING, deadline);
+               assertEquals(JobStatus.FAILING, jobStatusFuture.get());
+
+               // move back the HA directory so that the job can restore
+               CheckpointBlockingFunction.afterMessWithZooKeeper.set(true);
+               moveCompletedCheckpointFiles(movedCheckpointLocation, haStorageDir);
+
+               // now the job should be able to go to RUNNING again and then eventually to FINISHED,
+               // which it only does if it could successfully restore
+               jobStatusFuture = waitForJobStatus(jobManager, jobID, JobStatus.FINISHED, deadline);
+               assertEquals(JobStatus.FINISHED, jobStatusFuture.get());
+
+               assertThat("We saw illegal restores.", CheckpointBlockingFunction.illegalRestores.get(), is(0));
+       }
+
+       /**
+        * Moves all files whose name starts with {@code completedCheckpoint} from one directory to
+        * another, asserting that the directory can be listed and that every rename succeeds.
+        *
+        * @param sourceDir directory to move the files out of
+        * @param targetDir directory to move the files into
+        * @return the number of moved files
+        */
+       private static int moveCompletedCheckpointFiles(File sourceDir, File targetDir) {
+               File[] files = sourceDir.listFiles();
+               assertNotNull(files);
+
+               int numMoved = 0;
+               for (File file : files) {
+                       if (file.getName().startsWith("completedCheckpoint")) {
+                               assertTrue(file.renameTo(new File(targetDir, file.getName())));
+                               numMoved++;
+                       }
+               }
+               return numMoved;
+       }
+
+       /**
+        * Repeatedly requests the {@link JobStatus} of the given job until it equals the expected
+        * status or the deadline has passed.
+        *
+        * @return future completed with the expected status, or failed (e.g. with a
+        *         {@link FutureUtils.RetryException}) if the status is not reached in time
+        */
+       private static Future<JobStatus> waitForJobStatus(
+               final ActorGateway jobManager,
+               final JobID jobId,
+               final JobStatus expectedStatus,
+               final Deadline deadline) {
+
+               return FutureUtils.retrySuccessful(
+                       new Callable<Future<JobStatus>>() {
+                               @Override
+                               public Future<JobStatus> call() {
+                                       return getJobStatus(jobManager, jobId, TEST_TIMEOUT);
+                               }
+                       },
+                       new FilterFunction<JobStatus>() {
+                               @Override
+                               public boolean filter(JobStatus jobStatus) {
+                                       return jobStatus == expectedStatus;
+                               }
+                       },
+                       deadline,
+                       TestingUtils.defaultExecutor());
+       }
+
+       /**
+        * Requests the {@link JobStatus} of the job with the given {@link JobID}.
+        *
+        * <p>The returned future fails if the JobManager does not know the job or answers with an
+        * unexpected message.
+        */
+       private static Future<JobStatus> getJobStatus(
+               final ActorGateway jobManager,
+               final JobID jobId,
+               final FiniteDuration timeout) {
+
+               scala.concurrent.Future<Object> response =
+                       jobManager.ask(JobManagerMessages.getRequestJobStatus(jobId), timeout);
+
+               FlinkFuture<Object> flinkFuture = new FlinkFuture<>(response);
+
+               return flinkFuture.thenApply(new ApplyFunction<Object, JobStatus>() {
+                       @Override
+                       public JobStatus apply(Object value) {
+                               if (value instanceof JobManagerMessages.CurrentJobStatus) {
+                                       return ((JobManagerMessages.CurrentJobStatus) value).status();
+                               } else if (value instanceof JobManagerMessages.JobNotFound) {
+                                       throw new RuntimeException(
+                                               new IllegalStateException("Could not find job with JobId " + jobId));
+                               } else {
+                                       throw new RuntimeException(
+                                               new IllegalStateException("Unknown JobManager response of type " + value.getClass()));
+                               }
+                       }
+               });
+       }
+
+       /**
+        * Source that emits {@code "hello"} every 50 ms until it is cancelled or
+        * {@code CheckpointBlockingFunction.afterMessWithZooKeeper} is set, after which it returns.
+        */
+       private static class UnboundedSource implements SourceFunction<String> {
+               private volatile boolean running = true;
+
+               @Override
+               public void run(SourceContext<String> ctx) throws Exception {
+                       while (running && !CheckpointBlockingFunction.afterMessWithZooKeeper.get()) {
+                               // emit under the checkpoint lock so records cannot interleave with checkpoints
+                               synchronized (ctx.getCheckpointLock()) {
+                                       ctx.collect("hello");
+                               }
+                               // don't overdo it ... ;-)
+                               Thread.sleep(50);
+                       }
+               }
+
+               @Override
+               public void cancel() {
+                       running = false;
+               }
+       }
+
+       /**
+        * Keyed map function that writes some state and blocks in
+        * {@link #snapshotState(FunctionSnapshotContext)} for checkpoints with an id greater than 5
+        * until the test triggers {@code failInCheckpointLatch}. After being released, it throws
+        * exactly once on purpose. Static counters record whether its state is (re-)initialized in
+        * an allowed way.
+        */
+       private static class CheckpointBlockingFunction
+                       extends RichMapFunction<String, String>
+                       implements CheckpointedFunction {
+
+               // verify that we only call initializeState()
+               // once with isRestored() == false. All other invocations must have isRestored() == true. This
+               // verifies that we don't restart a job from scratch in case the CompletedCheckpoints can't
+               // be read.
+               static final AtomicInteger allowedInitializeCallsWithoutRestore = new AtomicInteger(1);
+
+               // we count when we see restores that are not allowed. We only
+               // allow restores once we messed with the HA directory and moved it back again
+               static final AtomicInteger illegalRestores = new AtomicInteger(0);
+               static final AtomicInteger successfulRestores = new AtomicInteger(0);
+
+               // whether we are after the phase where we messed with the ZooKeeper HA directory, i.e.
+               // whether it's now ok for a restore to happen
+               static final AtomicBoolean afterMessWithZooKeeper = new AtomicBoolean(false);
+
+               // ensures that only one snapshotState() call fails on purpose
+               static final AtomicBoolean failedAlready = new AtomicBoolean(false);
+
+               // also have some state to write to the checkpoint
+               private final ValueStateDescriptor<String> stateDescriptor =
+                       new ValueStateDescriptor<>("state", StringSerializer.INSTANCE);
+
+               @Override
+               public String map(String value) throws Exception {
+                       getRuntimeContext().getState(stateDescriptor).update("42");
+                       return value;
+               }
+
+               @Override
+               public void snapshotState(FunctionSnapshotContext context) throws Exception {
+                       if (context.getCheckpointId() > 5) {
+                               waitForCheckpointLatch.trigger();
+                               failInCheckpointLatch.await();
+                               if (!failedAlready.getAndSet(true)) {
+                                       throw new RuntimeException("Failing on purpose.");
+                               }
+                       }
+               }
+
+               @Override
+               public void initializeState(FunctionInitializationContext context) {
+                       if (!context.isRestored()) {
+                               int updatedValue = allowedInitializeCallsWithoutRestore.decrementAndGet();
+                               if (updatedValue < 0) {
+                                       illegalRestores.getAndIncrement();
+                                       throw new RuntimeException("We are not allowed any more restores.");
+                               }
+                       } else {
+                               if (!afterMessWithZooKeeper.get()) {
+                                       illegalRestores.getAndIncrement();
+                               } else if (successfulRestores.getAndIncrement() > 0) {
+                                       // already saw the one allowed successful restore
+                                       illegalRestores.getAndIncrement();
+                               }
+                       }
+               }
+       }
+}

Reply via email to