Repository: flink
Updated Branches:
  refs/heads/master 6bdaf1e4a -> 90ca43810


http://git-wip-us.apache.org/repos/asf/flink/blob/90ca4381/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SourceExternalCheckpointTriggerTest.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SourceExternalCheckpointTriggerTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SourceExternalCheckpointTriggerTest.java
new file mode 100644
index 0000000..e5caff3
--- /dev/null
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SourceExternalCheckpointTriggerTest.java
@@ -0,0 +1,171 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.tasks;
+
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.core.testutils.MultiShotLatch;
+import org.apache.flink.core.testutils.OneShotLatch;
+import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
+import org.apache.flink.runtime.checkpoint.CheckpointOptions;
+import org.apache.flink.runtime.checkpoint.MasterTriggerRestoreHook;
+import org.apache.flink.runtime.io.network.api.CheckpointBarrier;
+import org.apache.flink.streaming.api.checkpoint.ExternallyInducedSource;
+import org.apache.flink.streaming.api.functions.source.ParallelSourceFunction;
+import org.apache.flink.streaming.api.graph.StreamConfig;
+import org.apache.flink.streaming.api.operators.StreamSource;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+
+import org.junit.Test;
+
+import java.util.concurrent.BlockingQueue;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * These tests verify the behavior of a source function that triggers 
checkpoints
+ * in response to received events.
+ */
+@SuppressWarnings("serial")
+public class SourceExternalCheckpointTriggerTest {
+
+       static final OneShotLatch ready = new OneShotLatch();
+       static final MultiShotLatch sync = new MultiShotLatch();
+
+       @Test
+       public void testCheckpointsTriggeredBySource() throws Exception {
+               // set up the basic test harness
+               final SourceStreamTask<Long, ?, ?> sourceTask = new 
SourceStreamTask<Long, ExternalCheckpointsSource, StreamSource<Long, 
ExternalCheckpointsSource>>();
+               final StreamTaskTestHarness<Long> testHarness = new 
StreamTaskTestHarness<>(sourceTask, BasicTypeInfo.LONG_TYPE_INFO);
+               testHarness.setupOutputForSingletonOperatorChain();
+               testHarness.getExecutionConfig().setLatencyTrackingInterval(-1);
+
+               final long numElements = 10;
+               final long checkpointEvery = 3;
+
+               // set up the source function
+               ExternalCheckpointsSource source = new 
ExternalCheckpointsSource(numElements, checkpointEvery);
+               StreamConfig streamConfig = testHarness.getStreamConfig();
+               StreamSource<Long, ?> sourceOperator = new 
StreamSource<>(source);
+               streamConfig.setStreamOperator(sourceOperator);
+
+               // this starts the source thread
+               testHarness.invoke();
+               ready.await();
+
+               // now send an external trigger that should be ignored
+               assertTrue(sourceTask.triggerCheckpoint(new 
CheckpointMetaData(32, 829), CheckpointOptions.forFullCheckpoint()));
+
+               // step by step let the source thread emit elements
+               sync.trigger();
+               verifyNextElement(testHarness.getOutput(), 1L);
+               sync.trigger();
+               verifyNextElement(testHarness.getOutput(), 2L);
+               sync.trigger();
+               verifyNextElement(testHarness.getOutput(), 3L);
+
+               verifyCheckpointBarrier(testHarness.getOutput(), 1L);
+
+               sync.trigger();
+               verifyNextElement(testHarness.getOutput(), 4L);
+
+               // now send an regular trigger command that should be ignored
+               assertTrue(sourceTask.triggerCheckpoint(new 
CheckpointMetaData(34, 900), CheckpointOptions.forFullCheckpoint()));
+
+               sync.trigger();
+               verifyNextElement(testHarness.getOutput(), 5L);
+               sync.trigger();
+               verifyNextElement(testHarness.getOutput(), 6L);
+
+               verifyCheckpointBarrier(testHarness.getOutput(), 2L);
+
+               // let the remainder run
+
+               for (long l = 7L, checkpoint = 3L; l <= numElements; l++) {
+                       sync.trigger();
+                       verifyNextElement(testHarness.getOutput(), l);
+
+                       if (l % checkpointEvery == 0) {
+                               
verifyCheckpointBarrier(testHarness.getOutput(), checkpoint++);
+                       }
+               }
+
+               // done!
+       }
+
+       @SuppressWarnings("unchecked")
+       private void verifyNextElement(BlockingQueue<Object> output, long 
expectedElement) throws InterruptedException {
+               Object next = output.take();
+               assertTrue("next element is not an event", next instanceof 
StreamRecord);
+               assertEquals("wrong event", expectedElement, 
((StreamRecord<Long>) next).getValue().longValue());
+       }
+
+       private void verifyCheckpointBarrier(BlockingQueue<Object> output, long 
checkpointId) throws InterruptedException {
+               Object next = output.take();
+               assertTrue("next element is not a checkpoint barrier", next 
instanceof CheckpointBarrier);
+               assertEquals("wrong checkpoint id", checkpointId, 
((CheckpointBarrier) next).getId());
+       }
+
+       // 
------------------------------------------------------------------------
+
+       private static class ExternalCheckpointsSource
+                       implements ParallelSourceFunction<Long>, 
ExternallyInducedSource<Long, Object> {
+
+               private final long numEvents;
+               private final long checkpointFrequency;
+               
+               private CheckpointTrigger trigger;
+
+               ExternalCheckpointsSource(long numEvents, long 
checkpointFrequency) {
+                       this.numEvents = numEvents;
+                       this.checkpointFrequency = checkpointFrequency;
+               }
+
+               @Override
+               public void run(SourceContext<Long> ctx) throws Exception {
+                       ready.trigger();
+
+                       // for simplicity in this test, we just trigger 
checkpoints in ascending order
+                       long checkpoint = 1;
+
+                       for (long num = 1; num <= numEvents; num++) {
+                               sync.await();
+                               ctx.collect(num);
+                               if (num % checkpointFrequency == 0) {
+                                       trigger.triggerCheckpoint(checkpoint++);
+                               }
+                       }
+               }
+
+               @Override
+               public void cancel() {}
+
+               @Override
+               public void setCheckpointTrigger(CheckpointTrigger 
checkpointTrigger) {
+                       this.trigger = checkpointTrigger;
+               }
+
+               @Override
+               public MasterTriggerRestoreHook<Object> 
createMasterTriggerRestoreHook() {
+                       // not relevant in this test
+                       throw new UnsupportedOperationException("not 
implemented");
+               }
+       }
+}
+

http://git-wip-us.apache.org/repos/asf/flink/blob/90ca4381/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTestHarness.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTestHarness.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTestHarness.java
index c51af4e..0be85b1 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTestHarness.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTestHarness.java
@@ -45,6 +45,7 @@ import java.util.Collections;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.LinkedBlockingQueue;
 
 /**
  * Test harness for testing a {@link StreamTask}.
@@ -83,7 +84,7 @@ public class StreamTaskTestHarness<OUT> {
        private TypeSerializer<OUT> outputSerializer;
        private TypeSerializer<StreamElement> outputStreamRecordSerializer;
 
-       private ConcurrentLinkedQueue<Object> outputList;
+       private LinkedBlockingQueue<Object> outputList;
 
        protected TaskThread taskThread;
 
@@ -125,7 +126,7 @@ public class StreamTaskTestHarness<OUT> {
 
        @SuppressWarnings("unchecked")
        private void initializeOutput() {
-               outputList = new ConcurrentLinkedQueue<Object>();
+               outputList = new LinkedBlockingQueue<Object>();
                mockEnv.addOutput(outputList, outputStreamRecordSerializer);
        }
 
@@ -265,7 +266,7 @@ public class StreamTaskTestHarness<OUT> {
         * {@link 
org.apache.flink.streaming.util.TestHarnessUtil#getRawElementsFromOutput(java.util.Queue)}}
         * to extract only the StreamRecords.
         */
-       public ConcurrentLinkedQueue<Object> getOutput() {
+       public LinkedBlockingQueue<Object> getOutput() {
                return outputList;
        }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/90ca4381/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java
 
b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java
index 3718a94..e0de7d2 100644
--- 
a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java
+++ 
b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java
@@ -40,7 +40,7 @@ import org.apache.flink.core.testutils.OneShotLatch;
 import org.apache.flink.runtime.akka.AkkaUtils;
 import org.apache.flink.runtime.checkpoint.SubtaskState;
 import org.apache.flink.runtime.checkpoint.TaskState;
-import org.apache.flink.runtime.checkpoint.savepoint.SavepointV1;
+import org.apache.flink.runtime.checkpoint.savepoint.SavepointV2;
 import org.apache.flink.runtime.client.JobExecutionException;
 import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor;
 import org.apache.flink.runtime.executiongraph.TaskInformation;
@@ -200,7 +200,7 @@ public class SavepointITCase extends TestLogger {
                        LOG.info("Requesting the savepoint.");
                        Future<Object> savepointFuture = jobManager.ask(new 
RequestSavepoint(savepointPath), deadline.timeLeft());
 
-                       SavepointV1 savepoint = (SavepointV1) 
((ResponseSavepoint) Await.result(savepointFuture, 
deadline.timeLeft())).savepoint();
+                       SavepointV2 savepoint = (SavepointV2) 
((ResponseSavepoint) Await.result(savepointFuture, 
deadline.timeLeft())).savepoint();
                        LOG.info("Retrieved savepoint: " + savepointPath + ".");
 
                        // Shut down the Flink cluster (thereby canceling the 
job)

Reply via email to