Repository: flink
Updated Branches:
  refs/heads/master 4e7f03e41 -> f9a583b72


[hotfix] Rename OperatorSnapshotResult to OperatorSnapshotFutures.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/ea0d16d4
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/ea0d16d4
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/ea0d16d4

Branch: refs/heads/master
Commit: ea0d16d4bac639dc4858a6c5cef209e904e655ef
Parents: 617e67c
Author: Stefan Richter <[email protected]>
Authored: Thu Feb 22 16:14:39 2018 +0100
Committer: Stefan Richter <[email protected]>
Committed: Sun Feb 25 15:10:28 2018 +0100

----------------------------------------------------------------------
 .../api/operators/AbstractStreamOperator.java   |   4 +-
 .../api/operators/OperatorSnapshotFutures.java  | 134 +++++++++++++++++++
 .../api/operators/OperatorSnapshotResult.java   | 134 -------------------
 .../streaming/api/operators/StreamOperator.java |   2 +-
 .../streaming/runtime/tasks/StreamTask.java     |  18 +--
 .../operators/AbstractStreamOperatorTest.java   |   4 +-
 .../operators/OperatorSnapshotFuturesTest.java  |  82 ++++++++++++
 .../operators/OperatorSnapshotResultTest.java   |  82 ------------
 .../streaming/runtime/tasks/StreamTaskTest.java |  18 +--
 .../util/AbstractStreamOperatorTestHarness.java |   4 +-
 10 files changed, 241 insertions(+), 241 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/ea0d16d4/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
index 42b6923..4f16259 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
@@ -347,7 +347,7 @@ public abstract class AbstractStreamOperator<OUT>
        }
 
        @Override
