Rebasing fixes on latest master

Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/930334ef
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/930334ef
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/930334ef

Branch: refs/heads/flip-6
Commit: 930334ef7336d66b9161003575d12a3d66805c89
Parents: 0b981d6
Author: Till Rohrmann <[email protected]>
Authored: Thu Oct 20 19:54:12 2016 +0200
Committer: Till Rohrmann <[email protected]>
Committed: Fri Oct 21 14:46:37 2016 +0200

----------------------------------------------------------------------
 .../apache/flink/storm/wrappers/BoltWrapperTest.java    |  4 ++--
 .../checkpoint/CheckpointCoordinatorGateway.java        |  5 ++---
 .../org/apache/flink/runtime/jobmaster/JobMaster.java   | 12 ++++++------
 .../taskexecutor/rpc/RpcCheckpointResponder.java        |  6 +++---
 .../org/apache/flink/runtime/util/ZooKeeperUtils.java   | 10 +++-------
 .../apache/flink/runtime/jobmanager/JobSubmitTest.java  |  4 +++-
 .../runtime/operators/testutils/DummyEnvironment.java   |  3 ++-
 .../flink/runtime/taskmanager/TaskAsyncCallTest.java    |  6 ------
 ...mulatingAlignedProcessingTimeWindowOperatorTest.java |  3 ++-
 9 files changed, 23 insertions(+), 30 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/930334ef/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/wrappers/BoltWrapperTest.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/wrappers/BoltWrapperTest.java b/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/wrappers/BoltWrapperTest.java
index e0659da..ec81fa6 100644
--- a/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/wrappers/BoltWrapperTest.java
+++ b/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/wrappers/BoltWrapperTest.java
@@ -35,7 +35,7 @@ import org.apache.flink.configuration.UnmodifiableConfiguration;
 import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
 import org.apache.flink.runtime.execution.Environment;
 import org.apache.flink.runtime.operators.testutils.UnregisteredTaskMetricsGroup;
-import org.apache.flink.runtime.taskmanager.TaskManagerRuntimeInfo;
+import org.apache.flink.runtime.util.TestingTaskManagerRuntimeInfo;
 import org.apache.flink.storm.util.AbstractTest;
 import org.apache.flink.storm.util.SplitStreamType;
 import org.apache.flink.storm.util.StormConfig;
