http://git-wip-us.apache.org/repos/asf/flink/blob/73c73e92/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StateCheckpoinedITCase.java ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StateCheckpoinedITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StateCheckpoinedITCase.java deleted file mode 100644 index f517f83..0000000 --- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StateCheckpoinedITCase.java +++ /dev/null @@ -1,391 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.flink.test.checkpointing; - -import org.apache.flink.api.common.functions.RichFilterFunction; -import org.apache.flink.api.common.functions.RichFlatMapFunction; -import org.apache.flink.api.common.functions.RichMapFunction; -import org.apache.flink.api.common.state.OperatorState; -import org.apache.flink.configuration.Configuration; -import org.apache.flink.streaming.api.checkpoint.CheckpointNotifier; -import org.apache.flink.streaming.api.checkpoint.Checkpointed; -import org.apache.flink.streaming.api.checkpoint.CheckpointedAsynchronously; -import org.apache.flink.streaming.api.datastream.DataStream; -import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; -import org.apache.flink.streaming.api.functions.sink.RichSinkFunction; -import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction; -import org.apache.flink.util.Collector; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - - -import java.io.IOException; -import java.util.HashMap; -import java.util.Map; -import java.util.Random; - -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertTrue; - -/** - * A simple test that runs a streaming topology with checkpointing enabled. - * - * The test triggers a failure after a while and verifies that, after completion, the - * state defined with either the {@link OperatorState} or the {@link Checkpointed} - * interface reflects the "exactly once" semantics. - * - * The test throttles the input until at least two checkpoints are completed, to make sure that - * the recovery does not fall back to "square one" (which would naturally lead to correct - * results without testing the checkpointing). 
- */ -@SuppressWarnings("serial") -public class StateCheckpoinedITCase extends StreamFaultToleranceTestBase { - - private static final Logger LOG = LoggerFactory.getLogger(StateCheckpoinedITCase.class); - - final long NUM_STRINGS = 10_000_000L; - - /** - * Runs the following program: - * - * <pre> - * [ (source)->(filter)] -> [ (map) -> (map) ] -> [ (groupBy/reduce)->(sink) ] - * </pre> - */ - @Override - public void testProgram(StreamExecutionEnvironment env) { - assertTrue("Broken test setup", NUM_STRINGS % 40 == 0); - - final long failurePosMin = (long) (0.4 * NUM_STRINGS / PARALLELISM); - final long failurePosMax = (long) (0.7 * NUM_STRINGS / PARALLELISM); - - final long failurePos = (new Random().nextLong() % (failurePosMax - failurePosMin)) + failurePosMin; - - env.enableCheckpointing(200); - - DataStream<String> stream = env.addSource(new StringGeneratingSourceFunction(NUM_STRINGS)); - - stream - // first vertex, chained to the source - // this filter throttles the flow until at least one checkpoint - // is complete, to make sure this program does not run without - .filter(new StringRichFilterFunction()) - - // -------------- seconds vertex - one-to-one connected ---------------- - .map(new StringPrefixCountRichMapFunction()) - .startNewChain() - .map(new StatefulCounterFunction()) - - // -------------- third vertex - reducer and the sink ---------------- - .partitionByHash("prefix") - .flatMap(new OnceFailingAggregator(failurePos)) - .addSink(new ValidatingSink()); - } - - @Override - public void postSubmit() { - - //assertTrue("Test inconclusive: failure occurred before first checkpoint", - // OnceFailingAggregator.wasCheckpointedBeforeFailure); - if(!OnceFailingAggregator.wasCheckpointedBeforeFailure) { - LOG.warn("Test inconclusive: failure occurred before first checkpoint"); - } - - long filterSum = 0; - for (long l : StringRichFilterFunction.counts) { - filterSum += l; - } - - long mapSum = 0; - for (long l : StringPrefixCountRichMapFunction.counts) { 
- mapSum += l; - } - - long countSum = 0; - for (long l : StatefulCounterFunction.counts) { - countSum += l; - } - - // verify that we counted exactly right - assertEquals(NUM_STRINGS, filterSum); - assertEquals(NUM_STRINGS, mapSum); - assertEquals(NUM_STRINGS, countSum); - - for (Map<Character, Long> map : ValidatingSink.maps) { - for (Long count : map.values()) { - assertEquals(NUM_STRINGS / 40, count.longValue()); - } - } - } - - // -------------------------------------------------------------------------------------------- - // Custom Functions - // -------------------------------------------------------------------------------------------- - - private static class StringGeneratingSourceFunction extends RichParallelSourceFunction<String> - implements CheckpointedAsynchronously<Integer> - { - private final long numElements; - - private int index; - - private volatile boolean isRunning = true; - - - StringGeneratingSourceFunction(long numElements) { - this.numElements = numElements; - } - - @Override - public void run(SourceContext<String> ctx) throws Exception { - final Object lockingObject = ctx.getCheckpointLock(); - - final Random rnd = new Random(); - final StringBuilder stringBuilder = new StringBuilder(); - - final int step = getRuntimeContext().getNumberOfParallelSubtasks(); - - if (index == 0) { - index = getRuntimeContext().getIndexOfThisSubtask(); - } - - while (isRunning && index < numElements) { - char first = (char) ((index % 40) + 40); - - stringBuilder.setLength(0); - stringBuilder.append(first); - - String result = randomString(stringBuilder, rnd); - - synchronized (lockingObject) { - index += step; - ctx.collect(result); - } - } - } - - @Override - public void cancel() { - isRunning = false; - } - - private static String randomString(StringBuilder bld, Random rnd) { - final int len = rnd.nextInt(10) + 5; - - for (int i = 0; i < len; i++) { - char next = (char) (rnd.nextInt(20000) + 33); - bld.append(next); - } - - return bld.toString(); - } - - 
@Override - public Integer snapshotState(long checkpointId, long checkpointTimestamp) { - return index; - } - - @Override - public void restoreState(Integer state) { - index = state; - } - } - - private static class StringRichFilterFunction extends RichFilterFunction<String> - implements Checkpointed<Long> { - - static final long[] counts = new long[PARALLELISM]; - - private long count; - - @Override - public boolean filter(String value) throws Exception { - count++; - return value.length() < 100; // should be always true - } - - @Override - public void close() { - counts[getRuntimeContext().getIndexOfThisSubtask()] = count; - } - - @Override - public Long snapshotState(long checkpointId, long checkpointTimestamp) { - return count; - } - - @Override - public void restoreState(Long state) { - count = state; - } - } - - private static class StringPrefixCountRichMapFunction extends RichMapFunction<String, PrefixCount> - implements CheckpointedAsynchronously<Long> { - - static final long[] counts = new long[PARALLELISM]; - - private long count; - - @Override - public PrefixCount map(String value) { - count++; - return new PrefixCount(value.substring(0, 1), value, 1L); - } - - @Override - public void close() { - counts[getRuntimeContext().getIndexOfThisSubtask()] = count; - } - - @Override - public Long snapshotState(long checkpointId, long checkpointTimestamp) { - return count; - } - - @Override - public void restoreState(Long state) { - count = state; - } - } - - private static class StatefulCounterFunction extends RichMapFunction<PrefixCount, PrefixCount> - implements Checkpointed<Long> { - - static final long[] counts = new long[PARALLELISM]; - - private long count; - - @Override - public PrefixCount map(PrefixCount value) throws Exception { - count++; - return value; - } - - @Override - public void close() throws IOException { - counts[getRuntimeContext().getIndexOfThisSubtask()] = count; - } - - @Override - public Long snapshotState(long checkpointId, long 
checkpointTimestamp) { - return count; - } - - @Override - public void restoreState(Long state) { - count = state; - } - } - - private static class OnceFailingAggregator extends RichFlatMapFunction<PrefixCount, PrefixCount> - implements Checkpointed<HashMap<String, PrefixCount>>, CheckpointNotifier { - - static boolean wasCheckpointedBeforeFailure = false; - - private static volatile boolean hasFailed = false; - - private final HashMap<String, PrefixCount> aggregationMap = new HashMap<String, PrefixCount>(); - - private long failurePos; - private long count; - - private boolean wasCheckpointed; - - - OnceFailingAggregator(long failurePos) { - this.failurePos = failurePos; - } - - @Override - public void open(Configuration parameters) { - count = 0; - } - - @Override - public void flatMap(PrefixCount value, Collector<PrefixCount> out) throws Exception { - count++; - if (!hasFailed && count >= failurePos && getRuntimeContext().getIndexOfThisSubtask() == 1) { - wasCheckpointedBeforeFailure = wasCheckpointed; - hasFailed = true; - throw new Exception("Test Failure"); - } - - PrefixCount curr = aggregationMap.get(value.prefix); - if (curr == null) { - aggregationMap.put(value.prefix, value); - out.collect(value); - } - else { - curr.count += value.count; - out.collect(curr); - } - } - - @Override - public HashMap<String, PrefixCount> snapshotState(long checkpointId, long checkpointTimestamp) { - return aggregationMap; - } - - @Override - public void restoreState(HashMap<String, PrefixCount> state) { - aggregationMap.putAll(state); - } - - @Override - public void notifyCheckpointComplete(long checkpointId) { - this.wasCheckpointed = true; - } - } - - private static class ValidatingSink extends RichSinkFunction<PrefixCount> - implements Checkpointed<HashMap<Character, Long>> { - - @SuppressWarnings("unchecked") - private static Map<Character, Long>[] maps = (Map<Character, Long>[]) new Map<?, ?>[PARALLELISM]; - - private HashMap<Character, Long> counts = new 
HashMap<Character, Long>(); - - @Override - public void invoke(PrefixCount value) { - Character first = value.prefix.charAt(0); - Long previous = counts.get(first); - if (previous == null) { - counts.put(first, value.count); - } else { - counts.put(first, Math.max(previous, value.count)); - } - } - - @Override - public void close() throws Exception { - maps[getRuntimeContext().getIndexOfThisSubtask()] = counts; - } - - @Override - public HashMap<Character, Long> snapshotState(long checkpointId, long checkpointTimestamp) { - return counts; - } - - @Override - public void restoreState(HashMap<Character, Long> state) { - counts.putAll(state); - } - } -}
http://git-wip-us.apache.org/repos/asf/flink/blob/73c73e92/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StateCheckpointedITCase.java ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StateCheckpointedITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StateCheckpointedITCase.java new file mode 100644 index 0000000..d7c06f6 --- /dev/null +++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StateCheckpointedITCase.java @@ -0,0 +1,391 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.test.checkpointing; + +import org.apache.flink.api.common.functions.RichFilterFunction; +import org.apache.flink.api.common.functions.RichFlatMapFunction; +import org.apache.flink.api.common.functions.RichMapFunction; +import org.apache.flink.api.common.state.OperatorState; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.streaming.api.checkpoint.CheckpointNotifier; +import org.apache.flink.streaming.api.checkpoint.Checkpointed; +import org.apache.flink.streaming.api.checkpoint.CheckpointedAsynchronously; +import org.apache.flink.streaming.api.datastream.DataStream; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.streaming.api.functions.sink.RichSinkFunction; +import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction; +import org.apache.flink.util.Collector; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + + +import java.io.IOException; +import java.util.HashMap; +import java.util.Map; +import java.util.Random; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +/** + * A simple test that runs a streaming topology with checkpointing enabled. + * + * The test triggers a failure after a while and verifies that, after completion, the + * state defined with either the {@link OperatorState} or the {@link Checkpointed} + * interface reflects the "exactly once" semantics. + * + * The test throttles the input until at least two checkpoints are completed, to make sure that + * the recovery does not fall back to "square one" (which would naturally lead to correct + * results without testing the checkpointing). 
+ */ +@SuppressWarnings("serial") +public class StateCheckpointedITCase extends StreamFaultToleranceTestBase { + + private static final Logger LOG = LoggerFactory.getLogger(StateCheckpointedITCase.class); + + final long NUM_STRINGS = 10_000_000L; + + /** + * Runs the following program: + * + * <pre> + * [ (source)->(filter)] -> [ (map) -> (map) ] -> [ (groupBy/reduce)->(sink) ] + * </pre> + */ + @Override + public void testProgram(StreamExecutionEnvironment env) { + assertTrue("Broken test setup", NUM_STRINGS % 40 == 0); + + final long failurePosMin = (long) (0.4 * NUM_STRINGS / PARALLELISM); + final long failurePosMax = (long) (0.7 * NUM_STRINGS / PARALLELISM); + + final long failurePos = (new Random().nextLong() % (failurePosMax - failurePosMin)) + failurePosMin; + + env.enableCheckpointing(200); + + DataStream<String> stream = env.addSource(new StringGeneratingSourceFunction(NUM_STRINGS)); + + stream + // first vertex, chained to the source + // this filter throttles the flow until at least one checkpoint + // is complete, to make sure this program does not run without + .filter(new StringRichFilterFunction()) + + // -------------- seconds vertex - one-to-one connected ---------------- + .map(new StringPrefixCountRichMapFunction()) + .startNewChain() + .map(new StatefulCounterFunction()) + + // -------------- third vertex - reducer and the sink ---------------- + .partitionByHash("prefix") + .flatMap(new OnceFailingAggregator(failurePos)) + .addSink(new ValidatingSink()); + } + + @Override + public void postSubmit() { + + //assertTrue("Test inconclusive: failure occurred before first checkpoint", + // OnceFailingAggregator.wasCheckpointedBeforeFailure); + if(!OnceFailingAggregator.wasCheckpointedBeforeFailure) { + LOG.warn("Test inconclusive: failure occurred before first checkpoint"); + } + + long filterSum = 0; + for (long l : StringRichFilterFunction.counts) { + filterSum += l; + } + + long mapSum = 0; + for (long l : StringPrefixCountRichMapFunction.counts) 
{ + mapSum += l; + } + + long countSum = 0; + for (long l : StatefulCounterFunction.counts) { + countSum += l; + } + + // verify that we counted exactly right + assertEquals(NUM_STRINGS, filterSum); + assertEquals(NUM_STRINGS, mapSum); + assertEquals(NUM_STRINGS, countSum); + + for (Map<Character, Long> map : ValidatingSink.maps) { + for (Long count : map.values()) { + assertEquals(NUM_STRINGS / 40, count.longValue()); + } + } + } + + // -------------------------------------------------------------------------------------------- + // Custom Functions + // -------------------------------------------------------------------------------------------- + + private static class StringGeneratingSourceFunction extends RichParallelSourceFunction<String> + implements CheckpointedAsynchronously<Integer> + { + private final long numElements; + + private int index; + + private volatile boolean isRunning = true; + + + StringGeneratingSourceFunction(long numElements) { + this.numElements = numElements; + } + + @Override + public void run(SourceContext<String> ctx) throws Exception { + final Object lockingObject = ctx.getCheckpointLock(); + + final Random rnd = new Random(); + final StringBuilder stringBuilder = new StringBuilder(); + + final int step = getRuntimeContext().getNumberOfParallelSubtasks(); + + if (index == 0) { + index = getRuntimeContext().getIndexOfThisSubtask(); + } + + while (isRunning && index < numElements) { + char first = (char) ((index % 40) + 40); + + stringBuilder.setLength(0); + stringBuilder.append(first); + + String result = randomString(stringBuilder, rnd); + + synchronized (lockingObject) { + index += step; + ctx.collect(result); + } + } + } + + @Override + public void cancel() { + isRunning = false; + } + + private static String randomString(StringBuilder bld, Random rnd) { + final int len = rnd.nextInt(10) + 5; + + for (int i = 0; i < len; i++) { + char next = (char) (rnd.nextInt(20000) + 33); + bld.append(next); + } + + return bld.toString(); + } + 
+ @Override + public Integer snapshotState(long checkpointId, long checkpointTimestamp) { + return index; + } + + @Override + public void restoreState(Integer state) { + index = state; + } + } + + private static class StringRichFilterFunction extends RichFilterFunction<String> + implements Checkpointed<Long> { + + static final long[] counts = new long[PARALLELISM]; + + private long count; + + @Override + public boolean filter(String value) throws Exception { + count++; + return value.length() < 100; // should be always true + } + + @Override + public void close() { + counts[getRuntimeContext().getIndexOfThisSubtask()] = count; + } + + @Override + public Long snapshotState(long checkpointId, long checkpointTimestamp) { + return count; + } + + @Override + public void restoreState(Long state) { + count = state; + } + } + + private static class StringPrefixCountRichMapFunction extends RichMapFunction<String, PrefixCount> + implements CheckpointedAsynchronously<Long> { + + static final long[] counts = new long[PARALLELISM]; + + private long count; + + @Override + public PrefixCount map(String value) { + count++; + return new PrefixCount(value.substring(0, 1), value, 1L); + } + + @Override + public void close() { + counts[getRuntimeContext().getIndexOfThisSubtask()] = count; + } + + @Override + public Long snapshotState(long checkpointId, long checkpointTimestamp) { + return count; + } + + @Override + public void restoreState(Long state) { + count = state; + } + } + + private static class StatefulCounterFunction extends RichMapFunction<PrefixCount, PrefixCount> + implements Checkpointed<Long> { + + static final long[] counts = new long[PARALLELISM]; + + private long count; + + @Override + public PrefixCount map(PrefixCount value) throws Exception { + count++; + return value; + } + + @Override + public void close() throws IOException { + counts[getRuntimeContext().getIndexOfThisSubtask()] = count; + } + + @Override + public Long snapshotState(long checkpointId, long 
checkpointTimestamp) { + return count; + } + + @Override + public void restoreState(Long state) { + count = state; + } + } + + private static class OnceFailingAggregator extends RichFlatMapFunction<PrefixCount, PrefixCount> + implements Checkpointed<HashMap<String, PrefixCount>>, CheckpointNotifier { + + static boolean wasCheckpointedBeforeFailure = false; + + private static volatile boolean hasFailed = false; + + private final HashMap<String, PrefixCount> aggregationMap = new HashMap<String, PrefixCount>(); + + private long failurePos; + private long count; + + private boolean wasCheckpointed; + + + OnceFailingAggregator(long failurePos) { + this.failurePos = failurePos; + } + + @Override + public void open(Configuration parameters) { + count = 0; + } + + @Override + public void flatMap(PrefixCount value, Collector<PrefixCount> out) throws Exception { + count++; + if (!hasFailed && count >= failurePos && getRuntimeContext().getIndexOfThisSubtask() == 1) { + wasCheckpointedBeforeFailure = wasCheckpointed; + hasFailed = true; + throw new Exception("Test Failure"); + } + + PrefixCount curr = aggregationMap.get(value.prefix); + if (curr == null) { + aggregationMap.put(value.prefix, value); + out.collect(value); + } + else { + curr.count += value.count; + out.collect(curr); + } + } + + @Override + public HashMap<String, PrefixCount> snapshotState(long checkpointId, long checkpointTimestamp) { + return aggregationMap; + } + + @Override + public void restoreState(HashMap<String, PrefixCount> state) { + aggregationMap.putAll(state); + } + + @Override + public void notifyCheckpointComplete(long checkpointId) { + this.wasCheckpointed = true; + } + } + + private static class ValidatingSink extends RichSinkFunction<PrefixCount> + implements Checkpointed<HashMap<Character, Long>> { + + @SuppressWarnings("unchecked") + private static Map<Character, Long>[] maps = (Map<Character, Long>[]) new Map<?, ?>[PARALLELISM]; + + private HashMap<Character, Long> counts = new 
HashMap<Character, Long>(); + + @Override + public void invoke(PrefixCount value) { + Character first = value.prefix.charAt(0); + Long previous = counts.get(first); + if (previous == null) { + counts.put(first, value.count); + } else { + counts.put(first, Math.max(previous, value.count)); + } + } + + @Override + public void close() throws Exception { + maps[getRuntimeContext().getIndexOfThisSubtask()] = counts; + } + + @Override + public HashMap<Character, Long> snapshotState(long checkpointId, long checkpointTimestamp) { + return counts; + } + + @Override + public void restoreState(HashMap<Character, Long> state) { + counts.putAll(state); + } + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/73c73e92/flink-tests/src/test/java/org/apache/flink/test/recovery/AbstractJobManagerProcessFailureRecoveryITCase.java ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/java/org/apache/flink/test/recovery/AbstractJobManagerProcessFailureRecoveryITCase.java b/flink-tests/src/test/java/org/apache/flink/test/recovery/AbstractJobManagerProcessFailureRecoveryITCase.java new file mode 100644 index 0000000..ba5ff1c --- /dev/null +++ b/flink-tests/src/test/java/org/apache/flink/test/recovery/AbstractJobManagerProcessFailureRecoveryITCase.java @@ -0,0 +1,289 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.test.recovery; + +import akka.actor.ActorRef; +import akka.actor.ActorSystem; +import org.apache.commons.io.FileUtils; +import org.apache.flink.configuration.ConfigConstants; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.runtime.StreamingMode; +import org.apache.flink.runtime.akka.AkkaUtils; +import org.apache.flink.runtime.instance.ActorGateway; +import org.apache.flink.runtime.instance.AkkaActorGateway; +import org.apache.flink.runtime.leaderelection.TestingListener; +import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService; +import org.apache.flink.runtime.taskmanager.TaskManager; +import org.apache.flink.runtime.testutils.CommonTestUtils; +import org.apache.flink.runtime.testutils.JobManagerActorTestUtils; +import org.apache.flink.runtime.testutils.JobManagerProcess; +import org.apache.flink.runtime.testutils.ZooKeeperTestUtils; +import org.apache.flink.runtime.util.ZooKeeperUtils; +import org.apache.flink.runtime.zookeeper.ZooKeeperTestEnvironment; +import org.apache.flink.util.TestLogger; +import org.junit.AfterClass; +import org.junit.Before; +import org.junit.Test; +import scala.Option; +import scala.concurrent.duration.Deadline; +import scala.concurrent.duration.FiniteDuration; + +import java.io.File; +import java.io.IOException; +import java.util.UUID; +import java.util.concurrent.TimeUnit; + +import static org.apache.flink.runtime.testutils.CommonTestUtils.createTempDirectory; +import static org.junit.Assert.assertEquals; +import static 
org.junit.Assert.assertFalse; +import static org.junit.Assert.fail; + +/** + * Verify behaviour in case of JobManager process failure during job execution. + * + * <p>The test works with multiple job managers processes by spawning JVMs. + * + * <p>Initially, it starts two TaskManager (2 slots each) and two JobManager JVMs. + * + * <p>It submits a program with parallelism 4 and waits until all tasks are brought up. + * Coordination between the test and the tasks happens via checking for the existence of + * temporary files. It then kills the leading JobManager process. The recovery should restart the + * tasks on the new JobManager. + * + * <p>This follows the same structure as {@link AbstractTaskManagerProcessFailureRecoveryTest}. + */ +public abstract class AbstractJobManagerProcessFailureRecoveryITCase extends TestLogger { + + private final static ZooKeeperTestEnvironment ZooKeeper = new ZooKeeperTestEnvironment(1); + + private final static FiniteDuration TestTimeOut = new FiniteDuration(5, TimeUnit.MINUTES); + + private static final File FileStateBackendBasePath; + + static { + try { + FileStateBackendBasePath = CommonTestUtils.createTempDirectory(); + } + catch (IOException e) { + throw new RuntimeException("Error in test setup. Could not create directory.", e); + } + } + + @AfterClass + public static void tearDown() throws Exception { + if (ZooKeeper != null) { + ZooKeeper.shutdown(); + } + + if (FileStateBackendBasePath != null) { + FileUtils.deleteDirectory(FileStateBackendBasePath); + } + } + + @Before + public void cleanUp() throws Exception { + ZooKeeper.deleteAll(); + + FileUtils.cleanDirectory(FileStateBackendBasePath); + } + + protected static final String READY_MARKER_FILE_PREFIX = "ready_"; + protected static final String FINISH_MARKER_FILE_PREFIX = "finish_"; + protected static final String PROCEED_MARKER_FILE = "proceed"; + + protected static final int PARALLELISM = 4; + + /** + * Test program with JobManager failure. 
+	 *
+	 * @param zkQuorum ZooKeeper quorum to connect to
+	 * @param coordinateDir Coordination directory
+	 * @throws Exception
+	 */
+	public abstract void testJobManagerFailure(String zkQuorum, File coordinateDir) throws Exception;
+
+	/**
+	 * Destroys the first JobManager process while the test program is running and checks
+	 * that the program still finishes after a second JobManager process has been started.
+	 *
+	 * <p>Steps performed by this method:
+	 * <ol>
+	 *   <li>start JobManager process 0 and the TaskManager actor systems,</li>
+	 *   <li>wait for a leader and for all TaskManagers to connect to it,</li>
+	 *   <li>run {@link #testJobManagerFailure(String, File)} in a separate thread,</li>
+	 *   <li>wait for the "ready" marker files, destroy JobManager process 0 and start
+	 *       JobManager process 1,</li>
+	 *   <li>create the "proceed" marker file and wait for the program thread and the
+	 *       "finish" marker file.</li>
+	 * </ol>
+	 *
+	 * <p>On any failure the logs of the spawned JobManager processes are printed. All actor
+	 * systems, processes and the coordination directory are cleaned up in the finally block.
+	 *
+	 * @throws Exception if cleanup fails; failures during the test itself are reported via
+	 *                   {@code fail(...)} or as the original {@link Error}
+	 */
+	@Test
+	public void testJobManagerProcessFailure() throws Exception {
+		// Config
+		final int numberOfJobManagers = 2;
+		final int numberOfTaskManagers = 2;
+		final int numberOfSlotsPerTaskManager = 2;
+
+		assertEquals(PARALLELISM, numberOfTaskManagers * numberOfSlotsPerTaskManager);
+
+		// Setup
+		// Test actor system (starts out as null so that the finally block can shut it down)
+		ActorSystem testActorSystem = null;
+
+		// Job managers
+		final JobManagerProcess[] jmProcess = new JobManagerProcess[numberOfJobManagers];
+
+		// Task managers
+		final ActorSystem[] tmActorSystem = new ActorSystem[numberOfTaskManagers];
+
+		// Leader election service
+		LeaderRetrievalService leaderRetrievalService = null;
+
+		// Coordination between the processes goes through a directory
+		File coordinateTempDir = null;
+
+		try {
+			final Deadline deadline = TestTimeOut.fromNow();
+
+			// Coordination directory
+			coordinateTempDir = createTempDirectory();
+
+			// Job Managers
+			Configuration config = ZooKeeperTestUtils.createZooKeeperRecoveryModeConfig(
+					ZooKeeper.getConnectString(), FileStateBackendBasePath.getPath());
+
+			// Start first process
+			jmProcess[0] = new JobManagerProcess(0, config);
+			jmProcess[0].createAndStart();
+
+			// Task manager configuration
+			config.setInteger(ConfigConstants.TASK_MANAGER_MEMORY_SIZE_KEY, 4);
+			config.setInteger(ConfigConstants.TASK_MANAGER_NETWORK_NUM_BUFFERS_KEY, 100);
+			// use the same value that is checked against PARALLELISM above
+			config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, numberOfSlotsPerTaskManager);
+
+			// Start the task manager process
+			for (int i = 0; i < numberOfTaskManagers; i++) {
+				tmActorSystem[i] = AkkaUtils.createActorSystem(AkkaUtils.getDefaultAkkaConfig());
+				TaskManager.startTaskManagerComponentsAndActor(
+						config, tmActorSystem[i], "localhost",
+						Option.<String>empty(), Option.<LeaderRetrievalService>empty(),
+						false, StreamingMode.STREAMING, TaskManager.class);
+			}
+
+			// Test actor system
+			testActorSystem = AkkaUtils.createActorSystem(AkkaUtils.getDefaultAkkaConfig());
+
+			jmProcess[0].getActorRef(testActorSystem, deadline.timeLeft());
+
+			// Leader listener
+			TestingListener leaderListener = new TestingListener();
+			leaderRetrievalService = ZooKeeperUtils.createLeaderRetrievalService(config);
+			leaderRetrievalService.start(leaderListener);
+
+			// Initial submission
+			leaderListener.waitForNewLeader(deadline.timeLeft().toMillis());
+
+			String leaderAddress = leaderListener.getAddress();
+			UUID leaderId = leaderListener.getLeaderSessionID();
+
+			// Get the leader ref
+			ActorRef leaderRef = AkkaUtils.getActorRef(leaderAddress, testActorSystem, deadline.timeLeft());
+			ActorGateway leaderGateway = new AkkaActorGateway(leaderRef, leaderId);
+
+			// Wait for all task managers to connect to the leading job manager
+			JobManagerActorTestUtils.waitForTaskManagers(numberOfTaskManagers, leaderGateway,
+					deadline.timeLeft());
+
+			final File coordinateDirClosure = coordinateTempDir;
+			final Throwable[] errorRef = new Throwable[1];
+
+			// we trigger program execution in a separate thread
+			Thread programTrigger = new Thread("Program Trigger") {
+				@Override
+				public void run() {
+					try {
+						testJobManagerFailure(ZooKeeper.getConnectString(), coordinateDirClosure);
+					}
+					catch (Throwable t) {
+						t.printStackTrace();
+						errorRef[0] = t;
+					}
+				}
+			};
+
+			//start the test program
+			programTrigger.start();
+
+			// wait until all marker files are in place, indicating that all tasks have started.
+			// The result must be checked: killing the job manager before the tasks are running
+			// would not exercise the recovery of a running job.
+			final long readyTimeout = deadline.timeLeft().toMillis();
+			if (!AbstractTaskManagerProcessFailureRecoveryTest.waitForMarkerFiles(coordinateTempDir,
+					READY_MARKER_FILE_PREFIX, PARALLELISM, readyTimeout)) {
+				// best-effort read, the trigger thread may still be running at this point
+				Throwable error = errorRef[0];
+				if (error != null) {
+					fail("The program encountered a " + error.getClass().getSimpleName() + " : " + error.getMessage());
+				}
+				else {
+					// no error occurred, simply a timeout
+					fail("The tasks were not started within time (" + readyTimeout + "msecs)");
+				}
+			}
+
+			// Kill one of the job managers and trigger recovery
+			jmProcess[0].destroy();
+
+			jmProcess[1] = new JobManagerProcess(1, config);
+			jmProcess[1].createAndStart();
+
+			jmProcess[1].getActorRef(testActorSystem, deadline.timeLeft());
+
+			// we create the marker file which signals the program functions tasks that they can complete
+			AbstractTaskManagerProcessFailureRecoveryTest.touchFile(new File(coordinateTempDir, PROCEED_MARKER_FILE));
+
+			programTrigger.join(deadline.timeLeft().toMillis());
+
+			// We wait for the finish marker file. We don't wait for the program trigger, because
+			// we submit in detached mode.
+			AbstractTaskManagerProcessFailureRecoveryTest.waitForMarkerFiles(coordinateTempDir,
+					FINISH_MARKER_FILE_PREFIX, 1, deadline.timeLeft().toMillis());
+
+			// check that the program really finished
+			assertFalse("The program did not finish in time", programTrigger.isAlive());
+
+			// check whether the program encountered an error
+			if (errorRef[0] != null) {
+				Throwable error = errorRef[0];
+				error.printStackTrace();
+				fail("The program encountered a " + error.getClass().getSimpleName() + " : " + error.getMessage());
+			}
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			printJobManagerProcessLogs(jmProcess);
+			fail(e.getMessage());
+		}
+		catch (Error e) {
+			// assertion failures are Errors: print the process logs for them as well
+			e.printStackTrace();
+			printJobManagerProcessLogs(jmProcess);
+			throw e;
+		}
+		finally {
+			for (int i = 0; i < numberOfTaskManagers; i++) {
+				if (tmActorSystem[i] != null) {
+					tmActorSystem[i].shutdown();
+				}
+			}
+
+			// the test's own actor system must be shut down as well, otherwise it leaks
+			if (testActorSystem != null) {
+				testActorSystem.shutdown();
+			}
+
+			if (leaderRetrievalService != null) {
+				leaderRetrievalService.stop();
+			}
+
+			for (JobManagerProcess jmProces : jmProcess) {
+				if (jmProces != null) {
+					jmProces.destroy();
+				}
+			}
+
+			// Delete coordination directory
+			if (coordinateTempDir != null) {
+				try {
+					FileUtils.deleteDirectory(coordinateTempDir);
+				}
+				catch (Throwable ignored) {
+				}
+			}
+		}
+	}
+
+	/**
+	 * Prints the log of every JobManager process that has been created so far.
+	 *
+	 * @param processes the process slots; entries that are still {@code null} are skipped
+	 */
+	private static void printJobManagerProcessLogs(JobManagerProcess[] processes) {
+		for (JobManagerProcess p : processes) {
+			if (p != null) {
+				p.printProcessLog();
+			}
+		}
+	}
+
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/73c73e92/flink-tests/src/test/java/org/apache/flink/test/recovery/AbstractProcessFailureRecoveryTest.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/recovery/AbstractProcessFailureRecoveryTest.java
b/flink-tests/src/test/java/org/apache/flink/test/recovery/AbstractProcessFailureRecoveryTest.java deleted file mode 100644 index 7e16baf..0000000 --- a/flink-tests/src/test/java/org/apache/flink/test/recovery/AbstractProcessFailureRecoveryTest.java +++ /dev/null @@ -1,444 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.flink.test.recovery; - -import akka.actor.ActorRef; -import akka.actor.ActorSystem; -import akka.pattern.Patterns; -import akka.util.Timeout; - -import org.apache.commons.io.FileUtils; -import org.apache.flink.configuration.ConfigConstants; -import org.apache.flink.configuration.Configuration; -import org.apache.flink.runtime.StreamingMode; -import org.apache.flink.runtime.akka.AkkaUtils; -import org.apache.flink.runtime.jobmanager.JobManager; -import org.apache.flink.runtime.jobmanager.MemoryArchivist; -import org.apache.flink.runtime.messages.JobManagerMessages; -import org.apache.flink.runtime.taskmanager.TaskManager; -import org.apache.flink.runtime.testutils.CommonTestUtils; -import org.apache.flink.util.NetUtils; -import org.apache.flink.util.TestLogger; - -import org.junit.Test; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import scala.Some; -import scala.Tuple2; -import scala.concurrent.Await; -import scala.concurrent.Future; -import scala.concurrent.duration.FiniteDuration; - -import java.io.File; -import java.io.FileOutputStream; -import java.io.IOException; -import java.io.InputStream; -import java.io.StringWriter; -import java.util.UUID; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.TimeoutException; -import java.util.concurrent.atomic.AtomicReference; - -import static org.apache.flink.runtime.testutils.CommonTestUtils.getCurrentClasspath; -import static org.apache.flink.runtime.testutils.CommonTestUtils.getJavaCommandPath; -import static org.junit.Assert.assertFalse; -import static org.junit.Assert.fail; - -/** - * Abstract base for tests verifying the behavior of the recovery in the - * case when a TaskManager fails (process is killed) in the middle of a job execution. - * - * The test works with multiple task managers processes by spawning JVMs. - * Initially, it starts a JobManager in process and two TaskManagers JVMs with - * 2 task slots each. 
- * It submits a program with parallelism 4 and waits until all tasks are brought up. - * Coordination between the test and the tasks happens via checking for the - * existence of temporary files. It then starts another TaskManager, which is - * guaranteed to remain empty (all tasks are already deployed) and kills one of - * the original task managers. The recovery should restart the tasks on the new TaskManager. - */ -public abstract class AbstractProcessFailureRecoveryTest extends TestLogger { - - protected static final String READY_MARKER_FILE_PREFIX = "ready_"; - protected static final String PROCEED_MARKER_FILE = "proceed"; - - protected static final int PARALLELISM = 4; - - @Test - public void testTaskManagerProcessFailure() { - - final StringWriter processOutput1 = new StringWriter(); - final StringWriter processOutput2 = new StringWriter(); - final StringWriter processOutput3 = new StringWriter(); - - ActorSystem jmActorSystem = null; - Process taskManagerProcess1 = null; - Process taskManagerProcess2 = null; - Process taskManagerProcess3 = null; - - File coordinateTempDir = null; - - try { - // check that we run this test only if the java command - // is available on this machine - String javaCommand = getJavaCommandPath(); - if (javaCommand == null) { - System.out.println("---- Skipping Process Failure test : Could not find java executable ----"); - return; - } - - // create a logging file for the process - File tempLogFile = File.createTempFile(getClass().getSimpleName() + "-", "-log4j.properties"); - tempLogFile.deleteOnExit(); - CommonTestUtils.printLog4jDebugConfig(tempLogFile); - - // coordination between the processes goes through a directory - coordinateTempDir = createTempDirectory(); - - // find a free port to start the JobManager - final int jobManagerPort = NetUtils.getAvailablePort(); - - // start a JobManager - Tuple2<String, Object> localAddress = new Tuple2<String, Object>("localhost", jobManagerPort); - - Configuration jmConfig = new 
Configuration(); - jmConfig.setString(ConfigConstants.AKKA_WATCH_HEARTBEAT_INTERVAL, "1000 ms"); - jmConfig.setString(ConfigConstants.AKKA_WATCH_HEARTBEAT_PAUSE, "6 s"); - jmConfig.setInteger(ConfigConstants.AKKA_WATCH_THRESHOLD, 9); - jmConfig.setString(ConfigConstants.DEFAULT_EXECUTION_RETRY_DELAY_KEY, "10 s"); - - jmActorSystem = AkkaUtils.createActorSystem(jmConfig, new Some<Tuple2<String, Object>>(localAddress)); - ActorRef jmActor = JobManager.startJobManagerActors( - jmConfig, - jmActorSystem, - StreamingMode.STREAMING, - JobManager.class, - MemoryArchivist.class)._1(); - - // the TaskManager java command - String[] command = new String[] { - javaCommand, - "-Dlog.level=DEBUG", - "-Dlog4j.configuration=file:" + tempLogFile.getAbsolutePath(), - "-Xms80m", "-Xmx80m", - "-classpath", getCurrentClasspath(), - TaskManagerProcessEntryPoint.class.getName(), - String.valueOf(jobManagerPort) - }; - - // start the first two TaskManager processes - taskManagerProcess1 = new ProcessBuilder(command).start(); - new PipeForwarder(taskManagerProcess1.getErrorStream(), processOutput1); - taskManagerProcess2 = new ProcessBuilder(command).start(); - new PipeForwarder(taskManagerProcess2.getErrorStream(), processOutput2); - - // we wait for the JobManager to have the two TaskManagers available - // since some of the CI environments are very hostile, we need to give this a lot of time (2 minutes) - waitUntilNumTaskManagersAreRegistered(jmActor, 2, 120000); - - // the program will set a marker file in each of its parallel tasks once they are ready, so that - // this coordinating code is aware of this. 
- // the program will very slowly consume elements until the marker file (later created by the - // test driver code) is present - final File coordinateDirClosure = coordinateTempDir; - final AtomicReference<Throwable> errorRef = new AtomicReference<>(); - - // we trigger program execution in a separate thread - Thread programTrigger = new Thread("Program Trigger") { - @Override - public void run() { - try { - testProgram(jobManagerPort, coordinateDirClosure); - } - catch (Throwable t) { - t.printStackTrace(); - errorRef.set(t); - } - } - }; - - //start the test program - programTrigger.start(); - - // wait until all marker files are in place, indicating that all tasks have started - // max 20 seconds - if (!waitForMarkerFiles(coordinateTempDir, PARALLELISM, 120000)) { - // check if the program failed for some reason - if (errorRef.get() != null) { - Throwable error = errorRef.get(); - error.printStackTrace(); - fail("The program encountered a " + error.getClass().getSimpleName() + " : " + error.getMessage()); - } - else { - // no error occurred, simply a timeout - fail("The tasks were not started within time (" + 120000 + "msecs)"); - } - } - - // start the third TaskManager - taskManagerProcess3 = new ProcessBuilder(command).start(); - new PipeForwarder(taskManagerProcess3.getErrorStream(), processOutput3); - - // we wait for the third TaskManager to register - // since some of the CI environments are very hostile, we need to give this a lot of time (2 minutes) - waitUntilNumTaskManagersAreRegistered(jmActor, 3, 120000); - - // kill one of the previous TaskManagers, triggering a failure and recovery - taskManagerProcess1.destroy(); - taskManagerProcess1 = null; - - // we create the marker file which signals the program functions tasks that they can complete - touchFile(new File(coordinateTempDir, PROCEED_MARKER_FILE)); - - // wait for at most 5 minutes for the program to complete - programTrigger.join(300000); - - // check that the program really finished - 
assertFalse("The program did not finish in time", programTrigger.isAlive()); - - // check whether the program encountered an error - if (errorRef.get() != null) { - Throwable error = errorRef.get(); - error.printStackTrace(); - fail("The program encountered a " + error.getClass().getSimpleName() + " : " + error.getMessage()); - } - - // all seems well :-) - } - catch (Exception e) { - e.printStackTrace(); - printProcessLog("TaskManager 1", processOutput1.toString()); - printProcessLog("TaskManager 2", processOutput2.toString()); - printProcessLog("TaskManager 3", processOutput3.toString()); - fail(e.getMessage()); - } - catch (Error e) { - e.printStackTrace(); - printProcessLog("TaskManager 1", processOutput1.toString()); - printProcessLog("TaskManager 2", processOutput2.toString()); - printProcessLog("TaskManager 3", processOutput3.toString()); - throw e; - } - finally { - if (taskManagerProcess1 != null) { - taskManagerProcess1.destroy(); - } - if (taskManagerProcess2 != null) { - taskManagerProcess2.destroy(); - } - if (taskManagerProcess3 != null) { - taskManagerProcess3.destroy(); - } - if (jmActorSystem != null) { - jmActorSystem.shutdown(); - } - if (coordinateTempDir != null) { - try { - FileUtils.deleteDirectory(coordinateTempDir); - } - catch (Throwable t) { - // we can ignore this - } - } - } - } - - /** - * The test program should be implemented here in a form of a separate thread. - * This provides a solution for checking that it has been terminated. 
- * - * @param jobManagerPort The port for submitting the topology to the local cluster - * @param coordinateDir TaskManager failure will be triggered only after processes - * have successfully created file under this directory - */ - public abstract void testProgram(int jobManagerPort, File coordinateDir) throws Exception; - - - protected void waitUntilNumTaskManagersAreRegistered(ActorRef jobManager, int numExpected, long maxDelay) - throws Exception - { - final long deadline = System.currentTimeMillis() + maxDelay; - while (true) { - long remaining = deadline - System.currentTimeMillis(); - if (remaining <= 0) { - fail("The TaskManagers did not register within the expected time (" + maxDelay + "msecs)"); - } - - FiniteDuration timeout = new FiniteDuration(remaining, TimeUnit.MILLISECONDS); - - try { - Future<?> result = Patterns.ask(jobManager, - JobManagerMessages.getRequestNumberRegisteredTaskManager(), - new Timeout(timeout)); - Integer numTMs = (Integer) Await.result(result, timeout); - if (numTMs == numExpected) { - break; - } - } - catch (TimeoutException e) { - // ignore and retry - } - catch (ClassCastException e) { - fail("Wrong response: " + e.getMessage()); - } - } - } - - protected static void printProcessLog(String processName, String log) { - if (log == null || log.length() == 0) { - return; - } - - System.out.println("-----------------------------------------"); - System.out.println(" BEGIN SPAWNED PROCESS LOG FOR " + processName); - System.out.println("-----------------------------------------"); - System.out.println(log); - System.out.println("-----------------------------------------"); - System.out.println(" END SPAWNED PROCESS LOG"); - System.out.println("-----------------------------------------"); - } - - protected static File createTempDirectory() throws IOException { - File tempDir = new File(System.getProperty("java.io.tmpdir")); - - for (int i = 0; i < 10; i++) { - File dir = new File(tempDir, UUID.randomUUID().toString()); - if 
(!dir.exists() && dir.mkdirs()) { - return dir; - } - System.err.println("Could not use temporary directory " + dir.getAbsolutePath()); - } - - throw new IOException("Could not create temporary file directory"); - } - - protected static void touchFile(File file) throws IOException { - if (!file.exists()) { - new FileOutputStream(file).close(); - } - if (!file.setLastModified(System.currentTimeMillis())) { - throw new IOException("Could not touch the file."); - } - } - - protected static boolean waitForMarkerFiles(File basedir, int num, long timeout) { - long now = System.currentTimeMillis(); - final long deadline = now + timeout; - - - while (now < deadline) { - boolean allFound = true; - - for (int i = 0; i < num; i++) { - File nextToCheck = new File(basedir, READY_MARKER_FILE_PREFIX + i); - if (!nextToCheck.exists()) { - allFound = false; - break; - } - } - - if (allFound) { - return true; - } - else { - // not all found, wait for a bit - try { - Thread.sleep(10); - } - catch (InterruptedException e) { - throw new RuntimeException(e); - } - - now = System.currentTimeMillis(); - } - } - - return false; - } - - // -------------------------------------------------------------------------------------------- - - /** - * The entry point for the TaskManager JVM. Simply configures and runs a TaskManager. 
- */ - public static class TaskManagerProcessEntryPoint { - - private static final Logger LOG = LoggerFactory.getLogger(TaskManagerProcessEntryPoint.class); - - public static void main(String[] args) { - try { - int jobManagerPort = Integer.parseInt(args[0]); - - Configuration cfg = new Configuration(); - cfg.setString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, "localhost"); - cfg.setInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, jobManagerPort); - cfg.setInteger(ConfigConstants.TASK_MANAGER_MEMORY_SIZE_KEY, 4); - cfg.setInteger(ConfigConstants.TASK_MANAGER_NETWORK_NUM_BUFFERS_KEY, 100); - cfg.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 2); - - TaskManager.selectNetworkInterfaceAndRunTaskManager(cfg, StreamingMode.STREAMING, TaskManager.class); - - // wait forever - Object lock = new Object(); - synchronized (lock) { - lock.wait(); - } - } - catch (Throwable t) { - LOG.error("Failed to start TaskManager process", t); - System.exit(1); - } - } - } - - /** - * Utility class to read the output of a process stream and forward it into a StringWriter. 
- */ - protected static class PipeForwarder extends Thread { - - private final StringWriter target; - private final InputStream source; - - public PipeForwarder(InputStream source, StringWriter target) { - super("Pipe Forwarder"); - setDaemon(true); - - this.source = source; - this.target = target; - - start(); - } - - @Override - public void run() { - try { - int next; - while ((next = source.read()) != -1) { - target.write(next); - } - } - catch (IOException e) { - // terminate - } - } - } - -} http://git-wip-us.apache.org/repos/asf/flink/blob/73c73e92/flink-tests/src/test/java/org/apache/flink/test/recovery/AbstractTaskManagerProcessFailureRecoveryTest.java ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/java/org/apache/flink/test/recovery/AbstractTaskManagerProcessFailureRecoveryTest.java b/flink-tests/src/test/java/org/apache/flink/test/recovery/AbstractTaskManagerProcessFailureRecoveryTest.java new file mode 100644 index 0000000..c02fa6c --- /dev/null +++ b/flink-tests/src/test/java/org/apache/flink/test/recovery/AbstractTaskManagerProcessFailureRecoveryTest.java @@ -0,0 +1,397 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+
+package org.apache.flink.test.recovery;
+
+import akka.actor.ActorRef;
+import akka.actor.ActorSystem;
+import akka.pattern.Patterns;
+import akka.util.Timeout;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.StreamingMode;
+import org.apache.flink.runtime.akka.AkkaUtils;
+import org.apache.flink.runtime.jobmanager.JobManager;
+import org.apache.flink.runtime.jobmanager.MemoryArchivist;
+import org.apache.flink.runtime.messages.JobManagerMessages;
+import org.apache.flink.runtime.taskmanager.TaskManager;
+import org.apache.flink.runtime.testutils.CommonTestUtils;
+import org.apache.flink.util.NetUtils;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.Test;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import scala.Some;
+import scala.Tuple2;
+import scala.concurrent.Await;
+import scala.concurrent.Future;
+import scala.concurrent.duration.FiniteDuration;
+
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.StringWriter;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicReference;
+
+import static org.apache.flink.runtime.testutils.CommonTestUtils.getCurrentClasspath;
+import static org.apache.flink.runtime.testutils.CommonTestUtils.getJavaCommandPath;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.fail;
+
+/**
+ * Abstract base for tests verifying the behavior of the recovery in the
+ * case when a TaskManager fails (process is killed) in the middle of a job execution.
+ *
+ * The test works with multiple task managers processes by spawning JVMs.
+ * Initially, it starts a JobManager in process and two TaskManagers JVMs with
+ * 2 task slots each.
+ * It submits a program with parallelism 4 and waits until all tasks are brought up.
+ * Coordination between the test and the tasks happens via checking for the
+ * existence of temporary files. It then starts another TaskManager, which is
+ * guaranteed to remain empty (all tasks are already deployed) and kills one of
+ * the original task managers. The recovery should restart the tasks on the new TaskManager.
+ */
+public abstract class AbstractTaskManagerProcessFailureRecoveryTest extends TestLogger {
+
+	protected static final String READY_MARKER_FILE_PREFIX = "ready_";
+	protected static final String PROCEED_MARKER_FILE = "proceed";
+	protected static final String FINISH_MARKER_FILE_PREFIX = "finish_";
+
+	protected static final int PARALLELISM = 4;
+
+	/** Maximum time to wait for TaskManagers to register at the JobManager (2 minutes). */
+	private static final long REGISTRATION_TIMEOUT_MILLIS = 120000;
+
+	/** Maximum time to wait for all parallel tasks to write their "ready" marker (2 minutes). */
+	private static final long TASK_STARTUP_TIMEOUT_MILLIS = 120000;
+
+	/** Maximum time to wait for the test program to complete after the failure (5 minutes). */
+	private static final long PROGRAM_COMPLETION_TIMEOUT_MILLIS = 300000;
+
+	/** Pause between two registration queries to the JobManager. */
+	private static final long REGISTRATION_POLL_INTERVAL_MILLIS = 50;
+
+	/**
+	 * Starts a JobManager in this JVM and two TaskManager JVMs, runs the test program,
+	 * starts a third TaskManager JVM, kills the first one and verifies that the program
+	 * finishes without an error.
+	 *
+	 * <p>The test is skipped (returns silently) when no java executable can be found.
+	 * On failure the captured stderr output of the TaskManager processes is printed.
+	 */
+	@Test
+	public void testTaskManagerProcessFailure() {
+
+		final StringWriter processOutput1 = new StringWriter();
+		final StringWriter processOutput2 = new StringWriter();
+		final StringWriter processOutput3 = new StringWriter();
+
+		ActorSystem jmActorSystem = null;
+		Process taskManagerProcess1 = null;
+		Process taskManagerProcess2 = null;
+		Process taskManagerProcess3 = null;
+
+		File coordinateTempDir = null;
+
+		try {
+			// check that we run this test only if the java command
+			// is available on this machine
+			String javaCommand = getJavaCommandPath();
+			if (javaCommand == null) {
+				System.out.println("---- Skipping Process Failure test : Could not find java executable ----");
+				return;
+			}
+
+			// create a logging file for the process
+			File tempLogFile = File.createTempFile(getClass().getSimpleName() + "-", "-log4j.properties");
+			tempLogFile.deleteOnExit();
+			CommonTestUtils.printLog4jDebugConfig(tempLogFile);
+
+			// coordination between the processes goes through a directory
+			coordinateTempDir = CommonTestUtils.createTempDirectory();
+
+			// find a free port to start the JobManager
+			final int jobManagerPort = NetUtils.getAvailablePort();
+
+			// start a JobManager
+			Tuple2<String, Object> localAddress = new Tuple2<String, Object>("localhost", jobManagerPort);
+
+			Configuration jmConfig = new Configuration();
+			jmConfig.setString(ConfigConstants.AKKA_WATCH_HEARTBEAT_INTERVAL, "1000 ms");
+			jmConfig.setString(ConfigConstants.AKKA_WATCH_HEARTBEAT_PAUSE, "6 s");
+			jmConfig.setInteger(ConfigConstants.AKKA_WATCH_THRESHOLD, 9);
+			jmConfig.setString(ConfigConstants.DEFAULT_EXECUTION_RETRY_DELAY_KEY, "10 s");
+
+			jmActorSystem = AkkaUtils.createActorSystem(jmConfig, new Some<Tuple2<String, Object>>(localAddress));
+			ActorRef jmActor = JobManager.startJobManagerActors(
+					jmConfig,
+					jmActorSystem,
+					StreamingMode.STREAMING,
+					JobManager.class,
+					MemoryArchivist.class)._1();
+
+			// the TaskManager java command
+			String[] command = new String[] {
+					javaCommand,
+					"-Dlog.level=DEBUG",
+					"-Dlog4j.configuration=file:" + tempLogFile.getAbsolutePath(),
+					"-Xms80m", "-Xmx80m",
+					"-classpath", getCurrentClasspath(),
+					TaskManagerProcessEntryPoint.class.getName(),
+					String.valueOf(jobManagerPort)
+			};
+
+			// start the first two TaskManager processes
+			taskManagerProcess1 = new ProcessBuilder(command).start();
+			new CommonTestUtils.PipeForwarder(taskManagerProcess1.getErrorStream(), processOutput1);
+			taskManagerProcess2 = new ProcessBuilder(command).start();
+			new CommonTestUtils.PipeForwarder(taskManagerProcess2.getErrorStream(), processOutput2);
+
+			// we wait for the JobManager to have the two TaskManagers available
+			// since some of the CI environments are very hostile, we need to give this a lot of time (2 minutes)
+			waitUntilNumTaskManagersAreRegistered(jmActor, 2, REGISTRATION_TIMEOUT_MILLIS);
+
+			// the program will set a marker file in each of its parallel tasks once they are ready, so that
+			// this coordinating code is aware of this.
+			// the program will very slowly consume elements until the marker file (later created by the
+			// test driver code) is present
+			final File coordinateDirClosure = coordinateTempDir;
+			final AtomicReference<Throwable> errorRef = new AtomicReference<>();
+
+			// we trigger program execution in a separate thread
+			Thread programTrigger = new Thread("Program Trigger") {
+				@Override
+				public void run() {
+					try {
+						testTaskManagerFailure(jobManagerPort, coordinateDirClosure);
+					}
+					catch (Throwable t) {
+						t.printStackTrace();
+						errorRef.set(t);
+					}
+				}
+			};
+
+			//start the test program
+			programTrigger.start();
+
+			// wait until all marker files are in place, indicating that all tasks have started
+			// max 2 minutes
+			if (!waitForMarkerFiles(coordinateTempDir, READY_MARKER_FILE_PREFIX, PARALLELISM,
+					TASK_STARTUP_TIMEOUT_MILLIS)) {
+				// check if the program failed for some reason
+				if (errorRef.get() != null) {
+					Throwable error = errorRef.get();
+					error.printStackTrace();
+					fail("The program encountered a " + error.getClass().getSimpleName() + " : " + error.getMessage());
+				}
+				else {
+					// no error occurred, simply a timeout
+					fail("The tasks were not started within time (" + TASK_STARTUP_TIMEOUT_MILLIS + "msecs)");
+				}
+			}
+
+			// start the third TaskManager
+			taskManagerProcess3 = new ProcessBuilder(command).start();
+			new CommonTestUtils.PipeForwarder(taskManagerProcess3.getErrorStream(), processOutput3);
+
+			// we wait for the third TaskManager to register
+			// since some of the CI environments are very hostile, we need to give this a lot of time (2 minutes)
+			waitUntilNumTaskManagersAreRegistered(jmActor, 3, REGISTRATION_TIMEOUT_MILLIS);
+
+			// kill one of the previous TaskManagers, triggering a failure and recovery
+			taskManagerProcess1.destroy();
+			taskManagerProcess1 = null;
+
+			// we create the marker file which signals the program functions tasks that they can complete
+			touchFile(new File(coordinateTempDir, PROCEED_MARKER_FILE));
+
+			// wait for at most 5 minutes for the program to complete
+			programTrigger.join(PROGRAM_COMPLETION_TIMEOUT_MILLIS);
+
+			// check that the program really finished
+			assertFalse("The program did not finish in time", programTrigger.isAlive());
+
+			// check whether the program encountered an error
+			if (errorRef.get() != null) {
+				Throwable error = errorRef.get();
+				error.printStackTrace();
+				fail("The program encountered a " + error.getClass().getSimpleName() + " : " + error.getMessage());
+			}
+
+			// all seems well :-)
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			printTaskManagerLogs(processOutput1, processOutput2, processOutput3);
+			fail(e.getMessage());
+		}
+		catch (Error e) {
+			e.printStackTrace();
+			printTaskManagerLogs(processOutput1, processOutput2, processOutput3);
+			throw e;
+		}
+		finally {
+			if (taskManagerProcess1 != null) {
+				taskManagerProcess1.destroy();
+			}
+			if (taskManagerProcess2 != null) {
+				taskManagerProcess2.destroy();
+			}
+			if (taskManagerProcess3 != null) {
+				taskManagerProcess3.destroy();
+			}
+			if (jmActorSystem != null) {
+				jmActorSystem.shutdown();
+			}
+			if (coordinateTempDir != null) {
+				try {
+					FileUtils.deleteDirectory(coordinateTempDir);
+				}
+				catch (Throwable t) {
+					// we can ignore this
+				}
+			}
+		}
+	}
+
+	/**
+	 * The test program should be implemented here in a form of a separate thread.
+	 * This provides a solution for checking that it has been terminated.
+	 *
+	 * @param jobManagerPort The port for submitting the topology to the local cluster
+	 * @param coordinateDir TaskManager failure will be triggered only after processes
+	 *                      have successfully created file under this directory
+	 */
+	public abstract void testTaskManagerFailure(int jobManagerPort, File coordinateDir) throws Exception;
+
+
+	/**
+	 * Repeatedly asks the JobManager for the number of registered TaskManagers until it
+	 * equals the expected number, and fails the test if that does not happen in time.
+	 *
+	 * @param jobManager  The JobManager actor to query
+	 * @param numExpected The exact number of TaskManagers to wait for
+	 * @param maxDelay    The maximum time to wait, in milliseconds
+	 * @throws Exception  Any failure of the request other than a timeout or a wrong response type
+	 */
+	protected void waitUntilNumTaskManagersAreRegistered(ActorRef jobManager, int numExpected, long maxDelay)
+			throws Exception
+	{
+		final long deadline = System.currentTimeMillis() + maxDelay;
+		while (true) {
+			long remaining = deadline - System.currentTimeMillis();
+			if (remaining <= 0) {
+				fail("The TaskManagers did not register within the expected time (" + maxDelay + "msecs)");
+			}
+
+			FiniteDuration timeout = new FiniteDuration(remaining, TimeUnit.MILLISECONDS);
+
+			try {
+				Future<?> result = Patterns.ask(jobManager,
+						JobManagerMessages.getRequestNumberRegisteredTaskManager(),
+						new Timeout(timeout));
+				Integer numTMs = (Integer) Await.result(result, timeout);
+				if (numTMs == numExpected) {
+					break;
+				}
+			}
+			catch (TimeoutException e) {
+				// ignore and retry
+			}
+			catch (ClassCastException e) {
+				fail("Wrong response: " + e.getMessage());
+			}
+
+			// pause briefly before asking again instead of querying the JobManager in a tight loop
+			Thread.sleep(REGISTRATION_POLL_INTERVAL_MILLIS);
+		}
+	}
+
+	/**
+	 * Prints the given process log to stdout, framed by begin/end markers.
+	 * Nothing is printed when the log is {@code null} or empty.
+	 *
+	 * @param processName The name shown in the begin marker
+	 * @param log         The captured output of the process
+	 */
+	protected static void printProcessLog(String processName, String log) {
+		if (log == null || log.length() == 0) {
+			return;
+		}
+
+		System.out.println("-----------------------------------------");
+		System.out.println(" BEGIN SPAWNED PROCESS LOG FOR " + processName);
+		System.out.println("-----------------------------------------");
+		System.out.println(log);
+		System.out.println("-----------------------------------------");
+		System.out.println("		END SPAWNED PROCESS LOG");
+		System.out.println("-----------------------------------------");
+	}
+
+	/**
+	 * Prints the captured outputs as the logs of "TaskManager 1", "TaskManager 2", ...
+	 *
+	 * @param outputs The captured outputs, in TaskManager order
+	 */
+	private static void printTaskManagerLogs(StringWriter... outputs) {
+		for (int i = 0; i < outputs.length; i++) {
+			printProcessLog("TaskManager " + (i + 1), outputs[i].toString());
+		}
+	}
+
+	/**
+	 * Creates the file if it does not exist and sets its modification time to now.
+	 *
+	 * @param file The file to touch
+	 * @throws IOException If the file cannot be created or its modification time cannot be set
+	 */
+	protected static void touchFile(File file) throws IOException {
+		if (!file.exists()) {
+			new FileOutputStream(file).close();
+		}
+		if (!file.setLastModified(System.currentTimeMillis())) {
+			throw new IOException("Could not touch the file.");
+		}
+	}
+
+	/**
+	 * Polls until the files {@code prefix + 0} ... {@code prefix + (num - 1)} all exist
+	 * in the given directory.
+	 *
+	 * @param basedir The directory in which the marker files are expected
+	 * @param prefix  The file name prefix of the marker files
+	 * @param num     The number of marker files to wait for
+	 * @param timeout The maximum time to wait, in milliseconds
+	 * @return {@code true} if all files were found in time, {@code false} on timeout
+	 */
+	protected static boolean waitForMarkerFiles(File basedir, String prefix, int num, long timeout) {
+		long now = System.currentTimeMillis();
+		final long deadline = now + timeout;
+
+
+		while (now < deadline) {
+			boolean allFound = true;
+
+			for (int i = 0; i < num; i++) {
+				File nextToCheck = new File(basedir, prefix + i);
+				if (!nextToCheck.exists()) {
+					allFound = false;
+					break;
+				}
+			}
+
+			if (allFound) {
+				return true;
+			}
+			else {
+				// not all found, wait for a bit
+				try {
+					Thread.sleep(10);
+				}
+				catch (InterruptedException e) {
+					// keep the interrupt visible to callers further up the stack
+					Thread.currentThread().interrupt();
+					throw new RuntimeException(e);
+				}
+
+				now = System.currentTimeMillis();
+			}
+		}
+
+		return false;
+	}
+
+	// --------------------------------------------------------------------------------------------
+
+	/**
+	 * The entry point for the TaskManager JVM. Simply configures and runs a TaskManager.
+	 */
+	public static class TaskManagerProcessEntryPoint {
+
+		private static final Logger LOG = LoggerFactory.getLogger(TaskManagerProcessEntryPoint.class);
+
+		/**
+		 * Starts a TaskManager that connects to the JobManager on localhost and then blocks.
+		 * Exits the JVM with status 1 if anything fails.
+		 *
+		 * @param args {@code args[0]} is the JobManager port
+		 */
+		public static void main(String[] args) {
+			try {
+				int jobManagerPort = Integer.parseInt(args[0]);
+
+				Configuration cfg = new Configuration();
+				cfg.setString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, "localhost");
+				cfg.setInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, jobManagerPort);
+				cfg.setInteger(ConfigConstants.TASK_MANAGER_MEMORY_SIZE_KEY, 4);
+				cfg.setInteger(ConfigConstants.TASK_MANAGER_NETWORK_NUM_BUFFERS_KEY, 100);
+				cfg.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 2);
+
+				TaskManager.selectNetworkInterfaceAndRunTaskManager(cfg, StreamingMode.STREAMING, TaskManager.class);
+
+				// wait forever; the loop guards against spurious wakeups, which would
+				// otherwise let main() return
+				final Object lock = new Object();
+				synchronized (lock) {
+					while (true) {
+						lock.wait();
+					}
+				}
+			}
+			catch (Throwable t) {
+				LOG.error("Failed to start TaskManager process", t);
+				System.exit(1);
+			}
+		}
+	}
+
+}