-       public final OperatorSnapshotResult snapshotState(
+       public final OperatorSnapshotFutures snapshotState(
                        long checkpointId,
                        long timestamp,
                        CheckpointOptions checkpointOptions,
@@ -356,7 +356,7 @@ public abstract class AbstractStreamOperator<OUT>
                KeyGroupRange keyGroupRange = null != keyedStateBackend ?
                                keyedStateBackend.getKeyGroupRange() : 
KeyGroupRange.EMPTY_KEY_GROUP_RANGE;
 
-               OperatorSnapshotResult snapshotInProgress = new 
OperatorSnapshotResult();
+               OperatorSnapshotFutures snapshotInProgress = new 
OperatorSnapshotFutures();
 
                try (StateSnapshotContextSynchronousImpl snapshotContext = new 
StateSnapshotContextSynchronousImpl(
                                checkpointId,

http://git-wip-us.apache.org/repos/asf/flink/blob/ea0d16d4/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/OperatorSnapshotFutures.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/OperatorSnapshotFutures.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/OperatorSnapshotFutures.java
new file mode 100644
index 0000000..bdaf64b
--- /dev/null
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/OperatorSnapshotFutures.java
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.operators;
+
+import org.apache.flink.runtime.state.KeyedStateHandle;
+import org.apache.flink.runtime.state.OperatorStateHandle;
+import org.apache.flink.runtime.state.StateUtil;
+import org.apache.flink.util.ExceptionUtils;
+
+import java.util.concurrent.RunnableFuture;
+
+/**
+ * Result of {@link StreamOperator#snapshotState}.
+ */
+public class OperatorSnapshotFutures {
+
+       private RunnableFuture<KeyedStateHandle> keyedStateManagedFuture;
+       private RunnableFuture<KeyedStateHandle> keyedStateRawFuture;
+       private RunnableFuture<OperatorStateHandle> operatorStateManagedFuture;
+       private RunnableFuture<OperatorStateHandle> operatorStateRawFuture;
+
+       public OperatorSnapshotFutures() {
+               this(null, null, null, null);
+       }
+
+       public OperatorSnapshotFutures(
+                       RunnableFuture<KeyedStateHandle> 
keyedStateManagedFuture,
+                       RunnableFuture<KeyedStateHandle> keyedStateRawFuture,
+                       RunnableFuture<OperatorStateHandle> 
operatorStateManagedFuture,
+                       RunnableFuture<OperatorStateHandle> 
operatorStateRawFuture) {
+               this.keyedStateManagedFuture = keyedStateManagedFuture;
+               this.keyedStateRawFuture = keyedStateRawFuture;
+               this.operatorStateManagedFuture = operatorStateManagedFuture;
+               this.operatorStateRawFuture = operatorStateRawFuture;
+       }
+
+       public RunnableFuture<KeyedStateHandle> getKeyedStateManagedFuture() {
+               return keyedStateManagedFuture;
+       }
+
+       public void setKeyedStateManagedFuture(RunnableFuture<KeyedStateHandle> 
keyedStateManagedFuture) {
+               this.keyedStateManagedFuture = keyedStateManagedFuture;
+       }
+
+       public RunnableFuture<KeyedStateHandle> getKeyedStateRawFuture() {
+               return keyedStateRawFuture;
+       }
+
+       public void setKeyedStateRawFuture(RunnableFuture<KeyedStateHandle> 
keyedStateRawFuture) {
+               this.keyedStateRawFuture = keyedStateRawFuture;
+       }
+
+       public RunnableFuture<OperatorStateHandle> 
getOperatorStateManagedFuture() {
+               return operatorStateManagedFuture;
+       }
+
+       public void 
setOperatorStateManagedFuture(RunnableFuture<OperatorStateHandle> 
operatorStateManagedFuture) {
+               this.operatorStateManagedFuture = operatorStateManagedFuture;
+       }
+
+       public RunnableFuture<OperatorStateHandle> getOperatorStateRawFuture() {
+               return operatorStateRawFuture;
+       }
+
+       public void 
setOperatorStateRawFuture(RunnableFuture<OperatorStateHandle> 
operatorStateRawFuture) {
+               this.operatorStateRawFuture = operatorStateRawFuture;
+       }
+
+       public void cancel() throws Exception {
+               Exception exception = null;
+
+               try {
+                       
StateUtil.discardStateFuture(getKeyedStateManagedFuture());
+               } catch (Exception e) {
+                       exception = new Exception("Could not properly cancel 
managed keyed state future.", e);
+               }
+
+               try {
+                       
StateUtil.discardStateFuture(getOperatorStateManagedFuture());
+               } catch (Exception e) {
+                       exception = ExceptionUtils.firstOrSuppressed(
+                               new Exception("Could not properly cancel 
managed operator state future.", e),
+                               exception);
+               }
+
+               try {
+                       StateUtil.discardStateFuture(getKeyedStateRawFuture());
+               } catch (Exception e) {
+                       exception = ExceptionUtils.firstOrSuppressed(
+                               new Exception("Could not properly cancel raw 
keyed state future.", e),
+                               exception);
+               }
+
+               try {
+                       
StateUtil.discardStateFuture(getOperatorStateRawFuture());
+               } catch (Exception e) {
+                       exception = ExceptionUtils.firstOrSuppressed(
+                               new Exception("Could not properly cancel raw 
operator state future.", e),
+                               exception);
+               }
+
+               if (exception != null) {
+                       throw exception;
+               }
+       }
+
+       public boolean hasKeyedState() {
+               return keyedStateManagedFuture != null || keyedStateRawFuture 
!= null;
+       }
+
+       public boolean hasOperatorState() {
+               return operatorStateManagedFuture != null || 
operatorStateRawFuture != null;
+       }
+
+       public boolean hasState() {
+               return hasKeyedState() || hasOperatorState();
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/ea0d16d4/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/OperatorSnapshotResult.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/OperatorSnapshotResult.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/OperatorSnapshotResult.java
deleted file mode 100644
index 8c05ae9..0000000
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/OperatorSnapshotResult.java
+++ /dev/null
@@ -1,134 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.operators;
-
-import org.apache.flink.runtime.state.KeyedStateHandle;
-import org.apache.flink.runtime.state.OperatorStateHandle;
-import org.apache.flink.runtime.state.StateUtil;
-import org.apache.flink.util.ExceptionUtils;
-
-import java.util.concurrent.RunnableFuture;
-
-/**
- * Result of {@link StreamOperator#snapshotState}.
- */
-public class OperatorSnapshotResult {
-
-       private RunnableFuture<KeyedStateHandle> keyedStateManagedFuture;
-       private RunnableFuture<KeyedStateHandle> keyedStateRawFuture;
-       private RunnableFuture<OperatorStateHandle> operatorStateManagedFuture;
-       private RunnableFuture<OperatorStateHandle> operatorStateRawFuture;
-
-       public OperatorSnapshotResult() {
-               this(null, null, null, null);
-       }
-
-       public OperatorSnapshotResult(
-                       RunnableFuture<KeyedStateHandle> 
keyedStateManagedFuture,
-                       RunnableFuture<KeyedStateHandle> keyedStateRawFuture,
-                       RunnableFuture<OperatorStateHandle> 
operatorStateManagedFuture,
-                       RunnableFuture<OperatorStateHandle> 
operatorStateRawFuture) {
-               this.keyedStateManagedFuture = keyedStateManagedFuture;
-               this.keyedStateRawFuture = keyedStateRawFuture;
-               this.operatorStateManagedFuture = operatorStateManagedFuture;
-               this.operatorStateRawFuture = operatorStateRawFuture;
-       }
-
-       public RunnableFuture<KeyedStateHandle> getKeyedStateManagedFuture() {
-               return keyedStateManagedFuture;
-       }
-
-       public void setKeyedStateManagedFuture(RunnableFuture<KeyedStateHandle> 
keyedStateManagedFuture) {
-               this.keyedStateManagedFuture = keyedStateManagedFuture;
-       }
-
-       public RunnableFuture<KeyedStateHandle> getKeyedStateRawFuture() {
-               return keyedStateRawFuture;
-       }
-
-       public void setKeyedStateRawFuture(RunnableFuture<KeyedStateHandle> 
keyedStateRawFuture) {
-               this.keyedStateRawFuture = keyedStateRawFuture;
-       }
-
-       public RunnableFuture<OperatorStateHandle> 
getOperatorStateManagedFuture() {
-               return operatorStateManagedFuture;
-       }
-
-       public void 
setOperatorStateManagedFuture(RunnableFuture<OperatorStateHandle> 
operatorStateManagedFuture) {
-               this.operatorStateManagedFuture = operatorStateManagedFuture;
-       }
-
-       public RunnableFuture<OperatorStateHandle> getOperatorStateRawFuture() {
-               return operatorStateRawFuture;
-       }
-
-       public void 
setOperatorStateRawFuture(RunnableFuture<OperatorStateHandle> 
operatorStateRawFuture) {
-               this.operatorStateRawFuture = operatorStateRawFuture;
-       }
-
-       public void cancel() throws Exception {
-               Exception exception = null;
-
-               try {
-                       
StateUtil.discardStateFuture(getKeyedStateManagedFuture());
-               } catch (Exception e) {
-                       exception = new Exception("Could not properly cancel 
managed keyed state future.", e);
-               }
-
-               try {
-                       
StateUtil.discardStateFuture(getOperatorStateManagedFuture());
-               } catch (Exception e) {
-                       exception = ExceptionUtils.firstOrSuppressed(
-                               new Exception("Could not properly cancel 
managed operator state future.", e),
-                               exception);
-               }
-
-               try {
-                       StateUtil.discardStateFuture(getKeyedStateRawFuture());
-               } catch (Exception e) {
-                       exception = ExceptionUtils.firstOrSuppressed(
-                               new Exception("Could not properly cancel raw 
keyed state future.", e),
-                               exception);
-               }
-
-               try {
-                       
StateUtil.discardStateFuture(getOperatorStateRawFuture());
-               } catch (Exception e) {
-                       exception = ExceptionUtils.firstOrSuppressed(
-                               new Exception("Could not properly cancel raw 
operator state future.", e),
-                               exception);
-               }
-
-               if (exception != null) {
-                       throw exception;
-               }
-       }
-
-       public boolean hasKeyedState() {
-               return keyedStateManagedFuture != null || keyedStateRawFuture 
!= null;
-       }
-
-       public boolean hasOperatorState() {
-               return operatorStateManagedFuture != null || 
operatorStateRawFuture != null;
-       }
-
-       public boolean hasState() {
-               return hasKeyedState() || hasOperatorState();
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/ea0d16d4/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamOperator.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamOperator.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamOperator.java
index 8450396..c3254f6 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamOperator.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamOperator.java
@@ -99,7 +99,7 @@ public interface StreamOperator<OUT> extends 
CheckpointListener, KeyContext, Ser
         *
         * @throws Exception exception that happened during snapshotting.
         */
-       OperatorSnapshotResult snapshotState(
+       OperatorSnapshotFutures snapshotState(
                long checkpointId,
                long timestamp,
                CheckpointOptions checkpointOptions,

http://git-wip-us.apache.org/repos/asf/flink/blob/ea0d16d4/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
index 03c23a5..06cb18b 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
@@ -46,7 +46,7 @@ import 
org.apache.flink.runtime.util.OperatorSubtaskDescriptionText;
 import org.apache.flink.streaming.api.TimeCharacteristic;
 import org.apache.flink.streaming.api.graph.StreamConfig;
 import org.apache.flink.streaming.api.graph.StreamEdge;
-import org.apache.flink.streaming.api.operators.OperatorSnapshotResult;
+import org.apache.flink.streaming.api.operators.OperatorSnapshotFutures;
 import org.apache.flink.streaming.api.operators.Output;
 import org.apache.flink.streaming.api.operators.StreamOperator;
 import org.apache.flink.streaming.api.operators.StreamTaskStateInitializer;
@@ -804,7 +804,7 @@ public abstract class StreamTask<OUT, OP extends 
StreamOperator<OUT>>
 
                private final StreamTask<?, ?> owner;
 
-               private final Map<OperatorID, OperatorSnapshotResult> 
operatorSnapshotsInProgress;
+               private final Map<OperatorID, OperatorSnapshotFutures> 
operatorSnapshotsInProgress;
 
                private final CheckpointMetaData checkpointMetaData;
                private final CheckpointMetrics checkpointMetrics;
@@ -816,7 +816,7 @@ public abstract class StreamTask<OUT, OP extends 
StreamOperator<OUT>>
 
                AsyncCheckpointRunnable(
                        StreamTask<?, ?> owner,
-                       Map<OperatorID, OperatorSnapshotResult> 
operatorSnapshotsInProgress,
+                       Map<OperatorID, OperatorSnapshotFutures> 
operatorSnapshotsInProgress,
                        CheckpointMetaData checkpointMetaData,
                        CheckpointMetrics checkpointMetrics,
                        long asyncStartNanos) {
@@ -838,10 +838,10 @@ public abstract class StreamTask<OUT, OP extends 
StreamOperator<OUT>>
                                final TaskStateSnapshot 
taskOperatorSubtaskStates =
                                        new 
TaskStateSnapshot(operatorSnapshotsInProgress.size());
 
-                               for (Map.Entry<OperatorID, 
OperatorSnapshotResult> entry : operatorSnapshotsInProgress.entrySet()) {
+                               for (Map.Entry<OperatorID, 
OperatorSnapshotFutures> entry : operatorSnapshotsInProgress.entrySet()) {
 
                                        OperatorID operatorID = entry.getKey();
-                                       OperatorSnapshotResult 
snapshotInProgress = entry.getValue();
+                                       OperatorSnapshotFutures 
snapshotInProgress = entry.getValue();
 
                                        OperatorSubtaskState 
operatorSubtaskState = new OperatorSubtaskState(
                                                
FutureUtil.runIfNotDoneAndGet(snapshotInProgress.getOperatorStateManagedFuture()),
@@ -927,7 +927,7 @@ public abstract class StreamTask<OUT, OP extends 
StreamOperator<OUT>>
                                Exception exception = null;
 
                                // clean up ongoing operator snapshot results 
and non partitioned state handles
-                               for (OperatorSnapshotResult 
operatorSnapshotResult : operatorSnapshotsInProgress.values()) {
+                               for (OperatorSnapshotFutures 
operatorSnapshotResult : operatorSnapshotsInProgress.values()) {
                                        if (operatorSnapshotResult != null) {
                                                try {
                                                        
operatorSnapshotResult.cancel();
@@ -971,7 +971,7 @@ public abstract class StreamTask<OUT, OP extends 
StreamOperator<OUT>>
 
                // ------------------------
 
-               private final Map<OperatorID, OperatorSnapshotResult> 
operatorSnapshotsInProgress;
+               private final Map<OperatorID, OperatorSnapshotFutures> 
operatorSnapshotsInProgress;
 
                public CheckpointingOperation(
                                StreamTask<?, ?> owner,
@@ -1026,7 +1026,7 @@ public abstract class StreamTask<OUT, OP extends 
StreamOperator<OUT>>
                                }
                        } catch (Exception ex) {
                                // Cleanup to release resources
-                               for (OperatorSnapshotResult 
operatorSnapshotResult : operatorSnapshotsInProgress.values()) {
+                               for (OperatorSnapshotFutures 
operatorSnapshotResult : operatorSnapshotsInProgress.values()) {
                                        if (null != operatorSnapshotResult) {
                                                try {
                                                        
operatorSnapshotResult.cancel();
@@ -1052,7 +1052,7 @@ public abstract class StreamTask<OUT, OP extends 
StreamOperator<OUT>>
                private void checkpointStreamOperator(StreamOperator<?> op) 
throws Exception {
                        if (null != op) {
 
-                               OperatorSnapshotResult snapshotInProgress = 
op.snapshotState(
+                               OperatorSnapshotFutures snapshotInProgress = 
op.snapshotState(
                                                
checkpointMetaData.getCheckpointId(),
                                                
checkpointMetaData.getTimestamp(),
                                                checkpointOptions,

http://git-wip-us.apache.org/repos/asf/flink/blob/ea0d16d4/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/AbstractStreamOperatorTest.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/AbstractStreamOperatorTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/AbstractStreamOperatorTest.java
index 46cae27..85069b5 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/AbstractStreamOperatorTest.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/AbstractStreamOperatorTest.java
@@ -576,10 +576,10 @@ public class AbstractStreamOperatorTest {
                
when(context.getKeyedStateStreamFuture()).thenReturn(futureKeyedStateHandle);
                
when(context.getOperatorStateStreamFuture()).thenReturn(futureOperatorStateHandle);
 
-               OperatorSnapshotResult operatorSnapshotResult = spy(new 
OperatorSnapshotResult());
+               OperatorSnapshotFutures operatorSnapshotResult = spy(new 
OperatorSnapshotFutures());
 
                
whenNew(StateSnapshotContextSynchronousImpl.class).withAnyArguments().thenReturn(context);
-               
whenNew(OperatorSnapshotResult.class).withAnyArguments().thenReturn(operatorSnapshotResult);
+               
whenNew(OperatorSnapshotFutures.class).withAnyArguments().thenReturn(operatorSnapshotResult);
 
                CheckpointStreamFactory streamFactory = 
mock(CheckpointStreamFactory.class);
                StreamTask<Void, AbstractStreamOperator<Void>> containingTask = 
mock(StreamTask.class);

http://git-wip-us.apache.org/repos/asf/flink/blob/ea0d16d4/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/OperatorSnapshotFuturesTest.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/OperatorSnapshotFuturesTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/OperatorSnapshotFuturesTest.java
new file mode 100644
index 0000000..6da39af
--- /dev/null
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/OperatorSnapshotFuturesTest.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.operators;
+
+import org.apache.flink.runtime.state.KeyedStateHandle;
+import org.apache.flink.runtime.state.OperatorStateHandle;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.Test;
+
+import java.util.concurrent.RunnableFuture;
+
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+import static org.powermock.api.mockito.PowerMockito.when;
+
+/**
+ * Tests for {@link OperatorSnapshotFutures}.
+ */
+public class OperatorSnapshotFuturesTest extends TestLogger {
+
+       /**
+        * Tests that all runnable futures in an OperatorSnapshotFutures are 
properly cancelled and, if
+        * the StreamStateHandle result is retrievable, that the state handles 
are discarded.
+        */
+       @Test
+       public void testCancelAndCleanup() throws Exception {
+               OperatorSnapshotFutures operatorSnapshotResult = new 
OperatorSnapshotFutures();
+
+               operatorSnapshotResult.cancel();
+
+               KeyedStateHandle keyedManagedStateHandle = 
mock(KeyedStateHandle.class);
+               RunnableFuture<KeyedStateHandle> keyedStateManagedFuture = 
mock(RunnableFuture.class);
+               
when(keyedStateManagedFuture.get()).thenReturn(keyedManagedStateHandle);
+
+               KeyedStateHandle keyedRawStateHandle = 
mock(KeyedStateHandle.class);
+               RunnableFuture<KeyedStateHandle> keyedStateRawFuture = 
mock(RunnableFuture.class);
+               when(keyedStateRawFuture.get()).thenReturn(keyedRawStateHandle);
+
+               OperatorStateHandle operatorManagedStateHandle = 
mock(OperatorStateHandle.class);
+               RunnableFuture<OperatorStateHandle> operatorStateManagedFuture 
= mock(RunnableFuture.class);
+               
when(operatorStateManagedFuture.get()).thenReturn(operatorManagedStateHandle);
+
+               OperatorStateHandle operatorRawStateHandle = 
mock(OperatorStateHandle.class);
+               RunnableFuture<OperatorStateHandle> operatorStateRawFuture = 
mock(RunnableFuture.class);
+               
when(operatorStateRawFuture.get()).thenReturn(operatorRawStateHandle);
+
+               operatorSnapshotResult = new OperatorSnapshotFutures(
+                       keyedStateManagedFuture,
+                       keyedStateRawFuture,
+                       operatorStateManagedFuture,
+                       operatorStateRawFuture);
+
+               operatorSnapshotResult.cancel();
+
+               verify(keyedStateManagedFuture).cancel(true);
+               verify(keyedStateRawFuture).cancel(true);
+               verify(operatorStateManagedFuture).cancel(true);
+               verify(operatorStateRawFuture).cancel(true);
+
+               verify(keyedManagedStateHandle).discardState();
+               verify(keyedRawStateHandle).discardState();
+               verify(operatorManagedStateHandle).discardState();
+               verify(operatorRawStateHandle).discardState();
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/ea0d16d4/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/OperatorSnapshotResultTest.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/OperatorSnapshotResultTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/OperatorSnapshotResultTest.java
deleted file mode 100644
index 5a7e69e..0000000
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/OperatorSnapshotResultTest.java
+++ /dev/null
@@ -1,82 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.operators;
-
-import org.apache.flink.runtime.state.KeyedStateHandle;
-import org.apache.flink.runtime.state.OperatorStateHandle;
-import org.apache.flink.util.TestLogger;
-
-import org.junit.Test;
-
-import java.util.concurrent.RunnableFuture;
-
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.verify;
-import static org.powermock.api.mockito.PowerMockito.when;
-
-/**
- * Tests for {@link OperatorSnapshotResult}.
- */
-public class OperatorSnapshotResultTest extends TestLogger {
-
-       /**
-        * Tests that all runnable futures in an OperatorSnapshotResult are 
properly cancelled and if
-        * the StreamStateHandle result is retrievable that the state handle 
are discarded.
-        */
-       @Test
-       public void testCancelAndCleanup() throws Exception {
-               OperatorSnapshotResult operatorSnapshotResult = new 
OperatorSnapshotResult();
-
-               operatorSnapshotResult.cancel();
-
-               KeyedStateHandle keyedManagedStateHandle = 
mock(KeyedStateHandle.class);
-               RunnableFuture<KeyedStateHandle> keyedStateManagedFuture = 
mock(RunnableFuture.class);
-               
when(keyedStateManagedFuture.get()).thenReturn(keyedManagedStateHandle);
-
-               KeyedStateHandle keyedRawStateHandle = 
mock(KeyedStateHandle.class);
-               RunnableFuture<KeyedStateHandle> keyedStateRawFuture = 
mock(RunnableFuture.class);
-               when(keyedStateRawFuture.get()).thenReturn(keyedRawStateHandle);
-
-               OperatorStateHandle operatorManagedStateHandle = 
mock(OperatorStateHandle.class);
-               RunnableFuture<OperatorStateHandle> operatorStateManagedFuture 
= mock(RunnableFuture.class);
-               
when(operatorStateManagedFuture.get()).thenReturn(operatorManagedStateHandle);
-
-               OperatorStateHandle operatorRawStateHandle = 
mock(OperatorStateHandle.class);
-               RunnableFuture<OperatorStateHandle> operatorStateRawFuture = 
mock(RunnableFuture.class);
-               
when(operatorStateRawFuture.get()).thenReturn(operatorRawStateHandle);
-
-               operatorSnapshotResult = new OperatorSnapshotResult(
-                       keyedStateManagedFuture,
-                       keyedStateRawFuture,
-                       operatorStateManagedFuture,
-                       operatorStateRawFuture);
-
-               operatorSnapshotResult.cancel();
-
-               verify(keyedStateManagedFuture).cancel(true);
-               verify(keyedStateRawFuture).cancel(true);
-               verify(operatorStateManagedFuture).cancel(true);
-               verify(operatorStateRawFuture).cancel(true);
-
-               verify(keyedManagedStateHandle).discardState();
-               verify(keyedRawStateHandle).discardState();
-               verify(operatorManagedStateHandle).discardState();
-               verify(operatorRawStateHandle).discardState();
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/ea0d16d4/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
index 52295fb..99d4e5b 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
@@ -94,7 +94,7 @@ import 
org.apache.flink.streaming.api.functions.source.SourceFunction;
 import org.apache.flink.streaming.api.graph.StreamConfig;
 import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
 import org.apache.flink.streaming.api.operators.InternalTimeServiceManager;
-import org.apache.flink.streaming.api.operators.OperatorSnapshotResult;
+import org.apache.flink.streaming.api.operators.OperatorSnapshotFutures;
 import org.apache.flink.streaming.api.operators.Output;
 import org.apache.flink.streaming.api.operators.StreamOperator;
 import org.apache.flink.streaming.api.operators.StreamOperatorStateContext;
@@ -341,8 +341,8 @@ public class StreamTaskTest extends TestLogger {
                StreamOperator<?> streamOperator3 = mock(StreamOperator.class);
 
                // mock the returned snapshots
-               OperatorSnapshotResult operatorSnapshotResult1 = 
mock(OperatorSnapshotResult.class);
-               OperatorSnapshotResult operatorSnapshotResult2 = 
mock(OperatorSnapshotResult.class);
+               OperatorSnapshotFutures operatorSnapshotResult1 = 
mock(OperatorSnapshotFutures.class);
+               OperatorSnapshotFutures operatorSnapshotResult2 = 
mock(OperatorSnapshotFutures.class);
 
                final Exception testException = new Exception("Test exception");
 
@@ -410,9 +410,9 @@ public class StreamTaskTest extends TestLogger {
                StreamOperator<?> streamOperator3 = mock(StreamOperator.class);
 
                // mock the new state operator snapshots
-               OperatorSnapshotResult operatorSnapshotResult1 = 
mock(OperatorSnapshotResult.class);
-               OperatorSnapshotResult operatorSnapshotResult2 = 
mock(OperatorSnapshotResult.class);
-               OperatorSnapshotResult operatorSnapshotResult3 = 
mock(OperatorSnapshotResult.class);
+               OperatorSnapshotFutures operatorSnapshotResult1 = 
mock(OperatorSnapshotFutures.class);
+               OperatorSnapshotFutures operatorSnapshotResult2 = 
mock(OperatorSnapshotFutures.class);
+               OperatorSnapshotFutures operatorSnapshotResult3 = 
mock(OperatorSnapshotFutures.class);
 
                RunnableFuture<OperatorStateHandle> failingFuture = 
mock(RunnableFuture.class);
                when(failingFuture.get()).thenThrow(new ExecutionException(new 
Exception("Test exception")));
@@ -520,7 +520,7 @@ public class StreamTaskTest extends TestLogger {
                OperatorStateHandle managedOperatorStateHandle = 
mock(OperatorStateHandle.class);
                OperatorStateHandle rawOperatorStateHandle = 
mock(OperatorStateHandle.class);
 
-               OperatorSnapshotResult operatorSnapshotResult = new 
OperatorSnapshotResult(
+               OperatorSnapshotFutures operatorSnapshotResult = new 
OperatorSnapshotFutures(
                        new DoneFuture<>(managedKeyedStateHandle),
                        new DoneFuture<>(rawKeyedStateHandle),
                        new DoneFuture<>(managedOperatorStateHandle),
@@ -635,7 +635,7 @@ public class StreamTaskTest extends TestLogger {
                OperatorStateHandle managedOperatorStateHandle = 
mock(OperatorStateHandle.class);
                OperatorStateHandle rawOperatorStateHandle = 
mock(OperatorStateHandle.class);
 
-               OperatorSnapshotResult operatorSnapshotResult = new 
OperatorSnapshotResult(
+               OperatorSnapshotFutures operatorSnapshotResult = new 
OperatorSnapshotFutures(
                        new DoneFuture<>(managedKeyedStateHandle),
                        new DoneFuture<>(rawKeyedStateHandle),
                        new DoneFuture<>(managedOperatorStateHandle),
@@ -739,7 +739,7 @@ public class StreamTaskTest extends TestLogger {
                when(statelessOperator.getOperatorID()).thenReturn(operatorID);
 
                // mock the returned empty snapshot result (all state handles 
are null)
-               OperatorSnapshotResult statelessOperatorSnapshotResult = new 
OperatorSnapshotResult();
+               OperatorSnapshotFutures statelessOperatorSnapshotResult = new 
OperatorSnapshotFutures();
                when(statelessOperator.snapshotState(anyLong(), anyLong(), 
any(CheckpointOptions.class), any(CheckpointStreamFactory.class)))
                                .thenReturn(statelessOperatorSnapshotResult);
 

http://git-wip-us.apache.org/repos/asf/flink/blob/ea0d16d4/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java
index d38cb28..28ad930 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java
@@ -47,7 +47,7 @@ import org.apache.flink.streaming.api.TimeCharacteristic;
 import org.apache.flink.streaming.api.graph.StreamConfig;
 import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
 import org.apache.flink.streaming.api.operators.AbstractStreamOperatorTest;
-import org.apache.flink.streaming.api.operators.OperatorSnapshotResult;
+import org.apache.flink.streaming.api.operators.OperatorSnapshotFutures;
 import org.apache.flink.streaming.api.operators.Output;
 import org.apache.flink.streaming.api.operators.StreamOperator;
 import org.apache.flink.streaming.api.operators.StreamTaskStateInitializer;
@@ -470,7 +470,7 @@ public class AbstractStreamOperatorTestHarness<OUT> 
implements AutoCloseable {
         */
        public OperatorSubtaskState snapshot(long checkpointId, long timestamp) 
throws Exception {
 
-               OperatorSnapshotResult operatorStateResult = 
operator.snapshotState(
+               OperatorSnapshotFutures operatorStateResult = 
operator.snapshotState(
                        checkpointId,
                        timestamp,
                        CheckpointOptions.forCheckpointWithDefaultLocation(),

Reply via email to