@@ -370,7 +370,7 @@ public class BoltWrapperTest extends AbstractTest {
                when(env.getTaskInfo()).thenReturn(new TaskInfo("Mock Task", 1, 0, 1, 0));
                when(env.getUserClassLoader()).thenReturn(BoltWrapperTest.class.getClassLoader());
                when(env.getMetricGroup()).thenReturn(new UnregisteredTaskMetricsGroup());
-               when(env.getTaskManagerInfo()).thenReturn(new TaskManagerRuntimeInfo("foo", new Configuration(), "foo"));
+               when(env.getTaskManagerInfo()).thenReturn(new TestingTaskManagerRuntimeInfo());
 
                StreamTask<?, ?> mockTask = mock(StreamTask.class);
                when(mockTask.getCheckpointLock()).thenReturn(new Object());

http://git-wip-us.apache.org/repos/asf/flink/blob/930334ef/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorGateway.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorGateway.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorGateway.java
index 2634006..f9786f6 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorGateway.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorGateway.java
@@ -21,15 +21,14 @@ package org.apache.flink.runtime.checkpoint;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
 import org.apache.flink.runtime.rpc.RpcGateway;
-import org.apache.flink.runtime.state.CheckpointStateHandles;
 
 public interface CheckpointCoordinatorGateway extends RpcGateway {
 
        void acknowledgeCheckpoint(
                        final JobID jobID,
                        final ExecutionAttemptID executionAttemptID,
-                       final CheckpointMetaData checkpointInfo,
-                       final CheckpointStateHandles checkpointStateHandles);
+                       final CheckpointMetaData checkpointMetaData,
+                       final SubtaskState subtaskState);
 
        void declineCheckpoint(
                        final JobID jobID,

http://git-wip-us.apache.org/repos/asf/flink/blob/930334ef/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
index 56fa3e7..5a7c9a1 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
@@ -31,6 +31,7 @@ import org.apache.flink.runtime.checkpoint.CheckpointCoordinator;
 import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
 import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory;
 import org.apache.flink.runtime.checkpoint.CompletedCheckpoint;
+import org.apache.flink.runtime.checkpoint.SubtaskState;
 import org.apache.flink.runtime.client.JobExecutionException;
 import org.apache.flink.runtime.client.JobSubmissionException;
 import org.apache.flink.runtime.client.SerializedJobExecutionResult;
@@ -83,7 +84,6 @@ import org.apache.flink.runtime.rpc.RpcEndpoint;
 import org.apache.flink.runtime.rpc.RpcMethod;
 import org.apache.flink.runtime.rpc.RpcService;
 import org.apache.flink.runtime.rpc.StartStoppable;
-import org.apache.flink.runtime.state.CheckpointStateHandles;
 import org.apache.flink.runtime.state.KeyGroupRange;
 import org.apache.flink.runtime.taskexecutor.TaskExecutorGateway;
 import org.apache.flink.runtime.taskexecutor.slot.SlotOffer;
@@ -529,12 +529,12 @@ public class JobMaster extends RpcEndpoint<JobMasterGateway> {
        public void acknowledgeCheckpoint(
                        final JobID jobID,
                        final ExecutionAttemptID executionAttemptID,
-                       final CheckpointMetaData checkpointInfo,
-                       final CheckpointStateHandles checkpointState)
+                       final CheckpointMetaData checkpointMetaData,
+                       final SubtaskState subtaskState)
        {
                final CheckpointCoordinator checkpointCoordinator = executionGraph.getCheckpointCoordinator();
                final AcknowledgeCheckpoint ackMessage = 
-                               new AcknowledgeCheckpoint(jobID, executionAttemptID, checkpointInfo, checkpointState);
+                               new AcknowledgeCheckpoint(jobID, executionAttemptID, checkpointMetaData, subtaskState);
 
                if (checkpointCoordinator != null) {
                        getRpcService().execute(new Runnable() {
@@ -543,10 +543,10 @@ public class JobMaster extends RpcEndpoint<JobMasterGateway> {
                                        try {
                                                if (!checkpointCoordinator.receiveAcknowledgeMessage(ackMessage)) {
                                                        log.info("Received message for non-existing checkpoint {}.",
-                                                                       checkpointInfo.getCheckpointId());
+                                                                       checkpointMetaData.getCheckpointId());
                                                }
                                        } catch (Exception e) {
-                                               log.error("Error in CheckpointCoordinator while processing {}", checkpointInfo, e);
+                                               log.error("Error in CheckpointCoordinator while processing {}", checkpointMetaData, e);
                                        }
                                }
                        });

http://git-wip-us.apache.org/repos/asf/flink/blob/930334ef/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/rpc/RpcCheckpointResponder.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/rpc/RpcCheckpointResponder.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/rpc/RpcCheckpointResponder.java
index 9669da0..c18da67 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/rpc/RpcCheckpointResponder.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/rpc/RpcCheckpointResponder.java
@@ -21,8 +21,8 @@ package org.apache.flink.runtime.taskexecutor.rpc;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.runtime.checkpoint.CheckpointCoordinatorGateway;
 import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
+import org.apache.flink.runtime.checkpoint.SubtaskState;
 import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
-import org.apache.flink.runtime.state.CheckpointStateHandles;
 import org.apache.flink.runtime.taskmanager.CheckpointResponder;
 import org.apache.flink.util.Preconditions;
 
@@ -39,13 +39,13 @@ public class RpcCheckpointResponder implements CheckpointResponder {
                        JobID jobID,
                        ExecutionAttemptID executionAttemptID,
                        CheckpointMetaData checkpointMetaData,
-                       CheckpointStateHandles checkpointStateHandles) {
+                       SubtaskState subtaskState) {
 
                checkpointCoordinatorGateway.acknowledgeCheckpoint(
                        jobID,
                        executionAttemptID,
                        checkpointMetaData,
-                       checkpointStateHandles);
+                       subtaskState);
 
        }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/930334ef/flink-runtime/src/main/java/org/apache/flink/runtime/util/ZooKeeperUtils.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/util/ZooKeeperUtils.java b/flink-runtime/src/main/java/org/apache/flink/runtime/util/ZooKeeperUtils.java
index e9777a3..cb5dc31 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/util/ZooKeeperUtils.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/util/ZooKeeperUtils.java
@@ -161,7 +161,6 @@ public class ZooKeeperUtils {
         */
        public static ZooKeeperLeaderRetrievalService createLeaderRetrievalService(
                        Configuration configuration) {
-       {
                final CuratorFramework client = startCuratorFramework(configuration);
                return createLeaderRetrievalService(client, configuration);
        }
@@ -172,11 +171,10 @@ public class ZooKeeperUtils {
         * @param client        The {@link CuratorFramework} ZooKeeper client to use
         * @param configuration {@link Configuration} object containing the configuration values
         * @return {@link ZooKeeperLeaderRetrievalService} instance.
-        * @throws Exception
         */
        public static ZooKeeperLeaderRetrievalService createLeaderRetrievalService(
                final CuratorFramework client,
-               final Configuration configuration) throws Exception
+               final Configuration configuration)
        {
                return createLeaderRetrievalService(client, configuration, "");
        }
@@ -188,12 +186,11 @@ public class ZooKeeperUtils {
         * @param configuration {@link Configuration} object containing the configuration values
         * @param pathSuffix    The path suffix which we want to append
         * @return {@link ZooKeeperLeaderRetrievalService} instance.
-        * @throws Exception
         */
        public static ZooKeeperLeaderRetrievalService createLeaderRetrievalService(
                final CuratorFramework client,
                final Configuration configuration,
-               final String pathSuffix) throws Exception
+               final String pathSuffix)
        {
                String leaderPath = ConfigurationUtil.getStringWithDeprecatedKeys(
                        configuration,
@@ -240,12 +237,11 @@ public class ZooKeeperUtils {
         * @param configuration {@link Configuration} object containing the configuration values
         * @param pathSuffix    The path suffix which we want to append
         * @return {@link ZooKeeperLeaderElectionService} instance.
-        * @throws Exception
         */
        public static ZooKeeperLeaderElectionService createLeaderElectionService(
                final CuratorFramework client,
                final Configuration configuration,
-               final String pathSuffix) throws Exception
+               final String pathSuffix)
        {
                final String latchPath = ConfigurationUtil.getStringWithDeprecatedKeys(
                        configuration,

http://git-wip-us.apache.org/repos/asf/flink/blob/930334ef/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobSubmitTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobSubmitTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobSubmitTest.java
index 260b4d4..3c45ccc 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobSubmitTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobSubmitTest.java
@@ -168,6 +168,8 @@ public class JobSubmitTest {
 
                        JobVertex jobVertex = new JobVertex("Vertex that fails in initializeOnMaster") {
 
+                               private static final long serialVersionUID = -3540303593784587652L;
+
                                @Override
                                public void initializeOnMaster(ClassLoader loader) throws Exception {
                                        throw new RuntimeException("test exception");
@@ -217,7 +219,7 @@ public class JobSubmitTest {
        private JobGraph createSimpleJobGraph() {
                JobVertex jobVertex = new JobVertex("Vertex");
 
-               jobVertex.setInvokableClass(Tasks.NoOpInvokable.class);
+               jobVertex.setInvokableClass(NoOpInvokable.class);
                List<JobVertexID> vertexIdList = Collections.singletonList(jobVertex.getID());
 
                JobGraph jg = new JobGraph("test job", jobVertex);

http://git-wip-us.apache.org/repos/asf/flink/blob/930334ef/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/DummyEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/DummyEnvironment.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/DummyEnvironment.java
index f2616b5..3eba048 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/DummyEnvironment.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/DummyEnvironment.java
@@ -39,6 +39,7 @@ import org.apache.flink.runtime.metrics.groups.TaskMetricGroup;
 import org.apache.flink.runtime.query.KvStateRegistry;
 import org.apache.flink.runtime.query.TaskKvStateRegistry;
 import org.apache.flink.runtime.taskmanager.TaskManagerRuntimeInfo;
+import org.apache.flink.runtime.util.TestingTaskManagerRuntimeInfo;
 
 import java.util.Collections;
 import java.util.Map;
@@ -92,7 +93,7 @@ public class DummyEnvironment implements Environment {
 
        @Override
        public TaskManagerRuntimeInfo getTaskManagerInfo() {
-               return new TaskManagerRuntimeInfo("foo", new Configuration(), "foo");
+               return new TestingTaskManagerRuntimeInfo();
        }
 
        @Override

http://git-wip-us.apache.org/repos/asf/flink/blob/930334ef/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskAsyncCallTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskAsyncCallTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskAsyncCallTest.java
index 2a9ff61..7494d7a 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskAsyncCallTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskAsyncCallTest.java
@@ -45,10 +45,6 @@ import org.apache.flink.runtime.jobgraph.tasks.StatefulTask;
 import org.apache.flink.runtime.memory.MemoryManager;
 import org.apache.flink.runtime.metrics.groups.TaskMetricGroup;
 import org.apache.flink.runtime.query.TaskKvStateRegistry;
-import org.apache.flink.runtime.state.ChainedStateHandle;
-import org.apache.flink.runtime.state.KeyGroupsStateHandle;
-import org.apache.flink.runtime.state.OperatorStateHandle;
-import org.apache.flink.runtime.state.StreamStateHandle;
 import org.apache.flink.runtime.state.TaskStateHandles;
 import org.apache.flink.runtime.util.TestingTaskManagerRuntimeInfo;
 import org.apache.flink.util.SerializedValue;
@@ -57,9 +53,7 @@ import org.junit.Before;
 import org.junit.Test;
 
 import java.net.URL;
-import java.util.Collection;
 import java.util.Collections;
-import java.util.List;
 import java.util.concurrent.Executor;
 
 import static org.junit.Assert.assertFalse;

http://git-wip-us.apache.org/repos/asf/flink/blob/930334ef/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingAlignedProcessingTimeWindowOperatorTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingAlignedProcessingTimeWindowOperatorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingAlignedProcessingTimeWindowOperatorTest.java
index e96109e..bc62890 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingAlignedProcessingTimeWindowOperatorTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingAlignedProcessingTimeWindowOperatorTest.java
@@ -34,6 +34,7 @@ import org.apache.flink.runtime.execution.Environment;
 import org.apache.flink.runtime.operators.testutils.UnregisteredTaskMetricsGroup;
 import org.apache.flink.runtime.state.StreamStateHandle;
 import org.apache.flink.runtime.taskmanager.TaskManagerRuntimeInfo;
+import org.apache.flink.runtime.util.TestingTaskManagerRuntimeInfo;
 import org.apache.flink.streaming.api.functions.windowing.RichWindowFunction;
 import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
 import org.apache.flink.streaming.api.graph.StreamConfig;
@@ -794,7 +795,7 @@ public class AccumulatingAlignedProcessingTimeWindowOperatorTest {
                when(env.getTaskInfo()).thenReturn(new TaskInfo("Test task name", 1, 0, 1, 0));
                when(env.getUserClassLoader()).thenReturn(AggregatingAlignedProcessingTimeWindowOperatorTest.class.getClassLoader());
                when(env.getMetricGroup()).thenReturn(new UnregisteredTaskMetricsGroup());
-               when(env.getTaskManagerInfo()).thenReturn(new TaskManagerRuntimeInfo("foo", new Configuration(), "foo"));
+               when(env.getTaskManagerInfo()).thenReturn(new TestingTaskManagerRuntimeInfo());
 
                when(task.getEnvironment()).thenReturn(env);
                return task;

Reply via email to