Repository: flink Updated Branches: refs/heads/master 6bdaf1e4a -> 90ca43810
http://git-wip-us.apache.org/repos/asf/flink/blob/90ca4381/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SourceExternalCheckpointTriggerTest.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SourceExternalCheckpointTriggerTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SourceExternalCheckpointTriggerTest.java new file mode 100644 index 0000000..e5caff3 --- /dev/null +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SourceExternalCheckpointTriggerTest.java @@ -0,0 +1,171 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.runtime.tasks; + +import org.apache.flink.api.common.typeinfo.BasicTypeInfo; +import org.apache.flink.core.testutils.MultiShotLatch; +import org.apache.flink.core.testutils.OneShotLatch; +import org.apache.flink.runtime.checkpoint.CheckpointMetaData; +import org.apache.flink.runtime.checkpoint.CheckpointOptions; +import org.apache.flink.runtime.checkpoint.MasterTriggerRestoreHook; +import org.apache.flink.runtime.io.network.api.CheckpointBarrier; +import org.apache.flink.streaming.api.checkpoint.ExternallyInducedSource; +import org.apache.flink.streaming.api.functions.source.ParallelSourceFunction; +import org.apache.flink.streaming.api.graph.StreamConfig; +import org.apache.flink.streaming.api.operators.StreamSource; +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; + +import org.junit.Test; + +import java.util.concurrent.BlockingQueue; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +/** + * These tests verify the behavior of a source function that triggers checkpoints + * in response to received events. + */ +@SuppressWarnings("serial") +public class SourceExternalCheckpointTriggerTest { + + static final OneShotLatch ready = new OneShotLatch(); + static final MultiShotLatch sync = new MultiShotLatch(); + + @Test + public void testCheckpointsTriggeredBySource() throws Exception { + // set up the basic test harness + final SourceStreamTask<Long, ?, ?> sourceTask = new SourceStreamTask<Long, ExternalCheckpointsSource, StreamSource<Long, ExternalCheckpointsSource>>(); + final StreamTaskTestHarness<Long> testHarness = new StreamTaskTestHarness<>(sourceTask, BasicTypeInfo.LONG_TYPE_INFO); + testHarness.setupOutputForSingletonOperatorChain(); + testHarness.getExecutionConfig().setLatencyTrackingInterval(-1); + + final long numElements = 10; + final long checkpointEvery = 3; + + // set up the source function + ExternalCheckpointsSource source = new ExternalCheckpointsSource(numElements, checkpointEvery); + StreamConfig streamConfig = testHarness.getStreamConfig(); + StreamSource<Long, ?> sourceOperator = new StreamSource<>(source); + streamConfig.setStreamOperator(sourceOperator); + + // this starts the source thread + testHarness.invoke(); + ready.await(); + + // now send an external trigger that should be ignored + assertTrue(sourceTask.triggerCheckpoint(new CheckpointMetaData(32, 829), CheckpointOptions.forFullCheckpoint())); + + // step by step let the source thread emit elements + sync.trigger(); + verifyNextElement(testHarness.getOutput(), 1L); + sync.trigger(); + verifyNextElement(testHarness.getOutput(), 2L); + sync.trigger(); + verifyNextElement(testHarness.getOutput(), 3L); + + verifyCheckpointBarrier(testHarness.getOutput(), 1L); + + sync.trigger(); + verifyNextElement(testHarness.getOutput(), 4L); + + // now send an regular trigger command that should be ignored + assertTrue(sourceTask.triggerCheckpoint(new CheckpointMetaData(34, 900), CheckpointOptions.forFullCheckpoint())); + + sync.trigger(); + verifyNextElement(testHarness.getOutput(), 5L); + sync.trigger(); + verifyNextElement(testHarness.getOutput(), 6L); + + verifyCheckpointBarrier(testHarness.getOutput(), 2L); + + // let the remainder run + + for (long l = 7L, checkpoint = 3L; l <= numElements; l++) { + sync.trigger(); + verifyNextElement(testHarness.getOutput(), l); + + if (l % checkpointEvery == 0) { + verifyCheckpointBarrier(testHarness.getOutput(), checkpoint++); + } + } + + // done! + } + + @SuppressWarnings("unchecked") + private void verifyNextElement(BlockingQueue<Object> output, long expectedElement) throws InterruptedException { + Object next = output.take(); + assertTrue("next element is not an event", next instanceof StreamRecord); + assertEquals("wrong event", expectedElement, ((StreamRecord<Long>) next).getValue().longValue()); + } + + private void verifyCheckpointBarrier(BlockingQueue<Object> output, long checkpointId) throws InterruptedException { + Object next = output.take(); + assertTrue("next element is not a checkpoint barrier", next instanceof CheckpointBarrier); + assertEquals("wrong checkpoint id", checkpointId, ((CheckpointBarrier) next).getId()); + } + + // ------------------------------------------------------------------------ + + private static class ExternalCheckpointsSource + implements ParallelSourceFunction<Long>, ExternallyInducedSource<Long, Object> { + + private final long numEvents; + private final long checkpointFrequency; + + private CheckpointTrigger trigger; + + ExternalCheckpointsSource(long numEvents, long checkpointFrequency) { + this.numEvents = numEvents; + this.checkpointFrequency = checkpointFrequency; + } + + @Override + public void run(SourceContext<Long> ctx) throws Exception { + ready.trigger(); + + // for simplicity in this test, we just trigger checkpoints in ascending order + long checkpoint = 1; + + for (long num = 1; num <= numEvents; num++) { + sync.await(); + ctx.collect(num); + if (num % checkpointFrequency == 0) { + trigger.triggerCheckpoint(checkpoint++); + } + } + } + + @Override + public void cancel() {} + + @Override + public void setCheckpointTrigger(CheckpointTrigger checkpointTrigger) { + this.trigger = checkpointTrigger; + } + + @Override + public MasterTriggerRestoreHook<Object> createMasterTriggerRestoreHook() { + // not relevant in this test + throw new UnsupportedOperationException("not implemented"); + } + } +} + http://git-wip-us.apache.org/repos/asf/flink/blob/90ca4381/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTestHarness.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTestHarness.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTestHarness.java index c51af4e..0be85b1 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTestHarness.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTestHarness.java @@ -45,6 +45,7 @@ import java.util.Collections; import java.util.LinkedList; import java.util.List; import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.LinkedBlockingQueue; /** * Test harness for testing a {@link StreamTask}. @@ -83,7 +84,7 @@ public class StreamTaskTestHarness<OUT> { private TypeSerializer<OUT> outputSerializer; private TypeSerializer<StreamElement> outputStreamRecordSerializer; - private ConcurrentLinkedQueue<Object> outputList; + private LinkedBlockingQueue<Object> outputList; protected TaskThread taskThread; @@ -125,7 +126,7 @@ public class StreamTaskTestHarness<OUT> { @SuppressWarnings("unchecked") private void initializeOutput() { - outputList = new ConcurrentLinkedQueue<Object>(); + outputList = new LinkedBlockingQueue<Object>(); mockEnv.addOutput(outputList, outputStreamRecordSerializer); } @@ -265,7 +266,7 @@ public class StreamTaskTestHarness<OUT> { * {@link org.apache.flink.streaming.util.TestHarnessUtil#getRawElementsFromOutput(java.util.Queue)}} * to extract only the StreamRecords. */ - public ConcurrentLinkedQueue<Object> getOutput() { + public LinkedBlockingQueue<Object> getOutput() { return outputList; } http://git-wip-us.apache.org/repos/asf/flink/blob/90ca4381/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java index 3718a94..e0de7d2 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java @@ -40,7 +40,7 @@ import org.apache.flink.core.testutils.OneShotLatch; import org.apache.flink.runtime.akka.AkkaUtils; import org.apache.flink.runtime.checkpoint.SubtaskState; import org.apache.flink.runtime.checkpoint.TaskState; -import org.apache.flink.runtime.checkpoint.savepoint.SavepointV1; +import org.apache.flink.runtime.checkpoint.savepoint.SavepointV2; import org.apache.flink.runtime.client.JobExecutionException; import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor; import org.apache.flink.runtime.executiongraph.TaskInformation; @@ -200,7 +200,7 @@ public class SavepointITCase extends TestLogger { LOG.info("Requesting the savepoint."); Future<Object> savepointFuture = jobManager.ask(new RequestSavepoint(savepointPath), deadline.timeLeft()); - SavepointV1 savepoint = (SavepointV1) ((ResponseSavepoint) Await.result(savepointFuture, deadline.timeLeft())).savepoint(); + SavepointV2 savepoint = (SavepointV2) ((ResponseSavepoint) Await.result(savepointFuture, deadline.timeLeft())).savepoint(); LOG.info("Retrieved savepoint: " + savepointPath + "."); // Shut down the Flink cluster (thereby canceling the job)
