http://git-wip-us.apache.org/repos/asf/flink/blob/caf46728/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/OneInputStreamOperatorTestHarness.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/OneInputStreamOperatorTestHarness.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/OneInputStreamOperatorTestHarness.java index 258e30a..1e2d3ec 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/OneInputStreamOperatorTestHarness.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/OneInputStreamOperatorTestHarness.java @@ -30,16 +30,19 @@ import org.apache.flink.runtime.operators.testutils.MockInputSplitProvider; import org.apache.flink.streaming.api.graph.StreamConfig; import org.apache.flink.streaming.api.operators.OneInputStreamOperator; import org.apache.flink.streaming.api.operators.Output; -import org.apache.flink.runtime.state.StateBackend; +import org.apache.flink.runtime.state.AbstractStateBackend; import org.apache.flink.runtime.state.memory.MemoryStateBackend; import org.apache.flink.streaming.api.watermark.Watermark; import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; import org.apache.flink.streaming.runtime.tasks.StreamTask; -import org.mockito.stubbing.OngoingStubbing; +import org.mockito.invocation.InvocationOnMock; +import org.mockito.stubbing.Answer; import java.util.Collection; import java.util.concurrent.ConcurrentLinkedQueue; +import static org.mockito.Matchers.any; +import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; @@ -62,6 +65,10 @@ public class OneInputStreamOperatorTestHarness<IN, OUT> { final ExecutionConfig executionConfig; final Object checkpointLock; + + StreamTask<?, ?> mockTask; + + AbstractStateBackend stateBackend; public OneInputStreamOperatorTestHarness(OneInputStreamOperator<IN, OUT> operator) { @@ -71,26 +78,33 @@ public class OneInputStreamOperatorTestHarness<IN, OUT> { this.executionConfig = new ExecutionConfig(); this.checkpointLock = new Object(); - Environment env = new MockEnvironment("MockTwoInputTask", 3 * 1024 * 1024, new MockInputSplitProvider(), 1024); - StreamTask<?, ?> mockTask = mock(StreamTask.class); + final Environment env = new MockEnvironment("MockTwoInputTask", 3 * 1024 * 1024, new MockInputSplitProvider(), 1024); + mockTask = mock(StreamTask.class); when(mockTask.getName()).thenReturn("Mock Task"); when(mockTask.getCheckpointLock()).thenReturn(checkpointLock); when(mockTask.getConfiguration()).thenReturn(config); when(mockTask.getEnvironment()).thenReturn(env); when(mockTask.getExecutionConfig()).thenReturn(executionConfig); - - // ugly Java generic hacks - @SuppressWarnings("unchecked") - OngoingStubbing<StateBackend<?>> stubbing = - (OngoingStubbing<StateBackend<?>>) (OngoingStubbing<?>) when(mockTask.getStateBackend()); - stubbing.thenReturn(MemoryStateBackend.defaultInstance()); - operator.setup(mockTask, config, new MockOutput()); + try { + doAnswer(new Answer<AbstractStateBackend>() { + @Override + public AbstractStateBackend answer(InvocationOnMock invocationOnMock) throws Throwable { + final String operatorIdentifier = (String) invocationOnMock.getArguments()[0]; + final TypeSerializer<?> keySerializer = (TypeSerializer<?>) invocationOnMock.getArguments()[1]; + MemoryStateBackend backend = MemoryStateBackend.create(); + backend.initializeForJob(env, operatorIdentifier, keySerializer); + return backend; + } + }).when(mockTask).createStateBackend(any(String.class), any(TypeSerializer.class)); + } catch (Exception e) { + e.printStackTrace(); + } } public <K> void configureForKeyedStream(KeySelector<IN, K> keySelector, TypeInformation<K> keyType) { ClosureCleaner.clean(keySelector, false); - config.setStatePartitioner(keySelector); + config.setStatePartitioner(0, keySelector); config.setStateKeySerializer(keyType.createSerializer(executionConfig)); } @@ -107,6 +121,8 @@ public class OneInputStreamOperatorTestHarness<IN, OUT> { * Calls {@link org.apache.flink.streaming.api.operators.StreamOperator#open()} */ public void open() throws Exception { + operator.setup(mockTask, config, new MockOutput()); + operator.open(); } @@ -119,13 +135,13 @@ public class OneInputStreamOperatorTestHarness<IN, OUT> { } public void processElement(StreamRecord<IN> element) throws Exception { - operator.setKeyContextElement(element); + operator.setKeyContextElement1(element); operator.processElement(element); } public void processElements(Collection<StreamRecord<IN>> elements) throws Exception { for (StreamRecord<IN> element: elements) { - operator.setKeyContextElement(element); + operator.setKeyContextElement1(element); operator.processElement(element); } }
http://git-wip-us.apache.org/repos/asf/flink/blob/caf46728/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/TwoInputStreamOperatorTestHarness.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/TwoInputStreamOperatorTestHarness.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/TwoInputStreamOperatorTestHarness.java index c586db3..e23673a 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/TwoInputStreamOperatorTestHarness.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/TwoInputStreamOperatorTestHarness.java @@ -28,7 +28,7 @@ import org.apache.flink.runtime.operators.testutils.MockInputSplitProvider; import org.apache.flink.streaming.api.graph.StreamConfig; import org.apache.flink.streaming.api.operators.Output; import org.apache.flink.streaming.api.operators.TwoInputStreamOperator; -import org.apache.flink.runtime.state.StateBackend; +import org.apache.flink.runtime.state.AbstractStateBackend; import org.apache.flink.runtime.state.memory.MemoryStateBackend; import org.apache.flink.streaming.api.watermark.Watermark; import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; @@ -76,12 +76,6 @@ public class TwoInputStreamOperatorTestHarness<IN1, IN2, OUT> { when(mockTask.getEnvironment()).thenReturn(env); when(mockTask.getExecutionConfig()).thenReturn(executionConfig); - // ugly Java generic hacks - @SuppressWarnings("unchecked") - OngoingStubbing<StateBackend<?>> stubbing = - (OngoingStubbing<StateBackend<?>>) (OngoingStubbing<?>) when(mockTask.getStateBackend()); - stubbing.thenReturn(MemoryStateBackend.defaultInstance()); - operator.setup(mockTask, new StreamConfig(new Configuration()), new MockOutput()); } http://git-wip-us.apache.org/repos/asf/flink/blob/caf46728/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala ---------------------------------------------------------------------- diff --git a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala index 69147f6..29bf5da 100644 --- a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala +++ b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala @@ -23,7 +23,8 @@ import org.apache.flink.api.common.io.{FileInputFormat, InputFormat} import org.apache.flink.api.common.typeinfo.TypeInformation import org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer import org.apache.flink.api.scala.ClosureCleaner -import org.apache.flink.runtime.state.StateBackend +import org.apache.flink.runtime.state.AbstractStateBackend +import org.apache.flink.streaming.api.{TimeCharacteristic, CheckpointingMode} import org.apache.flink.streaming.api.environment.{StreamExecutionEnvironment => JavaEnv} import org.apache.flink.streaming.api.functions.source.FileMonitoringFunction.WatchType import org.apache.flink.streaming.api.functions.source.SourceFunction @@ -211,7 +212,7 @@ class StreamExecutionEnvironment(javaEnv: JavaEnv) { * program can be executed highly available and strongly consistent (assuming that Flink * is run in high-availability mode). */ - def setStateBackend(backend: StateBackend[_]): StreamExecutionEnvironment = { + def setStateBackend(backend: AbstractStateBackend): StreamExecutionEnvironment = { javaEnv.setStateBackend(backend) this } @@ -219,7 +220,7 @@ class StreamExecutionEnvironment(javaEnv: JavaEnv) { /** * Returns the state backend that defines how to store and checkpoint state. */ - def getStateBackend: StateBackend[_] = javaEnv.getStateBackend() + def getStateBackend: AbstractStateBackend = javaEnv.getStateBackend() /** * Sets the number of times that failed tasks are re-executed. A value of zero http://git-wip-us.apache.org/repos/asf/flink/blob/caf46728/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/function/StatefulFunction.scala ---------------------------------------------------------------------- diff --git a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/function/StatefulFunction.scala b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/function/StatefulFunction.scala index d66cfdb..dc49173 100644 --- a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/function/StatefulFunction.scala +++ b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/function/StatefulFunction.scala @@ -19,9 +19,9 @@ package org.apache.flink.streaming.api.scala.function import org.apache.flink.api.common.functions.RichFunction +import org.apache.flink.api.common.state.OperatorState import org.apache.flink.api.common.typeinfo.TypeInformation import org.apache.flink.configuration.Configuration -import org.apache.flink.api.common.state.OperatorState /** * Trait implementing the functionality necessary to apply stateful functions in http://git-wip-us.apache.org/repos/asf/flink/blob/caf46728/flink-tests/src/test/java/org/apache/flink/test/checkpointing/EventTimeAllWindowCheckpointingITCase.java ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/EventTimeAllWindowCheckpointingITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/EventTimeAllWindowCheckpointingITCase.java index 304dcb5..18c1b3c 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/EventTimeAllWindowCheckpointingITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/EventTimeAllWindowCheckpointingITCase.java @@ -23,8 +23,8 @@ import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.api.java.tuple.Tuple4; import org.apache.flink.configuration.ConfigConstants; import org.apache.flink.configuration.Configuration; +import org.apache.flink.runtime.state.CheckpointListener; import org.apache.flink.streaming.api.TimeCharacteristic; -import org.apache.flink.streaming.api.checkpoint.CheckpointNotifier; import org.apache.flink.streaming.api.checkpoint.Checkpointed; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.functions.sink.RichSinkFunction; @@ -383,7 +383,7 @@ public class EventTimeAllWindowCheckpointingITCase extends TestLogger { // ------------------------------------------------------------------------ private static class FailingSource extends RichEventTimeSourceFunction<Tuple2<Long, IntType>> - implements Checkpointed<Integer>, CheckpointNotifier + implements Checkpointed<Integer>, CheckpointListener { private static volatile boolean failedBefore = false; http://git-wip-us.apache.org/repos/asf/flink/blob/caf46728/flink-tests/src/test/java/org/apache/flink/test/checkpointing/EventTimeWindowCheckpointingITCase.java ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/EventTimeWindowCheckpointingITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/EventTimeWindowCheckpointingITCase.java index 81e8f0a..7a1a879 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/EventTimeWindowCheckpointingITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/EventTimeWindowCheckpointingITCase.java @@ -25,8 +25,8 @@ import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.api.java.tuple.Tuple4; import org.apache.flink.configuration.ConfigConstants; import org.apache.flink.configuration.Configuration; +import org.apache.flink.runtime.state.CheckpointListener; import org.apache.flink.streaming.api.TimeCharacteristic; -import org.apache.flink.streaming.api.checkpoint.CheckpointNotifier; import org.apache.flink.streaming.api.checkpoint.Checkpointed; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.functions.sink.RichSinkFunction; @@ -451,7 +451,7 @@ public class EventTimeWindowCheckpointingITCase extends TestLogger { // ------------------------------------------------------------------------ private static class FailingSource extends RichEventTimeSourceFunction<Tuple2<Long, IntType>> - implements Checkpointed<Integer>, CheckpointNotifier + implements Checkpointed<Integer>, CheckpointListener { private static volatile boolean failedBefore = false; http://git-wip-us.apache.org/repos/asf/flink/blob/caf46728/flink-tests/src/test/java/org/apache/flink/test/checkpointing/PartitionedStateCheckpointingITCase.java ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/PartitionedStateCheckpointingITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/PartitionedStateCheckpointingITCase.java index 42b6230..387421e 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/PartitionedStateCheckpointingITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/PartitionedStateCheckpointingITCase.java @@ -29,6 +29,9 @@ import java.util.concurrent.ConcurrentHashMap; import org.apache.flink.api.common.functions.RichMapFunction; import org.apache.flink.api.common.state.OperatorState; +import org.apache.flink.api.common.state.ValueState; +import org.apache.flink.api.common.state.ValueStateDescriptor; +import org.apache.flink.api.common.typeutils.base.LongSerializer; import org.apache.flink.api.java.functions.KeySelector; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.configuration.Configuration; @@ -190,14 +193,17 @@ public class PartitionedStateCheckpointingITCase extends StreamFaultToleranceTes private static Map<Integer, Long> allCounts = new ConcurrentHashMap<Integer, Long>(); + private ValueStateDescriptor<Long> bCountsId = new ValueStateDescriptor<>("b", 0L, + LongSerializer.INSTANCE); + private OperatorState<NonSerializableLong> aCounts; - private OperatorState<Long> bCounts; + private ValueState<Long> bCounts; @Override public void open(Configuration parameters) throws IOException { aCounts = getRuntimeContext().getKeyValueState( "a", NonSerializableLong.class, NonSerializableLong.of(0L)); - bCounts = getRuntimeContext().getKeyValueState("b", Long.class, 0L); + bCounts = getRuntimeContext().getPartitionedState(bCountsId); } @Override @@ -224,6 +230,22 @@ public class PartitionedStateCheckpointingITCase extends StreamFaultToleranceTes public static NonSerializableLong of(long value) { return new NonSerializableLong(value); } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + + NonSerializableLong that = (NonSerializableLong) o; + + return value.equals(that.value); + + } + + @Override + public int hashCode() { + return value.hashCode(); + } } public static class IdentityKeySelector<T> implements KeySelector<T, T> { http://git-wip-us.apache.org/repos/asf/flink/blob/caf46728/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java index 4e5e1b5..46c0453 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java @@ -44,7 +44,7 @@ import org.apache.flink.runtime.messages.JobManagerMessages.CancelJob; import org.apache.flink.runtime.messages.JobManagerMessages.DisposeSavepoint; import org.apache.flink.runtime.messages.JobManagerMessages.TriggerSavepoint; import org.apache.flink.runtime.messages.JobManagerMessages.TriggerSavepointSuccess; -import org.apache.flink.runtime.state.filesystem.AbstractFileState; +import org.apache.flink.runtime.state.filesystem.AbstractFileStateHandle; import org.apache.flink.runtime.state.filesystem.FsStateBackend; import org.apache.flink.runtime.state.filesystem.FsStateBackendFactory; import org.apache.flink.runtime.testingUtils.TestingJobManagerMessages.NotifyWhenJobRemoved; @@ -53,7 +53,7 @@ import org.apache.flink.runtime.testingUtils.TestingJobManagerMessages.ResponseS import org.apache.flink.runtime.testingUtils.TestingTaskManagerMessages; import org.apache.flink.runtime.testingUtils.TestingTaskManagerMessages.ResponseSubmitTaskListener; import org.apache.flink.runtime.testutils.CommonTestUtils; -import org.apache.flink.streaming.api.checkpoint.CheckpointNotifier; +import org.apache.flink.runtime.state.CheckpointListener; import org.apache.flink.streaming.api.checkpoint.Checkpointed; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; @@ -365,7 +365,7 @@ public class SavepointITCase extends TestLogger { for (StreamTaskState taskState : taskStateList.getState( ClassLoader.getSystemClassLoader())) { - AbstractFileState fsState = (AbstractFileState) taskState.getFunctionState(); + AbstractFileStateHandle fsState = (AbstractFileStateHandle) taskState.getFunctionState(); checkpointFiles.add(new File(fsState.getFilePath().toUri())); } } @@ -660,7 +660,7 @@ public class SavepointITCase extends TestLogger { for (StreamTaskState taskState : taskStateList.getState( ClassLoader.getSystemClassLoader())) { - AbstractFileState fsState = (AbstractFileState) taskState.getFunctionState(); + AbstractFileStateHandle fsState = (AbstractFileStateHandle) taskState.getFunctionState(); checkpointFiles.add(new File(fsState.getFilePath().toUri())); } } @@ -784,7 +784,7 @@ public class SavepointITCase extends TestLogger { } private static class InfiniteTestSource - implements SourceFunction<Integer>, CheckpointNotifier { + implements SourceFunction<Integer>, CheckpointListener { private static final long serialVersionUID = 1L; private volatile boolean running = true; http://git-wip-us.apache.org/repos/asf/flink/blob/caf46728/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StateCheckpointedITCase.java ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StateCheckpointedITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StateCheckpointedITCase.java index d7c06f6..962fe84 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StateCheckpointedITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StateCheckpointedITCase.java @@ -21,9 +21,9 @@ package org.apache.flink.test.checkpointing; import org.apache.flink.api.common.functions.RichFilterFunction; import org.apache.flink.api.common.functions.RichFlatMapFunction; import org.apache.flink.api.common.functions.RichMapFunction; -import org.apache.flink.api.common.state.OperatorState; +import org.apache.flink.api.common.state.ValueState; import org.apache.flink.configuration.Configuration; -import org.apache.flink.streaming.api.checkpoint.CheckpointNotifier; +import org.apache.flink.runtime.state.CheckpointListener; import org.apache.flink.streaming.api.checkpoint.Checkpointed; import org.apache.flink.streaming.api.checkpoint.CheckpointedAsynchronously; import org.apache.flink.streaming.api.datastream.DataStream; @@ -47,7 +47,7 @@ import static org.junit.Assert.assertTrue; * A simple test that runs a streaming topology with checkpointing enabled. * * The test triggers a failure after a while and verifies that, after completion, the - * state defined with either the {@link OperatorState} or the {@link Checkpointed} + * state defined with either the {@link ValueState} or the {@link Checkpointed} * interface reflects the "exactly once" semantics. * * The test throttles the input until at least two checkpoints are completed, to make sure that @@ -295,7 +295,7 @@ public class StateCheckpointedITCase extends StreamFaultToleranceTestBase { } private static class OnceFailingAggregator extends RichFlatMapFunction<PrefixCount, PrefixCount> - implements Checkpointed<HashMap<String, PrefixCount>>, CheckpointNotifier { + implements Checkpointed<HashMap<String, PrefixCount>>, CheckpointListener { static boolean wasCheckpointedBeforeFailure = false; http://git-wip-us.apache.org/repos/asf/flink/blob/caf46728/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StreamCheckpointNotifierITCase.java ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StreamCheckpointNotifierITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StreamCheckpointNotifierITCase.java index 22f61b7..5fa0666 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StreamCheckpointNotifierITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StreamCheckpointNotifierITCase.java @@ -24,7 +24,7 @@ import org.apache.flink.api.common.functions.RichReduceFunction; import org.apache.flink.api.java.tuple.Tuple1; import org.apache.flink.configuration.ConfigConstants; import org.apache.flink.configuration.Configuration; -import org.apache.flink.streaming.api.checkpoint.CheckpointNotifier; +import org.apache.flink.runtime.state.CheckpointListener; import org.apache.flink.streaming.api.checkpoint.Checkpointed; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; @@ -55,8 +55,8 @@ import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; /** - * Integration test for the {@link CheckpointNotifier} interface. The test ensures that - * {@link CheckpointNotifier#notifyCheckpointComplete(long)} is called for completed + * Integration test for the {@link CheckpointListener} interface. The test ensures that + * {@link CheckpointListener#notifyCheckpointComplete(long)} is called for completed * checkpoints, that it is called at most once for any checkpoint id and that it is not * called for a deliberately failed checkpoint. * @@ -66,7 +66,7 @@ import static org.junit.Assert.fail; * * <p> * Note that as a result of doing the checks on the task level there is no way to verify - * that the {@link CheckpointNotifier#notifyCheckpointComplete(long)} is called for every + * that the {@link CheckpointListener#notifyCheckpointComplete(long)} is called for every * successfully completed checkpoint. */ @SuppressWarnings("serial") @@ -197,11 +197,11 @@ public class StreamCheckpointNotifierITCase { // -------------------------------------------------------------------------------------------- /** - * Generates some Long values and as an implementation for the {@link CheckpointNotifier} + * Generates some Long values and as an implementation for the {@link CheckpointListener} * interface it stores all the checkpoint ids it has seen in a static list. */ private static class GeneratingSourceFunction extends RichSourceFunction<Long> - implements ParallelSourceFunction<Long>, CheckpointNotifier, Checkpointed<Integer> { + implements ParallelSourceFunction<Long>, CheckpointListener, Checkpointed<Integer> { static final List<Long>[] completedCheckpoints = createCheckpointLists(PARALLELISM); @@ -285,10 +285,10 @@ public class StreamCheckpointNotifierITCase { /** * Identity transform on Long values wrapping the output in a tuple. As an implementation - * for the {@link CheckpointNotifier} interface it stores all the checkpoint ids it has seen in a static list. + * for the {@link CheckpointListener} interface it stores all the checkpoint ids it has seen in a static list. */ private static class IdentityMapFunction extends RichMapFunction<Long, Tuple1<Long>> - implements CheckpointNotifier { + implements CheckpointListener { static final List<Long>[] completedCheckpoints = createCheckpointLists(PARALLELISM); @@ -316,10 +316,10 @@ public class StreamCheckpointNotifierITCase { /** * Filter on Long values supposedly letting all values through. As an implementation - * for the {@link CheckpointNotifier} interface it stores all the checkpoint ids + * for the {@link CheckpointListener} interface it stores all the checkpoint ids * it has seen in a static list. */ - private static class LongRichFilterFunction extends RichFilterFunction<Long> implements CheckpointNotifier { + private static class LongRichFilterFunction extends RichFilterFunction<Long> implements CheckpointListener { static final List<Long>[] completedCheckpoints = createCheckpointLists(PARALLELISM); @@ -347,11 +347,11 @@ public class StreamCheckpointNotifierITCase { /** * CoFlatMap on Long values as identity transform on the left input, while ignoring the right. - * As an implementation for the {@link CheckpointNotifier} interface it stores all the checkpoint + * As an implementation for the {@link CheckpointListener} interface it stores all the checkpoint * ids it has seen in a static list. */ private static class LeftIdentityCoRichFlatMapFunction extends RichCoFlatMapFunction<Long, Long, Long> - implements CheckpointNotifier { + implements CheckpointListener { static final List<Long>[] completedCheckpoints = createCheckpointLists(PARALLELISM); @@ -386,7 +386,7 @@ public class StreamCheckpointNotifierITCase { * Reducer that causes one failure between seeing 40% to 70% of the records. */ private static class OnceFailingReducer extends RichReduceFunction<Tuple1<Long>> - implements Checkpointed<Long>, CheckpointNotifier + implements Checkpointed<Long>, CheckpointListener { static volatile boolean hasFailed = false; static volatile long failureCheckpointID; http://git-wip-us.apache.org/repos/asf/flink/blob/caf46728/flink-tests/src/test/java/org/apache/flink/test/checkpointing/WindowCheckpointingITCase.java ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/WindowCheckpointingITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/WindowCheckpointingITCase.java index 500d7d3..8d59975 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/WindowCheckpointingITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/WindowCheckpointingITCase.java @@ -26,8 +26,8 @@ import org.apache.flink.client.program.ProgramInvocationException; import org.apache.flink.configuration.ConfigConstants; import org.apache.flink.configuration.Configuration; import org.apache.flink.runtime.client.JobExecutionException; +import org.apache.flink.runtime.state.CheckpointListener; import org.apache.flink.streaming.api.TimeCharacteristic; -import org.apache.flink.streaming.api.checkpoint.CheckpointNotifier; import org.apache.flink.streaming.api.checkpoint.Checkpointed; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.functions.sink.RichSinkFunction; @@ -335,7 +335,7 @@ public class WindowCheckpointingITCase extends TestLogger { // ------------------------------------------------------------------------ private static class FailingSource extends RichSourceFunction<Tuple2<Long, IntType>> - implements Checkpointed<Integer>, CheckpointNotifier + implements Checkpointed<Integer>, CheckpointListener { private static volatile boolean failedBefore = false; http://git-wip-us.apache.org/repos/asf/flink/blob/caf46728/flink-tests/src/test/java/org/apache/flink/test/classloading/jar/CheckpointedStreamingProgram.java ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/java/org/apache/flink/test/classloading/jar/CheckpointedStreamingProgram.java b/flink-tests/src/test/java/org/apache/flink/test/classloading/jar/CheckpointedStreamingProgram.java index cda5a7b..bc7cebb 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/classloading/jar/CheckpointedStreamingProgram.java +++ b/flink-tests/src/test/java/org/apache/flink/test/classloading/jar/CheckpointedStreamingProgram.java @@ -19,7 +19,7 @@ package org.apache.flink.test.classloading.jar; import org.apache.flink.api.common.functions.MapFunction; -import org.apache.flink.streaming.api.checkpoint.CheckpointNotifier; +import org.apache.flink.runtime.state.CheckpointListener; import org.apache.flink.streaming.api.checkpoint.Checkpointed; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; @@ -83,7 +83,7 @@ public class CheckpointedStreamingProgram { } } - public static class StatefulMapper implements MapFunction<String, String>, Checkpointed<StatefulMapper>, CheckpointNotifier { + public static class StatefulMapper implements MapFunction<String, String>, Checkpointed<StatefulMapper>, CheckpointListener { private String someState; private boolean atLeastOneSnapshotComplete = false; http://git-wip-us.apache.org/repos/asf/flink/blob/caf46728/flink-tests/src/test/java/org/apache/flink/test/recovery/ChaosMonkeyITCase.java ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/java/org/apache/flink/test/recovery/ChaosMonkeyITCase.java b/flink-tests/src/test/java/org/apache/flink/test/recovery/ChaosMonkeyITCase.java index acc8569..6ae0d46 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/recovery/ChaosMonkeyITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/recovery/ChaosMonkeyITCase.java @@ -41,7 +41,7 @@ import org.apache.flink.runtime.testutils.TestJvmProcess; import org.apache.flink.runtime.testutils.ZooKeeperTestUtils; import org.apache.flink.runtime.util.ZooKeeperUtils; import org.apache.flink.runtime.zookeeper.ZooKeeperTestEnvironment; -import org.apache.flink.streaming.api.checkpoint.CheckpointNotifier; +import org.apache.flink.runtime.state.CheckpointListener; import org.apache.flink.streaming.api.checkpoint.Checkpointed; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.functions.sink.RichSinkFunction; @@ -378,7 +378,7 @@ public class ChaosMonkeyITCase extends TestLogger { } public static class CheckpointedSequenceSource extends RichParallelSourceFunction<Long> - implements Checkpointed<Long>, CheckpointNotifier { + implements Checkpointed<Long>, CheckpointListener { private static final long serialVersionUID = 0L; @@ -448,7 +448,7 @@ public class ChaosMonkeyITCase extends TestLogger { } public static class CountingSink extends RichSinkFunction<Long> - implements Checkpointed<CountingSink>, CheckpointNotifier { + implements Checkpointed<CountingSink>, CheckpointListener { private static final Logger LOG = LoggerFactory.getLogger(CountingSink.class); http://git-wip-us.apache.org/repos/asf/flink/blob/caf46728/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerCheckpointRecoveryITCase.java ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerCheckpointRecoveryITCase.java b/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerCheckpointRecoveryITCase.java index cc4998d..737d39a 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerCheckpointRecoveryITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerCheckpointRecoveryITCase.java @@ -40,7 +40,7 @@ import org.apache.flink.runtime.testutils.JobManagerProcess; import org.apache.flink.runtime.testutils.ZooKeeperTestUtils; import org.apache.flink.runtime.util.ZooKeeperUtils; import org.apache.flink.runtime.zookeeper.ZooKeeperTestEnvironment; -import org.apache.flink.streaming.api.checkpoint.CheckpointNotifier; +import org.apache.flink.runtime.state.CheckpointListener; import org.apache.flink.streaming.api.checkpoint.Checkpointed; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.functions.sink.SinkFunction; @@ -505,7 +505,7 @@ public class JobManagerCheckpointRecoveryITCase extends TestLogger { * are exhausted. */ public static class CountingSink implements SinkFunction<Long>, Checkpointed<CountingSink>, - CheckpointNotifier { + CheckpointListener { private static final Logger LOG = LoggerFactory.getLogger(CountingSink.class);
