http://git-wip-us.apache.org/repos/asf/flink/blob/53ed6ada/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamMockEnvironment.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamMockEnvironment.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamMockEnvironment.java index 2036f69..f638ddd 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamMockEnvironment.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamMockEnvironment.java @@ -50,6 +50,7 @@ import org.apache.flink.runtime.plugable.NonReusingDeserializationDelegate; import org.apache.flink.runtime.query.KvStateRegistry; import org.apache.flink.runtime.query.TaskKvStateRegistry; import org.apache.flink.runtime.state.ChainedStateHandle; +import org.apache.flink.runtime.state.CheckpointStateHandles; import org.apache.flink.runtime.state.KeyGroupsStateHandle; import org.apache.flink.runtime.state.StreamStateHandle; import org.apache.flink.runtime.taskmanager.TaskManagerRuntimeInfo; @@ -317,7 +318,7 @@ public class StreamMockEnvironment implements Environment { @Override public void acknowledgeCheckpoint( long checkpointId, - ChainedStateHandle<StreamStateHandle> chainedStateHandle, List<KeyGroupsStateHandle> keyGroupStateHandles, + CheckpointStateHandles checkpointStateHandles, long synchronousDurationMillis, long asynchronousDurationMillis, long bytesBufferedInAlignment, long alignmentDurationNanos) { }
http://git-wip-us.apache.org/repos/asf/flink/blob/53ed6ada/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/KeyedOneInputStreamOperatorTestHarness.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/KeyedOneInputStreamOperatorTestHarness.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/KeyedOneInputStreamOperatorTestHarness.java index 430c6de..247edd6 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/KeyedOneInputStreamOperatorTestHarness.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/KeyedOneInputStreamOperatorTestHarness.java @@ -24,12 +24,14 @@ import org.apache.flink.api.common.typeutils.TypeSerializer; import org.apache.flink.api.java.ClosureCleaner; import org.apache.flink.api.java.functions.KeySelector; import org.apache.flink.core.fs.FSDataInputStream; +import org.apache.flink.runtime.state.AbstractKeyedStateBackend; import org.apache.flink.runtime.state.CheckpointStreamFactory; import org.apache.flink.runtime.state.KeyGroupRange; import org.apache.flink.runtime.state.KeyGroupsStateHandle; import org.apache.flink.runtime.state.KeyedStateBackend; import org.apache.flink.runtime.state.StreamStateHandle; import org.apache.flink.runtime.state.memory.MemoryStateBackend; +import org.apache.flink.streaming.api.operators.StreamCheckpointedOperator; import org.apache.flink.streaming.api.operators.OneInputStreamOperator; import org.apache.flink.streaming.runtime.tasks.TimeServiceProvider; import org.mockito.invocation.InvocationOnMock; @@ -41,11 +43,12 @@ import java.util.Collections; import java.util.concurrent.RunnableFuture; import static org.mockito.Matchers.any; -import static org.mockito.Mockito.*; +import static org.mockito.Mockito.anyInt; +import static org.mockito.Mockito.doAnswer; /** * Extension of {@link OneInputStreamOperatorTestHarness} that allows the operator to get - * a {@link KeyedStateBackend}. + * a {@link AbstractKeyedStateBackend}. * */ public class KeyedOneInputStreamOperatorTestHarness<K, IN, OUT> @@ -53,7 +56,7 @@ public class KeyedOneInputStreamOperatorTestHarness<K, IN, OUT> // in case the operator creates one we store it here so that we // can snapshot its state - private KeyedStateBackend<?> keyedStateBackend = null; + private AbstractKeyedStateBackend<?> keyedStateBackend = null; // when we restore we keep the state here so that we can call restore // when the operator requests the keyed state backend @@ -114,7 +117,7 @@ public class KeyedOneInputStreamOperatorTestHarness<K, IN, OUT> final KeyGroupRange keyGroupRange = (KeyGroupRange) invocationOnMock.getArguments()[2]; if(keyedStateBackend != null) { - keyedStateBackend.close(); + keyedStateBackend.dispose(); } if (restoredKeyedState == null) { @@ -148,7 +151,7 @@ public class KeyedOneInputStreamOperatorTestHarness<K, IN, OUT> } /** - * Calls {@link org.apache.flink.streaming.api.operators.StreamOperator#snapshotState(org.apache.flink.core.fs.FSDataOutputStream, long, long)} ()} + * */ @Override public StreamStateHandle snapshot(long checkpointId, long timestamp) throws Exception { @@ -159,7 +162,9 @@ public class KeyedOneInputStreamOperatorTestHarness<K, IN, OUT> CheckpointStreamFactory.CheckpointStateOutputStream outStream = streamFactory.createCheckpointStateOutputStream(checkpointId, timestamp); - operator.snapshotState(outStream, checkpointId, timestamp); + if (operator instanceof StreamCheckpointedOperator) { + ((StreamCheckpointedOperator) operator).snapshotState(outStream, checkpointId, timestamp); + } if (keyedStateBackend != null) { RunnableFuture<KeyGroupsStateHandle> keyedSnapshotRunnable = keyedStateBackend.snapshot(checkpointId, @@ -180,17 +185,21 @@ public class KeyedOneInputStreamOperatorTestHarness<K, IN, OUT> } /** - * Calls {@link org.apache.flink.streaming.api.operators.StreamOperator#restoreState(org.apache.flink.core.fs.FSDataInputStream)} ()} + * */ @Override public void restore(StreamStateHandle snapshot) throws Exception { - FSDataInputStream inStream = snapshot.openInputStream(); - operator.restoreState(inStream); + try (FSDataInputStream inStream = snapshot.openInputStream()) { + + if (operator instanceof StreamCheckpointedOperator) { + ((StreamCheckpointedOperator) operator).restoreState(inStream); + } - byte keyedStatePresent = (byte) inStream.read(); - if (keyedStatePresent == 1) { - ObjectInputStream ois = new ObjectInputStream(inStream); - this.restoredKeyedState = (KeyGroupsStateHandle) ois.readObject(); + byte keyedStatePresent = (byte) inStream.read(); + if (keyedStatePresent == 1) { + ObjectInputStream ois = new ObjectInputStream(inStream); + this.restoredKeyedState = (KeyGroupsStateHandle) ois.readObject(); + } } } @@ -200,7 +209,7 @@ public class KeyedOneInputStreamOperatorTestHarness<K, IN, OUT> public void close() throws Exception { super.close(); if(keyedStateBackend != null) { - keyedStateBackend.close(); + keyedStateBackend.dispose(); } } } http://git-wip-us.apache.org/repos/asf/flink/blob/53ed6ada/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/OneInputStreamOperatorTestHarness.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/OneInputStreamOperatorTestHarness.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/OneInputStreamOperatorTestHarness.java index acf046a..d6f46fd 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/OneInputStreamOperatorTestHarness.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/OneInputStreamOperatorTestHarness.java @@ -22,7 +22,7 @@ import org.apache.flink.api.common.JobID; import org.apache.flink.api.common.typeutils.TypeSerializer; import org.apache.flink.api.java.typeutils.TypeExtractor; import org.apache.flink.configuration.Configuration; -import org.apache.flink.core.fs.FSDataOutputStream; +import org.apache.flink.core.fs.FSDataInputStream; import org.apache.flink.runtime.execution.Environment; import org.apache.flink.runtime.operators.testutils.MockEnvironment; import org.apache.flink.runtime.operators.testutils.MockInputSplitProvider; @@ -30,6 +30,7 @@ import org.apache.flink.runtime.state.AbstractStateBackend; import org.apache.flink.runtime.state.CheckpointStreamFactory; import org.apache.flink.runtime.state.StreamStateHandle; import org.apache.flink.runtime.state.memory.MemoryStateBackend; +import org.apache.flink.streaming.api.operators.StreamCheckpointedOperator; import org.apache.flink.streaming.api.graph.StreamConfig; import org.apache.flink.streaming.api.operators.OneInputStreamOperator; import org.apache.flink.streaming.api.operators.Output; @@ -39,7 +40,6 @@ import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; import org.apache.flink.streaming.runtime.tasks.AsynchronousException; import org.apache.flink.streaming.runtime.tasks.DefaultTimeServiceProvider; import org.apache.flink.streaming.runtime.tasks.StreamTask; -import org.apache.flink.streaming.runtime.tasks.TestTimeServiceProvider; import org.apache.flink.streaming.runtime.tasks.TimeServiceProvider; import org.mockito.invocation.InvocationOnMock; import org.mockito.stubbing.Answer; @@ -204,14 +204,18 @@ public class OneInputStreamOperatorTestHarness<IN, OUT> { } /** - * Calls {@link org.apache.flink.streaming.api.operators.StreamOperator#snapshotState(FSDataOutputStream, long, long)} ()} + * */ public StreamStateHandle snapshot(long checkpointId, long timestamp) throws Exception { CheckpointStreamFactory.CheckpointStateOutputStream outStream = stateBackend.createStreamFactory( new JobID(), "test_op").createCheckpointStateOutputStream(checkpointId, timestamp); - operator.snapshotState(outStream, checkpointId, timestamp); - return outStream.closeAndGetHandle(); + if(operator instanceof StreamCheckpointedOperator) { + ((StreamCheckpointedOperator) operator).snapshotState(outStream, checkpointId, timestamp); + return outStream.closeAndGetHandle(); + } else { + throw new RuntimeException("Operator is not StreamCheckpointedOperator"); + } } /** @@ -222,10 +226,16 @@ public class OneInputStreamOperatorTestHarness<IN, OUT> { } /** - * Calls {@link org.apache.flink.streaming.api.operators.StreamOperator#restoreState(org.apache.flink.core.fs.FSDataInputStream)} ()} + * */ public void restore(StreamStateHandle snapshot) throws Exception { - operator.restoreState(snapshot.openInputStream()); + if(operator instanceof StreamCheckpointedOperator) { + try (FSDataInputStream in = snapshot.openInputStream()) { + ((StreamCheckpointedOperator) operator).restoreState(in); + } + } else { + throw new RuntimeException("Operator is not StreamCheckpointedOperator"); + } } /** http://git-wip-us.apache.org/repos/asf/flink/blob/53ed6ada/flink-tests/src/test/java/org/apache/flink/test/checkpointing/UdfStreamOperatorCheckpointingITCase.java ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/UdfStreamOperatorCheckpointingITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/UdfStreamOperatorCheckpointingITCase.java index c12bcb9..5874f56 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/UdfStreamOperatorCheckpointingITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/UdfStreamOperatorCheckpointingITCase.java @@ -26,6 +26,7 @@ import org.apache.flink.api.java.tuple.Tuple; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.configuration.Configuration; import org.apache.flink.streaming.api.checkpoint.Checkpointed; +import org.apache.flink.streaming.api.checkpoint.ListCheckpointed; import org.apache.flink.streaming.api.datastream.KeyedStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.functions.sink.SinkFunction; @@ -35,6 +36,8 @@ import org.apache.flink.streaming.api.operators.StreamGroupedFold; import org.apache.flink.streaming.api.operators.StreamGroupedReduce; import org.junit.Assert; +import java.util.Collections; +import java.util.List; import java.util.Queue; import java.util.Random; @@ -180,7 +183,7 @@ public class UdfStreamOperatorCheckpointingITCase extends StreamFaultToleranceTe */ private static class OnceFailingIdentityMapFunction extends RichMapFunction<Tuple2<Integer, Long>, Tuple2<Integer, Long>> - implements Checkpointed<Long> { + implements ListCheckpointed<Long> { private static volatile boolean hasFailed = false; @@ -211,15 +214,16 @@ public class UdfStreamOperatorCheckpointingITCase extends StreamFaultToleranceTe return value; } - @Override - public Long snapshotState(long checkpointId, long checkpointTimestamp) { - return count; + public List<Long> snapshotState(long checkpointId, long timestamp) throws Exception { + return Collections.singletonList(count); } @Override - public void restoreState(Long state) { - count = state; + public void restoreState(List<Long> state) throws Exception { + if(!state.isEmpty()) { + count = state.get(0); + } } } http://git-wip-us.apache.org/repos/asf/flink/blob/53ed6ada/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/StateBackendITCase.java ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/StateBackendITCase.java b/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/StateBackendITCase.java index 694f006..2a635ab 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/StateBackendITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/StateBackendITCase.java @@ -21,17 +21,18 @@ package org.apache.flink.test.streaming.runtime; import org.apache.flink.api.common.JobID; import org.apache.flink.api.common.functions.RichMapFunction; import org.apache.flink.api.common.restartstrategy.RestartStrategies; +import org.apache.flink.api.common.state.ValueStateDescriptor; import org.apache.flink.api.common.typeutils.TypeSerializer; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.configuration.Configuration; import org.apache.flink.runtime.client.JobExecutionException; import org.apache.flink.runtime.execution.Environment; import org.apache.flink.runtime.query.TaskKvStateRegistry; +import org.apache.flink.runtime.state.AbstractKeyedStateBackend; import org.apache.flink.runtime.state.AbstractStateBackend; import org.apache.flink.runtime.state.CheckpointStreamFactory; import org.apache.flink.runtime.state.KeyGroupRange; import org.apache.flink.runtime.state.KeyGroupsStateHandle; -import org.apache.flink.runtime.state.KeyedStateBackend; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase; import org.junit.Test; @@ -66,7 +67,7 @@ public class StateBackendITCase extends StreamingMultipleProgramsTestBase { @Override public void open(Configuration parameters) throws Exception { super.open(parameters); - getRuntimeContext().getKeyValueState("test", String.class, ""); + getRuntimeContext().getState(new ValueStateDescriptor<Integer>("Test", Integer.class, 0)); } @Override @@ -99,7 +100,8 @@ public class StateBackendITCase extends StreamingMultipleProgramsTestBase { } @Override - public <K> KeyedStateBackend<K> createKeyedStateBackend(Environment env, + public <K> AbstractKeyedStateBackend<K> createKeyedStateBackend( + Environment env, JobID jobID, String operatorIdentifier, TypeSerializer<K> keySerializer, @@ -110,7 +112,8 @@ public class StateBackendITCase extends StreamingMultipleProgramsTestBase { } @Override - public <K> KeyedStateBackend<K> restoreKeyedStateBackend(Environment env, + public <K> AbstractKeyedStateBackend<K> restoreKeyedStateBackend( + Environment env, JobID jobID, String operatorIdentifier, TypeSerializer<K> keySerializer,
