Repository: flink
Updated Branches:
  refs/heads/release-1.5 b4136d248 -> b4f9e61b5


[FLINK-8487] Verify ZooKeeper checkpoint store behaviour with ITCase


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/b4f9e61b
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/b4f9e61b
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/b4f9e61b

Branch: refs/heads/release-1.5
Commit: b4f9e61b5d061f2fa9166bc5ea83ef6cd80eb0e6
Parents: b4136d2
Author: Aljoscha Krettek <aljoscha.kret...@gmail.com>
Authored: Sat Mar 3 09:34:56 2018 +0100
Committer: Aljoscha Krettek <aljoscha.kret...@gmail.com>
Committed: Sun Mar 11 08:43:27 2018 -0700

----------------------------------------------------------------------
 .../ZooKeeperCompletedCheckpointStore.java      |   2 +-
 .../ZooKeeperHighAvailabilityITCase.java        | 326 +++++++++++++++++++
 2 files changed, 327 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/b4f9e61b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStore.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStore.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStore.java
index 0cbd4fb..f221270 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStore.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStore.java
@@ -209,7 +209,7 @@ public class ZooKeeperCompletedCheckpointStore implements CompletedCheckpointSto
 
                if (completedCheckpoints.isEmpty() && numberOfInitialCheckpoints > 0) {
                        throw new FlinkException(
-                               "Could not read any of the " + numberOfInitialCheckpoints + " from storage.");
+                               "Could not read any of the " + numberOfInitialCheckpoints + " checkpoints from storage.");
                } else if (completedCheckpoints.size() != numberOfInitialCheckpoints) {
                        LOG.warn(
                                "Could only fetch {} of {} checkpoints from storage.",

http://git-wip-us.apache.org/repos/asf/flink/blob/b4f9e61b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/ZooKeeperHighAvailabilityITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/ZooKeeperHighAvailabilityITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/ZooKeeperHighAvailabilityITCase.java
new file mode 100644
index 0000000..156d448
--- /dev/null
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/ZooKeeperHighAvailabilityITCase.java
@@ -0,0 +1,326 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.checkpointing;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.functions.RichMapFunction;
+import org.apache.flink.api.common.restartstrategy.RestartStrategies;
+import org.apache.flink.api.common.state.ValueStateDescriptor;
+import org.apache.flink.api.common.time.Deadline;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.api.common.typeutils.base.StringSerializer;
+import org.apache.flink.client.program.ClusterClient;
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.HighAvailabilityOptions;
+import org.apache.flink.configuration.TaskManagerOptions;
+import org.apache.flink.core.testutils.OneShotLatch;
+import org.apache.flink.runtime.concurrent.FutureUtils;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.JobStatus;
+import org.apache.flink.runtime.state.FunctionInitializationContext;
+import org.apache.flink.runtime.state.FunctionSnapshotContext;
+import org.apache.flink.runtime.state.StateBackend;
+import org.apache.flink.runtime.state.filesystem.FsStateBackend;
+import org.apache.flink.runtime.testingUtils.TestingUtils;
+import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
+import org.apache.flink.streaming.api.datastream.DataStreamSource;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import org.apache.flink.test.util.MiniClusterResource;
+import org.apache.flink.util.Preconditions;
+import org.apache.flink.util.TestLogger;
+
+import org.apache.curator.test.TestingServer;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import java.io.File;
+import java.time.Duration;
+import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import static org.hamcrest.core.Is.is;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertThat;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Integration tests for {@link org.apache.flink.runtime.checkpoint.ZooKeeperCompletedCheckpointStore}.
+ */
+public class ZooKeeperHighAvailabilityITCase extends TestLogger {
+
+       private static final Duration TEST_TIMEOUT = Duration.ofSeconds(10000L);
+
+       private static final int NUM_JMS = 1;
+       private static final int NUM_TMS = 1;
+       private static final int NUM_SLOTS_PER_TM = 1;
+
+       @ClassRule
+       public static final TemporaryFolder TEMPORARY_FOLDER = new TemporaryFolder();
+
+       private static File haStorageDir;
+
+       private static TestingServer zkServer;
+
+       private static MiniClusterResource miniClusterResource;
+
+       private static OneShotLatch waitForCheckpointLatch = new OneShotLatch();
+       private static OneShotLatch failInCheckpointLatch = new OneShotLatch();
+
+       @BeforeClass
+       public static void setup() throws Exception {
+               zkServer = new TestingServer();
+
+               Configuration config = new Configuration();
+               config.setInteger(ConfigConstants.LOCAL_NUMBER_JOB_MANAGER, NUM_JMS);
+               config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, NUM_TMS);
+               config.setInteger(TaskManagerOptions.NUM_TASK_SLOTS, NUM_SLOTS_PER_TM);
+
+               haStorageDir = TEMPORARY_FOLDER.newFolder();
+
+               config.setString(HighAvailabilityOptions.HA_STORAGE_PATH, haStorageDir.toString());
+               config.setString(HighAvailabilityOptions.HA_CLUSTER_ID, UUID.randomUUID().toString());
+               config.setString(HighAvailabilityOptions.HA_ZOOKEEPER_QUORUM, zkServer.getConnectString());
+               config.setString(HighAvailabilityOptions.HA_MODE, "zookeeper");
+
+               // we have to manage this manually because we have to create the ZooKeeper server
+               // ahead of this
+               miniClusterResource = new MiniClusterResource(
+                       new MiniClusterResource.MiniClusterResourceConfiguration(
+                               config,
+                               NUM_TMS,
+                               NUM_SLOTS_PER_TM),
+                       true);
+
+               miniClusterResource.before();
+       }
+
+       @AfterClass
+       public static void tearDown() throws Exception {
+               miniClusterResource.after();
+
+               zkServer.stop();
+               zkServer.close();
+       }
+
+       /**
+        * Verify that we don't start a job from scratch if we cannot restore any of the
+        * CompletedCheckpoints.
+        *
+        * <p>Synchronization for the different steps and things we want to observe happens via
+        * latches in the test method and the methods of {@link CheckpointBlockingFunction}.
+        *
+        * <p>The test follows these steps:
+        * <ol>
+        *     <li>Start job and block on a latch until we have done some checkpoints
+        *     <li>Block in the special function
+        *     <li>Move away the contents of the ZooKeeper HA directory to make restoring from
+        *       checkpoints impossible
+        *     <li>Unblock the special function, which now induces a failure
+        *     <li>Make sure that the job does not recover successfully
+        *     <li>Move back the HA directory
+        *     <li>Make sure that the job recovers, we use a latch to ensure that the operator
+        *       restored successfully
+        * </ol>
+        */
+       @Test(timeout = 120_000L)
+       public void testRestoreBehaviourWithFaultyStateHandles() throws Exception {
+               CheckpointBlockingFunction.allowedInitializeCallsWithoutRestore.set(1);
+               CheckpointBlockingFunction.successfulRestores.set(0);
+               CheckpointBlockingFunction.illegalRestores.set(0);
+               CheckpointBlockingFunction.afterMessWithZooKeeper.set(false);
+               CheckpointBlockingFunction.failedAlready.set(false);
+
+               waitForCheckpointLatch = new OneShotLatch();
+               failInCheckpointLatch = new OneShotLatch();
+
+               ClusterClient<?> clusterClient = miniClusterResource.getClusterClient();
+               final Deadline deadline = Deadline.now().plus(TEST_TIMEOUT);
+
+               StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+               env.setParallelism(1);
+               env.setRestartStrategy(RestartStrategies.fixedDelayRestart(Integer.MAX_VALUE, 0));
+               env.enableCheckpointing(10); // Flink doesn't allow lower than 10 ms
+
+               File checkpointLocation = TEMPORARY_FOLDER.newFolder();
+               env.setStateBackend((StateBackend) new FsStateBackend(checkpointLocation.toURI()));
+
+               DataStreamSource<String> source = env.addSource(new UnboundedSource());
+
+               source
+                       .keyBy((str) -> str)
+                       .map(new CheckpointBlockingFunction());
+
+               JobGraph jobGraph = env.getStreamGraph().getJobGraph();
+               JobID jobID = Preconditions.checkNotNull(jobGraph.getJobID());
+
+               clusterClient.setDetached(true);
+               clusterClient.submitJob(jobGraph, ZooKeeperHighAvailabilityITCase.class.getClassLoader());
+
+               // wait until we did some checkpoints
+               waitForCheckpointLatch.await();
+
+               // mess with the HA directory so that the job cannot restore
+               File movedCheckpointLocation = TEMPORARY_FOLDER.newFolder();
+               int numCheckpoints = 0;
+               File[] files = haStorageDir.listFiles();
+               assertNotNull(files);
+               for (File file : files) {
+                       if (file.getName().startsWith("completedCheckpoint")) {
+                               assertTrue(file.renameTo(new File(movedCheckpointLocation, file.getName())));
+                               numCheckpoints++;
+                       }
+               }
+               // Note to future developers: This will break when we change Flink to not put the
+               // checkpoint metadata into the HA directory but instead rely on the fact that the
+               // actual checkpoint directory on DFS contains the checkpoint metadata. In this case,
+               // ZooKeeper will only contain a "handle" (read: String) that points to the metadata
+               // in DFS. The likely solution will be that we have to go directly to ZooKeeper, find
+               // out where the checkpoint is stored and mess with that.
+               assertTrue(numCheckpoints > 0);
+
+               failInCheckpointLatch.trigger();
+
+               // Ensure that we see at least one cycle where the job tries to restart and fails.
+               CompletableFuture<JobStatus> jobStatusFuture = FutureUtils.retrySuccesfulWithDelay(
+                       () -> clusterClient.getJobStatus(jobID),
+                       Time.milliseconds(1),
+                       deadline,
+                       (jobStatus) -> jobStatus == JobStatus.RESTARTING,
+                       TestingUtils.defaultScheduledExecutor());
+               assertEquals(JobStatus.RESTARTING, jobStatusFuture.get());
+
+               jobStatusFuture = FutureUtils.retrySuccesfulWithDelay(
+                       () -> clusterClient.getJobStatus(jobID),
+                       Time.milliseconds(1),
+                       deadline,
+                       (jobStatus) -> jobStatus == JobStatus.FAILING,
+                       TestingUtils.defaultScheduledExecutor());
+               assertEquals(JobStatus.FAILING, jobStatusFuture.get());
+
+               // move back the HA directory so that the job can restore
+               CheckpointBlockingFunction.afterMessWithZooKeeper.set(true);
+
+               files = movedCheckpointLocation.listFiles();
+               assertNotNull(files);
+               for (File file : files) {
+                       if (file.getName().startsWith("completedCheckpoint")) {
+                               assertTrue(file.renameTo(new File(haStorageDir, file.getName())));
+                       }
+               }
+
+               // now the job should be able to go to RUNNING again and then eventually to FINISHED,
+               // which it only does if it could successfully restore
+               jobStatusFuture = FutureUtils.retrySuccesfulWithDelay(
+                       () -> clusterClient.getJobStatus(jobID),
+                       Time.milliseconds(50),
+                       deadline,
+                       (jobStatus) -> jobStatus == JobStatus.FINISHED,
+                       TestingUtils.defaultScheduledExecutor());
+               assertEquals(JobStatus.FINISHED, jobStatusFuture.get());
+
+               assertThat("We saw illegal restores.", CheckpointBlockingFunction.illegalRestores.get(), is(0));
+       }
+
+       private static class UnboundedSource implements SourceFunction<String> {
+               private volatile boolean running = true;
+
+               @Override
+               public void run(SourceContext<String> ctx) throws Exception {
+                       while (running && !CheckpointBlockingFunction.afterMessWithZooKeeper.get()) {
+                               ctx.collect("hello");
+                               // don't overdo it ... ;-)
+                               Thread.sleep(50);
+                       }
+               }
+
+               @Override
+               public void cancel() {
+                       running = false;
+               }
+       }
+
+       private static class CheckpointBlockingFunction
+                       extends RichMapFunction<String, String>
+                       implements CheckpointedFunction {
+
+               // verify that we only call initializeState()
+               // once with isRestored() == false. All other invocations must have isRestored() == true. This
+               // verifies that we don't restart a job from scratch in case the CompletedCheckpoints can't
+               // be read.
+               static AtomicInteger allowedInitializeCallsWithoutRestore = new AtomicInteger(1);
+
+               // we count when we see restores that are not allowed. We only
+               // allow restores once we messed with the HA directory and moved it back again
+               static AtomicInteger illegalRestores = new AtomicInteger(0);
+               static AtomicInteger successfulRestores = new AtomicInteger(0);
+
+               // whether we are after the phase where we messed with the ZooKeeper HA directory, i.e.
+               // whether it's now ok for a restore to happen
+               static AtomicBoolean afterMessWithZooKeeper = new AtomicBoolean(false);
+
+               static AtomicBoolean failedAlready = new AtomicBoolean(false);
+
+               // also have some state to write to the checkpoint
+               private final ValueStateDescriptor<String> stateDescriptor =
+                       new ValueStateDescriptor<>("state", StringSerializer.INSTANCE);
+
+               @Override
+               public String map(String value) throws Exception {
+                       getRuntimeContext().getState(stateDescriptor).update("42");
+                       return value;
+               }
+
+               @Override
+               public void snapshotState(FunctionSnapshotContext context) throws Exception {
+                       if (context.getCheckpointId() > 5) {
+                               waitForCheckpointLatch.trigger();
+                               failInCheckpointLatch.await();
+                               if (!failedAlready.getAndSet(true)) {
+                                       throw new RuntimeException("Failing on purpose.");
+                               }
+                       }
+               }
+
+               @Override
+               public void initializeState(FunctionInitializationContext context) {
+                       if (!context.isRestored()) {
+                               int updatedValue = allowedInitializeCallsWithoutRestore.decrementAndGet();
+                               if (updatedValue < 0) {
+                                       illegalRestores.getAndIncrement();
+                                       throw new RuntimeException("We are not allowed any more restores.");
+                               }
+                       } else {
+                               if (!afterMessWithZooKeeper.get()) {
+                                       illegalRestores.getAndIncrement();
+                               } else if (successfulRestores.getAndIncrement() > 0) {
+                                       // already saw the one allowed successful restore
+                                       illegalRestores.getAndIncrement();
+                               }
+                       }
+               }
+       }
+}
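
For readers skimming the test above: the repeated FutureUtils.retrySuccesfulWithDelay(...) calls poll the cluster client until the job reaches an expected status or the deadline expires. A minimal standalone sketch of that poll-until-predicate pattern in plain Java follows; the helper name and signature are hypothetical (standard library only, not the Flink FutureUtils implementation):

import java.time.Duration;
import java.time.Instant;
import java.util.function.Predicate;
import java.util.function.Supplier;

/** Minimal sketch of a poll-until-predicate helper; names are hypothetical. */
final class PollUntil {

	/** Polls the supplier until the predicate accepts a value or the timeout expires. */
	static <T> T pollUntil(
			Supplier<T> supplier,
			Predicate<T> accept,
			Duration timeout,
			Duration interval) throws InterruptedException {

		Instant deadline = Instant.now().plus(timeout);
		T last = supplier.get();
		while (!accept.test(last) && Instant.now().isBefore(deadline)) {
			Thread.sleep(interval.toMillis());
			last = supplier.get();
		}
		// Return the last observed value; the caller asserts on it, mirroring
		// assertEquals(JobStatus.RESTARTING, jobStatusFuture.get()) in the test above.
		return last;
	}
}

A caller would use it roughly as pollUntil(() -> queryStatus(), s -> s == expectedStatus, Duration.ofMinutes(2), Duration.ofMillis(50)) and then assert on the returned value, which is the same shape as the three status checks in the test.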
