Repository: flink Updated Branches: refs/heads/master 4e7f03e41 -> f9a583b72
[hotfix] Rename OperatorSnapshotResult to OperatorSnapshotFutures. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/ea0d16d4 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/ea0d16d4 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/ea0d16d4 Branch: refs/heads/master Commit: ea0d16d4bac639dc4858a6c5cef209e904e655ef Parents: 617e67c Author: Stefan Richter <[email protected]> Authored: Thu Feb 22 16:14:39 2018 +0100 Committer: Stefan Richter <[email protected]> Committed: Sun Feb 25 15:10:28 2018 +0100 ---------------------------------------------------------------------- .../api/operators/AbstractStreamOperator.java | 4 +- .../api/operators/OperatorSnapshotFutures.java | 134 +++++++++++++++++++ .../api/operators/OperatorSnapshotResult.java | 134 ------------------- .../streaming/api/operators/StreamOperator.java | 2 +- .../streaming/runtime/tasks/StreamTask.java | 18 +-- .../operators/AbstractStreamOperatorTest.java | 4 +- .../operators/OperatorSnapshotFuturesTest.java | 82 ++++++++++++ .../operators/OperatorSnapshotResultTest.java | 82 ------------ .../streaming/runtime/tasks/StreamTaskTest.java | 18 +-- .../util/AbstractStreamOperatorTestHarness.java | 4 +- 10 files changed, 241 insertions(+), 241 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/ea0d16d4/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java index 42b6923..4f16259 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java +++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java @@ -347,7 +347,7 @@ public abstract class AbstractStreamOperator<OUT> } @Override - public final OperatorSnapshotResult snapshotState( + public final OperatorSnapshotFutures snapshotState( long checkpointId, long timestamp, CheckpointOptions checkpointOptions, @@ -356,7 +356,7 @@ public abstract class AbstractStreamOperator<OUT> KeyGroupRange keyGroupRange = null != keyedStateBackend ? keyedStateBackend.getKeyGroupRange() : KeyGroupRange.EMPTY_KEY_GROUP_RANGE; - OperatorSnapshotResult snapshotInProgress = new OperatorSnapshotResult(); + OperatorSnapshotFutures snapshotInProgress = new OperatorSnapshotFutures(); try (StateSnapshotContextSynchronousImpl snapshotContext = new StateSnapshotContextSynchronousImpl( checkpointId, http://git-wip-us.apache.org/repos/asf/flink/blob/ea0d16d4/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/OperatorSnapshotFutures.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/OperatorSnapshotFutures.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/OperatorSnapshotFutures.java new file mode 100644 index 0000000..bdaf64b --- /dev/null +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/OperatorSnapshotFutures.java @@ -0,0 +1,134 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.api.operators; + +import org.apache.flink.runtime.state.KeyedStateHandle; +import org.apache.flink.runtime.state.OperatorStateHandle; +import org.apache.flink.runtime.state.StateUtil; +import org.apache.flink.util.ExceptionUtils; + +import java.util.concurrent.RunnableFuture; + +/** + * Result of {@link StreamOperator#snapshotState}. + */ +public class OperatorSnapshotFutures { + + private RunnableFuture<KeyedStateHandle> keyedStateManagedFuture; + private RunnableFuture<KeyedStateHandle> keyedStateRawFuture; + private RunnableFuture<OperatorStateHandle> operatorStateManagedFuture; + private RunnableFuture<OperatorStateHandle> operatorStateRawFuture; + + public OperatorSnapshotFutures() { + this(null, null, null, null); + } + + public OperatorSnapshotFutures( + RunnableFuture<KeyedStateHandle> keyedStateManagedFuture, + RunnableFuture<KeyedStateHandle> keyedStateRawFuture, + RunnableFuture<OperatorStateHandle> operatorStateManagedFuture, + RunnableFuture<OperatorStateHandle> operatorStateRawFuture) { + this.keyedStateManagedFuture = keyedStateManagedFuture; + this.keyedStateRawFuture = keyedStateRawFuture; + this.operatorStateManagedFuture = operatorStateManagedFuture; + this.operatorStateRawFuture = operatorStateRawFuture; + } + + public RunnableFuture<KeyedStateHandle> getKeyedStateManagedFuture() { + return keyedStateManagedFuture; + } + + public void setKeyedStateManagedFuture(RunnableFuture<KeyedStateHandle> keyedStateManagedFuture) { + this.keyedStateManagedFuture = keyedStateManagedFuture; + } + + 
public RunnableFuture<KeyedStateHandle> getKeyedStateRawFuture() { + return keyedStateRawFuture; + } + + public void setKeyedStateRawFuture(RunnableFuture<KeyedStateHandle> keyedStateRawFuture) { + this.keyedStateRawFuture = keyedStateRawFuture; + } + + public RunnableFuture<OperatorStateHandle> getOperatorStateManagedFuture() { + return operatorStateManagedFuture; + } + + public void setOperatorStateManagedFuture(RunnableFuture<OperatorStateHandle> operatorStateManagedFuture) { + this.operatorStateManagedFuture = operatorStateManagedFuture; + } + + public RunnableFuture<OperatorStateHandle> getOperatorStateRawFuture() { + return operatorStateRawFuture; + } + + public void setOperatorStateRawFuture(RunnableFuture<OperatorStateHandle> operatorStateRawFuture) { + this.operatorStateRawFuture = operatorStateRawFuture; + } + + public void cancel() throws Exception { + Exception exception = null; + + try { + StateUtil.discardStateFuture(getKeyedStateManagedFuture()); + } catch (Exception e) { + exception = new Exception("Could not properly cancel managed keyed state future.", e); + } + + try { + StateUtil.discardStateFuture(getOperatorStateManagedFuture()); + } catch (Exception e) { + exception = ExceptionUtils.firstOrSuppressed( + new Exception("Could not properly cancel managed operator state future.", e), + exception); + } + + try { + StateUtil.discardStateFuture(getKeyedStateRawFuture()); + } catch (Exception e) { + exception = ExceptionUtils.firstOrSuppressed( + new Exception("Could not properly cancel raw keyed state future.", e), + exception); + } + + try { + StateUtil.discardStateFuture(getOperatorStateRawFuture()); + } catch (Exception e) { + exception = ExceptionUtils.firstOrSuppressed( + new Exception("Could not properly cancel raw operator state future.", e), + exception); + } + + if (exception != null) { + throw exception; + } + } + + public boolean hasKeyedState() { + return keyedStateManagedFuture != null || keyedStateRawFuture != null; + } + + public 
boolean hasOperatorState() { + return operatorStateManagedFuture != null || operatorStateRawFuture != null; + } + + public boolean hasState() { + return hasKeyedState() || hasOperatorState(); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/ea0d16d4/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/OperatorSnapshotResult.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/OperatorSnapshotResult.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/OperatorSnapshotResult.java deleted file mode 100644 index 8c05ae9..0000000 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/OperatorSnapshotResult.java +++ /dev/null @@ -1,134 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.flink.streaming.api.operators; - -import org.apache.flink.runtime.state.KeyedStateHandle; -import org.apache.flink.runtime.state.OperatorStateHandle; -import org.apache.flink.runtime.state.StateUtil; -import org.apache.flink.util.ExceptionUtils; - -import java.util.concurrent.RunnableFuture; - -/** - * Result of {@link StreamOperator#snapshotState}. - */ -public class OperatorSnapshotResult { - - private RunnableFuture<KeyedStateHandle> keyedStateManagedFuture; - private RunnableFuture<KeyedStateHandle> keyedStateRawFuture; - private RunnableFuture<OperatorStateHandle> operatorStateManagedFuture; - private RunnableFuture<OperatorStateHandle> operatorStateRawFuture; - - public OperatorSnapshotResult() { - this(null, null, null, null); - } - - public OperatorSnapshotResult( - RunnableFuture<KeyedStateHandle> keyedStateManagedFuture, - RunnableFuture<KeyedStateHandle> keyedStateRawFuture, - RunnableFuture<OperatorStateHandle> operatorStateManagedFuture, - RunnableFuture<OperatorStateHandle> operatorStateRawFuture) { - this.keyedStateManagedFuture = keyedStateManagedFuture; - this.keyedStateRawFuture = keyedStateRawFuture; - this.operatorStateManagedFuture = operatorStateManagedFuture; - this.operatorStateRawFuture = operatorStateRawFuture; - } - - public RunnableFuture<KeyedStateHandle> getKeyedStateManagedFuture() { - return keyedStateManagedFuture; - } - - public void setKeyedStateManagedFuture(RunnableFuture<KeyedStateHandle> keyedStateManagedFuture) { - this.keyedStateManagedFuture = keyedStateManagedFuture; - } - - public RunnableFuture<KeyedStateHandle> getKeyedStateRawFuture() { - return keyedStateRawFuture; - } - - public void setKeyedStateRawFuture(RunnableFuture<KeyedStateHandle> keyedStateRawFuture) { - this.keyedStateRawFuture = keyedStateRawFuture; - } - - public RunnableFuture<OperatorStateHandle> getOperatorStateManagedFuture() { - return operatorStateManagedFuture; - } - - public void 
setOperatorStateManagedFuture(RunnableFuture<OperatorStateHandle> operatorStateManagedFuture) { - this.operatorStateManagedFuture = operatorStateManagedFuture; - } - - public RunnableFuture<OperatorStateHandle> getOperatorStateRawFuture() { - return operatorStateRawFuture; - } - - public void setOperatorStateRawFuture(RunnableFuture<OperatorStateHandle> operatorStateRawFuture) { - this.operatorStateRawFuture = operatorStateRawFuture; - } - - public void cancel() throws Exception { - Exception exception = null; - - try { - StateUtil.discardStateFuture(getKeyedStateManagedFuture()); - } catch (Exception e) { - exception = new Exception("Could not properly cancel managed keyed state future.", e); - } - - try { - StateUtil.discardStateFuture(getOperatorStateManagedFuture()); - } catch (Exception e) { - exception = ExceptionUtils.firstOrSuppressed( - new Exception("Could not properly cancel managed operator state future.", e), - exception); - } - - try { - StateUtil.discardStateFuture(getKeyedStateRawFuture()); - } catch (Exception e) { - exception = ExceptionUtils.firstOrSuppressed( - new Exception("Could not properly cancel raw keyed state future.", e), - exception); - } - - try { - StateUtil.discardStateFuture(getOperatorStateRawFuture()); - } catch (Exception e) { - exception = ExceptionUtils.firstOrSuppressed( - new Exception("Could not properly cancel raw operator state future.", e), - exception); - } - - if (exception != null) { - throw exception; - } - } - - public boolean hasKeyedState() { - return keyedStateManagedFuture != null || keyedStateRawFuture != null; - } - - public boolean hasOperatorState() { - return operatorStateManagedFuture != null || operatorStateRawFuture != null; - } - - public boolean hasState() { - return hasKeyedState() || hasOperatorState(); - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/ea0d16d4/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamOperator.java 
---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamOperator.java index 8450396..c3254f6 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamOperator.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamOperator.java @@ -99,7 +99,7 @@ public interface StreamOperator<OUT> extends CheckpointListener, KeyContext, Ser * * @throws Exception exception that happened during snapshotting. */ - OperatorSnapshotResult snapshotState( + OperatorSnapshotFutures snapshotState( long checkpointId, long timestamp, CheckpointOptions checkpointOptions, http://git-wip-us.apache.org/repos/asf/flink/blob/ea0d16d4/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java index 03c23a5..06cb18b 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java @@ -46,7 +46,7 @@ import org.apache.flink.runtime.util.OperatorSubtaskDescriptionText; import org.apache.flink.streaming.api.TimeCharacteristic; import org.apache.flink.streaming.api.graph.StreamConfig; import org.apache.flink.streaming.api.graph.StreamEdge; -import org.apache.flink.streaming.api.operators.OperatorSnapshotResult; +import org.apache.flink.streaming.api.operators.OperatorSnapshotFutures; import org.apache.flink.streaming.api.operators.Output; import 
org.apache.flink.streaming.api.operators.StreamOperator; import org.apache.flink.streaming.api.operators.StreamTaskStateInitializer; @@ -804,7 +804,7 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>> private final StreamTask<?, ?> owner; - private final Map<OperatorID, OperatorSnapshotResult> operatorSnapshotsInProgress; + private final Map<OperatorID, OperatorSnapshotFutures> operatorSnapshotsInProgress; private final CheckpointMetaData checkpointMetaData; private final CheckpointMetrics checkpointMetrics; @@ -816,7 +816,7 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>> AsyncCheckpointRunnable( StreamTask<?, ?> owner, - Map<OperatorID, OperatorSnapshotResult> operatorSnapshotsInProgress, + Map<OperatorID, OperatorSnapshotFutures> operatorSnapshotsInProgress, CheckpointMetaData checkpointMetaData, CheckpointMetrics checkpointMetrics, long asyncStartNanos) { @@ -838,10 +838,10 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>> final TaskStateSnapshot taskOperatorSubtaskStates = new TaskStateSnapshot(operatorSnapshotsInProgress.size()); - for (Map.Entry<OperatorID, OperatorSnapshotResult> entry : operatorSnapshotsInProgress.entrySet()) { + for (Map.Entry<OperatorID, OperatorSnapshotFutures> entry : operatorSnapshotsInProgress.entrySet()) { OperatorID operatorID = entry.getKey(); - OperatorSnapshotResult snapshotInProgress = entry.getValue(); + OperatorSnapshotFutures snapshotInProgress = entry.getValue(); OperatorSubtaskState operatorSubtaskState = new OperatorSubtaskState( FutureUtil.runIfNotDoneAndGet(snapshotInProgress.getOperatorStateManagedFuture()), @@ -927,7 +927,7 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>> Exception exception = null; // clean up ongoing operator snapshot results and non partitioned state handles - for (OperatorSnapshotResult operatorSnapshotResult : operatorSnapshotsInProgress.values()) { + for (OperatorSnapshotFutures operatorSnapshotResult : 
operatorSnapshotsInProgress.values()) { if (operatorSnapshotResult != null) { try { operatorSnapshotResult.cancel(); @@ -971,7 +971,7 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>> // ------------------------ - private final Map<OperatorID, OperatorSnapshotResult> operatorSnapshotsInProgress; + private final Map<OperatorID, OperatorSnapshotFutures> operatorSnapshotsInProgress; public CheckpointingOperation( StreamTask<?, ?> owner, @@ -1026,7 +1026,7 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>> } } catch (Exception ex) { // Cleanup to release resources - for (OperatorSnapshotResult operatorSnapshotResult : operatorSnapshotsInProgress.values()) { + for (OperatorSnapshotFutures operatorSnapshotResult : operatorSnapshotsInProgress.values()) { if (null != operatorSnapshotResult) { try { operatorSnapshotResult.cancel(); @@ -1052,7 +1052,7 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>> private void checkpointStreamOperator(StreamOperator<?> op) throws Exception { if (null != op) { - OperatorSnapshotResult snapshotInProgress = op.snapshotState( + OperatorSnapshotFutures snapshotInProgress = op.snapshotState( checkpointMetaData.getCheckpointId(), checkpointMetaData.getTimestamp(), checkpointOptions, http://git-wip-us.apache.org/repos/asf/flink/blob/ea0d16d4/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/AbstractStreamOperatorTest.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/AbstractStreamOperatorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/AbstractStreamOperatorTest.java index 46cae27..85069b5 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/AbstractStreamOperatorTest.java +++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/AbstractStreamOperatorTest.java @@ -576,10 +576,10 @@ public class AbstractStreamOperatorTest { when(context.getKeyedStateStreamFuture()).thenReturn(futureKeyedStateHandle); when(context.getOperatorStateStreamFuture()).thenReturn(futureOperatorStateHandle); - OperatorSnapshotResult operatorSnapshotResult = spy(new OperatorSnapshotResult()); + OperatorSnapshotFutures operatorSnapshotResult = spy(new OperatorSnapshotFutures()); whenNew(StateSnapshotContextSynchronousImpl.class).withAnyArguments().thenReturn(context); - whenNew(OperatorSnapshotResult.class).withAnyArguments().thenReturn(operatorSnapshotResult); + whenNew(OperatorSnapshotFutures.class).withAnyArguments().thenReturn(operatorSnapshotResult); CheckpointStreamFactory streamFactory = mock(CheckpointStreamFactory.class); StreamTask<Void, AbstractStreamOperator<Void>> containingTask = mock(StreamTask.class); http://git-wip-us.apache.org/repos/asf/flink/blob/ea0d16d4/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/OperatorSnapshotFuturesTest.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/OperatorSnapshotFuturesTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/OperatorSnapshotFuturesTest.java new file mode 100644 index 0000000..6da39af --- /dev/null +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/OperatorSnapshotFuturesTest.java @@ -0,0 +1,82 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.api.operators; + +import org.apache.flink.runtime.state.KeyedStateHandle; +import org.apache.flink.runtime.state.OperatorStateHandle; +import org.apache.flink.util.TestLogger; + +import org.junit.Test; + +import java.util.concurrent.RunnableFuture; + +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verify; +import static org.powermock.api.mockito.PowerMockito.when; + +/** + * Tests for {@link OperatorSnapshotFutures}. + */ +public class OperatorSnapshotFuturesTest extends TestLogger { + + /** + * Tests that all runnable futures in an OperatorSnapshotFutures are properly cancelled and if + * the StreamStateHandle result is retrievable that the state handles are discarded. 
+ */ + @Test + public void testCancelAndCleanup() throws Exception { + OperatorSnapshotFutures operatorSnapshotResult = new OperatorSnapshotFutures(); + + operatorSnapshotResult.cancel(); + + KeyedStateHandle keyedManagedStateHandle = mock(KeyedStateHandle.class); + RunnableFuture<KeyedStateHandle> keyedStateManagedFuture = mock(RunnableFuture.class); + when(keyedStateManagedFuture.get()).thenReturn(keyedManagedStateHandle); + + KeyedStateHandle keyedRawStateHandle = mock(KeyedStateHandle.class); + RunnableFuture<KeyedStateHandle> keyedStateRawFuture = mock(RunnableFuture.class); + when(keyedStateRawFuture.get()).thenReturn(keyedRawStateHandle); + + OperatorStateHandle operatorManagedStateHandle = mock(OperatorStateHandle.class); + RunnableFuture<OperatorStateHandle> operatorStateManagedFuture = mock(RunnableFuture.class); + when(operatorStateManagedFuture.get()).thenReturn(operatorManagedStateHandle); + + OperatorStateHandle operatorRawStateHandle = mock(OperatorStateHandle.class); + RunnableFuture<OperatorStateHandle> operatorStateRawFuture = mock(RunnableFuture.class); + when(operatorStateRawFuture.get()).thenReturn(operatorRawStateHandle); + + operatorSnapshotResult = new OperatorSnapshotFutures( + keyedStateManagedFuture, + keyedStateRawFuture, + operatorStateManagedFuture, + operatorStateRawFuture); + + operatorSnapshotResult.cancel(); + + verify(keyedStateManagedFuture).cancel(true); + verify(keyedStateRawFuture).cancel(true); + verify(operatorStateManagedFuture).cancel(true); + verify(operatorStateRawFuture).cancel(true); + + verify(keyedManagedStateHandle).discardState(); + verify(keyedRawStateHandle).discardState(); + verify(operatorManagedStateHandle).discardState(); + verify(operatorRawStateHandle).discardState(); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/ea0d16d4/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/OperatorSnapshotResultTest.java 
---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/OperatorSnapshotResultTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/OperatorSnapshotResultTest.java deleted file mode 100644 index 5a7e69e..0000000 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/OperatorSnapshotResultTest.java +++ /dev/null @@ -1,82 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.streaming.api.operators; - -import org.apache.flink.runtime.state.KeyedStateHandle; -import org.apache.flink.runtime.state.OperatorStateHandle; -import org.apache.flink.util.TestLogger; - -import org.junit.Test; - -import java.util.concurrent.RunnableFuture; - -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.verify; -import static org.powermock.api.mockito.PowerMockito.when; - -/** - * Tests for {@link OperatorSnapshotResult}. 
- */ -public class OperatorSnapshotResultTest extends TestLogger { - - /** - * Tests that all runnable futures in an OperatorSnapshotResult are properly cancelled and if - * the StreamStateHandle result is retrievable that the state handle are discarded. - */ - @Test - public void testCancelAndCleanup() throws Exception { - OperatorSnapshotResult operatorSnapshotResult = new OperatorSnapshotResult(); - - operatorSnapshotResult.cancel(); - - KeyedStateHandle keyedManagedStateHandle = mock(KeyedStateHandle.class); - RunnableFuture<KeyedStateHandle> keyedStateManagedFuture = mock(RunnableFuture.class); - when(keyedStateManagedFuture.get()).thenReturn(keyedManagedStateHandle); - - KeyedStateHandle keyedRawStateHandle = mock(KeyedStateHandle.class); - RunnableFuture<KeyedStateHandle> keyedStateRawFuture = mock(RunnableFuture.class); - when(keyedStateRawFuture.get()).thenReturn(keyedRawStateHandle); - - OperatorStateHandle operatorManagedStateHandle = mock(OperatorStateHandle.class); - RunnableFuture<OperatorStateHandle> operatorStateManagedFuture = mock(RunnableFuture.class); - when(operatorStateManagedFuture.get()).thenReturn(operatorManagedStateHandle); - - OperatorStateHandle operatorRawStateHandle = mock(OperatorStateHandle.class); - RunnableFuture<OperatorStateHandle> operatorStateRawFuture = mock(RunnableFuture.class); - when(operatorStateRawFuture.get()).thenReturn(operatorRawStateHandle); - - operatorSnapshotResult = new OperatorSnapshotResult( - keyedStateManagedFuture, - keyedStateRawFuture, - operatorStateManagedFuture, - operatorStateRawFuture); - - operatorSnapshotResult.cancel(); - - verify(keyedStateManagedFuture).cancel(true); - verify(keyedStateRawFuture).cancel(true); - verify(operatorStateManagedFuture).cancel(true); - verify(operatorStateRawFuture).cancel(true); - - verify(keyedManagedStateHandle).discardState(); - verify(keyedRawStateHandle).discardState(); - verify(operatorManagedStateHandle).discardState(); - 
verify(operatorRawStateHandle).discardState(); - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/ea0d16d4/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java index 52295fb..99d4e5b 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java @@ -94,7 +94,7 @@ import org.apache.flink.streaming.api.functions.source.SourceFunction; import org.apache.flink.streaming.api.graph.StreamConfig; import org.apache.flink.streaming.api.operators.AbstractStreamOperator; import org.apache.flink.streaming.api.operators.InternalTimeServiceManager; -import org.apache.flink.streaming.api.operators.OperatorSnapshotResult; +import org.apache.flink.streaming.api.operators.OperatorSnapshotFutures; import org.apache.flink.streaming.api.operators.Output; import org.apache.flink.streaming.api.operators.StreamOperator; import org.apache.flink.streaming.api.operators.StreamOperatorStateContext; @@ -341,8 +341,8 @@ public class StreamTaskTest extends TestLogger { StreamOperator<?> streamOperator3 = mock(StreamOperator.class); // mock the returned snapshots - OperatorSnapshotResult operatorSnapshotResult1 = mock(OperatorSnapshotResult.class); - OperatorSnapshotResult operatorSnapshotResult2 = mock(OperatorSnapshotResult.class); + OperatorSnapshotFutures operatorSnapshotResult1 = mock(OperatorSnapshotFutures.class); + OperatorSnapshotFutures operatorSnapshotResult2 = mock(OperatorSnapshotFutures.class); final Exception testException = new Exception("Test exception"); @@ -410,9 +410,9 @@ public class StreamTaskTest extends 
TestLogger { StreamOperator<?> streamOperator3 = mock(StreamOperator.class); // mock the new state operator snapshots - OperatorSnapshotResult operatorSnapshotResult1 = mock(OperatorSnapshotResult.class); - OperatorSnapshotResult operatorSnapshotResult2 = mock(OperatorSnapshotResult.class); - OperatorSnapshotResult operatorSnapshotResult3 = mock(OperatorSnapshotResult.class); + OperatorSnapshotFutures operatorSnapshotResult1 = mock(OperatorSnapshotFutures.class); + OperatorSnapshotFutures operatorSnapshotResult2 = mock(OperatorSnapshotFutures.class); + OperatorSnapshotFutures operatorSnapshotResult3 = mock(OperatorSnapshotFutures.class); RunnableFuture<OperatorStateHandle> failingFuture = mock(RunnableFuture.class); when(failingFuture.get()).thenThrow(new ExecutionException(new Exception("Test exception"))); @@ -520,7 +520,7 @@ public class StreamTaskTest extends TestLogger { OperatorStateHandle managedOperatorStateHandle = mock(OperatorStateHandle.class); OperatorStateHandle rawOperatorStateHandle = mock(OperatorStateHandle.class); - OperatorSnapshotResult operatorSnapshotResult = new OperatorSnapshotResult( + OperatorSnapshotFutures operatorSnapshotResult = new OperatorSnapshotFutures( new DoneFuture<>(managedKeyedStateHandle), new DoneFuture<>(rawKeyedStateHandle), new DoneFuture<>(managedOperatorStateHandle), @@ -635,7 +635,7 @@ public class StreamTaskTest extends TestLogger { OperatorStateHandle managedOperatorStateHandle = mock(OperatorStateHandle.class); OperatorStateHandle rawOperatorStateHandle = mock(OperatorStateHandle.class); - OperatorSnapshotResult operatorSnapshotResult = new OperatorSnapshotResult( + OperatorSnapshotFutures operatorSnapshotResult = new OperatorSnapshotFutures( new DoneFuture<>(managedKeyedStateHandle), new DoneFuture<>(rawKeyedStateHandle), new DoneFuture<>(managedOperatorStateHandle), @@ -739,7 +739,7 @@ public class StreamTaskTest extends TestLogger { when(statelessOperator.getOperatorID()).thenReturn(operatorID); // mock the 
returned empty snapshot result (all state handles are null) - OperatorSnapshotResult statelessOperatorSnapshotResult = new OperatorSnapshotResult(); + OperatorSnapshotFutures statelessOperatorSnapshotResult = new OperatorSnapshotFutures(); when(statelessOperator.snapshotState(anyLong(), anyLong(), any(CheckpointOptions.class), any(CheckpointStreamFactory.class))) .thenReturn(statelessOperatorSnapshotResult); http://git-wip-us.apache.org/repos/asf/flink/blob/ea0d16d4/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java index d38cb28..28ad930 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java @@ -47,7 +47,7 @@ import org.apache.flink.streaming.api.TimeCharacteristic; import org.apache.flink.streaming.api.graph.StreamConfig; import org.apache.flink.streaming.api.operators.AbstractStreamOperator; import org.apache.flink.streaming.api.operators.AbstractStreamOperatorTest; -import org.apache.flink.streaming.api.operators.OperatorSnapshotResult; +import org.apache.flink.streaming.api.operators.OperatorSnapshotFutures; import org.apache.flink.streaming.api.operators.Output; import org.apache.flink.streaming.api.operators.StreamOperator; import org.apache.flink.streaming.api.operators.StreamTaskStateInitializer; @@ -470,7 +470,7 @@ public class AbstractStreamOperatorTestHarness<OUT> implements AutoCloseable { */ public OperatorSubtaskState snapshot(long checkpointId, long timestamp) throws Exception { - OperatorSnapshotResult operatorStateResult = 
operator.snapshotState( + OperatorSnapshotFutures operatorStateResult = operator.snapshotState( checkpointId, timestamp, CheckpointOptions.forCheckpointWithDefaultLocation(),
