Rebasing fixes on latest master
Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/930334ef Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/930334ef Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/930334ef Branch: refs/heads/flip-6 Commit: 930334ef7336d66b9161003575d12a3d66805c89 Parents: 0b981d6 Author: Till Rohrmann <[email protected]> Authored: Thu Oct 20 19:54:12 2016 +0200 Committer: Till Rohrmann <[email protected]> Committed: Fri Oct 21 14:46:37 2016 +0200 ---------------------------------------------------------------------- .../apache/flink/storm/wrappers/BoltWrapperTest.java | 4 ++-- .../checkpoint/CheckpointCoordinatorGateway.java | 5 ++--- .../org/apache/flink/runtime/jobmaster/JobMaster.java | 12 ++++++------ .../taskexecutor/rpc/RpcCheckpointResponder.java | 6 +++--- .../org/apache/flink/runtime/util/ZooKeeperUtils.java | 10 +++------- .../apache/flink/runtime/jobmanager/JobSubmitTest.java | 4 +++- .../runtime/operators/testutils/DummyEnvironment.java | 3 ++- .../flink/runtime/taskmanager/TaskAsyncCallTest.java | 6 ------ ...mulatingAlignedProcessingTimeWindowOperatorTest.java | 3 ++- 9 files changed, 23 insertions(+), 30 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/930334ef/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/wrappers/BoltWrapperTest.java ---------------------------------------------------------------------- diff --git a/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/wrappers/BoltWrapperTest.java b/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/wrappers/BoltWrapperTest.java index e0659da..ec81fa6 100644 --- a/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/wrappers/BoltWrapperTest.java +++ b/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/wrappers/BoltWrapperTest.java @@ -35,7 +35,7 @@ import org.apache.flink.configuration.UnmodifiableConfiguration; import org.apache.flink.metrics.groups.UnregisteredMetricsGroup; import org.apache.flink.runtime.execution.Environment; import org.apache.flink.runtime.operators.testutils.UnregisteredTaskMetricsGroup; -import org.apache.flink.runtime.taskmanager.TaskManagerRuntimeInfo; +import org.apache.flink.runtime.util.TestingTaskManagerRuntimeInfo; import org.apache.flink.storm.util.AbstractTest; import org.apache.flink.storm.util.SplitStreamType; import org.apache.flink.storm.util.StormConfig; @@ -370,7 +370,7 @@ public class BoltWrapperTest extends AbstractTest { when(env.getTaskInfo()).thenReturn(new TaskInfo("Mock Task", 1, 0, 1, 0)); when(env.getUserClassLoader()).thenReturn(BoltWrapperTest.class.getClassLoader()); when(env.getMetricGroup()).thenReturn(new UnregisteredTaskMetricsGroup()); - when(env.getTaskManagerInfo()).thenReturn(new TaskManagerRuntimeInfo("foo", new Configuration(), "foo")); + when(env.getTaskManagerInfo()).thenReturn(new TestingTaskManagerRuntimeInfo()); StreamTask<?, ?> mockTask = mock(StreamTask.class); when(mockTask.getCheckpointLock()).thenReturn(new Object()); http://git-wip-us.apache.org/repos/asf/flink/blob/930334ef/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorGateway.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorGateway.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorGateway.java index 2634006..f9786f6 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorGateway.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorGateway.java @@ -21,15 +21,14 @@ package org.apache.flink.runtime.checkpoint; import org.apache.flink.api.common.JobID; import org.apache.flink.runtime.executiongraph.ExecutionAttemptID; import org.apache.flink.runtime.rpc.RpcGateway; -import org.apache.flink.runtime.state.CheckpointStateHandles; public interface CheckpointCoordinatorGateway extends RpcGateway { void acknowledgeCheckpoint( final JobID jobID, final ExecutionAttemptID executionAttemptID, - final CheckpointMetaData checkpointInfo, - final CheckpointStateHandles checkpointStateHandles); + final CheckpointMetaData checkpointMetaData, + final SubtaskState subtaskState); void declineCheckpoint( final JobID jobID, http://git-wip-us.apache.org/repos/asf/flink/blob/930334ef/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java index 56fa3e7..5a7c9a1 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java @@ -31,6 +31,7 @@ import org.apache.flink.runtime.checkpoint.CheckpointCoordinator; import org.apache.flink.runtime.checkpoint.CheckpointMetaData; import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory; import org.apache.flink.runtime.checkpoint.CompletedCheckpoint; +import org.apache.flink.runtime.checkpoint.SubtaskState; import org.apache.flink.runtime.client.JobExecutionException; import org.apache.flink.runtime.client.JobSubmissionException; import org.apache.flink.runtime.client.SerializedJobExecutionResult; @@ -83,7 +84,6 @@ import org.apache.flink.runtime.rpc.RpcEndpoint; import org.apache.flink.runtime.rpc.RpcMethod; import org.apache.flink.runtime.rpc.RpcService; import org.apache.flink.runtime.rpc.StartStoppable; -import org.apache.flink.runtime.state.CheckpointStateHandles; import org.apache.flink.runtime.state.KeyGroupRange; import org.apache.flink.runtime.taskexecutor.TaskExecutorGateway; import org.apache.flink.runtime.taskexecutor.slot.SlotOffer; @@ -529,12 +529,12 @@ public class JobMaster extends RpcEndpoint<JobMasterGateway> { public void acknowledgeCheckpoint( final JobID jobID, final ExecutionAttemptID executionAttemptID, - final CheckpointMetaData checkpointInfo, - final CheckpointStateHandles checkpointState) + final CheckpointMetaData checkpointMetaData, + final SubtaskState subtaskState) { final CheckpointCoordinator checkpointCoordinator = executionGraph.getCheckpointCoordinator(); final AcknowledgeCheckpoint ackMessage = - new AcknowledgeCheckpoint(jobID, executionAttemptID, checkpointInfo, checkpointState); + new AcknowledgeCheckpoint(jobID, executionAttemptID, checkpointMetaData, subtaskState); if (checkpointCoordinator != null) { getRpcService().execute(new Runnable() { @@ -543,10 +543,10 @@ public class JobMaster extends RpcEndpoint<JobMasterGateway> { try { if (!checkpointCoordinator.receiveAcknowledgeMessage(ackMessage)) { log.info("Received message for non-existing checkpoint {}.", - checkpointInfo.getCheckpointId()); + checkpointMetaData.getCheckpointId()); } } catch (Exception e) { - log.error("Error in CheckpointCoordinator while processing {}", checkpointInfo, e); + log.error("Error in CheckpointCoordinator while processing {}", checkpointMetaData, e); } } }); http://git-wip-us.apache.org/repos/asf/flink/blob/930334ef/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/rpc/RpcCheckpointResponder.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/rpc/RpcCheckpointResponder.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/rpc/RpcCheckpointResponder.java index 9669da0..c18da67 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/rpc/RpcCheckpointResponder.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/rpc/RpcCheckpointResponder.java @@ -21,8 +21,8 @@ package org.apache.flink.runtime.taskexecutor.rpc; import org.apache.flink.api.common.JobID; import org.apache.flink.runtime.checkpoint.CheckpointCoordinatorGateway; import org.apache.flink.runtime.checkpoint.CheckpointMetaData; +import org.apache.flink.runtime.checkpoint.SubtaskState; import org.apache.flink.runtime.executiongraph.ExecutionAttemptID; -import org.apache.flink.runtime.state.CheckpointStateHandles; import org.apache.flink.runtime.taskmanager.CheckpointResponder; import org.apache.flink.util.Preconditions; @@ -39,13 +39,13 @@ public class RpcCheckpointResponder implements CheckpointResponder { JobID jobID, ExecutionAttemptID executionAttemptID, CheckpointMetaData checkpointMetaData, - CheckpointStateHandles checkpointStateHandles) { + SubtaskState subtaskState) { checkpointCoordinatorGateway.acknowledgeCheckpoint( jobID, executionAttemptID, checkpointMetaData, - checkpointStateHandles); + subtaskState); } http://git-wip-us.apache.org/repos/asf/flink/blob/930334ef/flink-runtime/src/main/java/org/apache/flink/runtime/util/ZooKeeperUtils.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/util/ZooKeeperUtils.java b/flink-runtime/src/main/java/org/apache/flink/runtime/util/ZooKeeperUtils.java index e9777a3..cb5dc31 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/util/ZooKeeperUtils.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/util/ZooKeeperUtils.java @@ -161,7 +161,6 @@ public class ZooKeeperUtils { */ public static ZooKeeperLeaderRetrievalService createLeaderRetrievalService( Configuration configuration) { - { final CuratorFramework client = startCuratorFramework(configuration); return createLeaderRetrievalService(client, configuration); } @@ -172,11 +171,10 @@ public class ZooKeeperUtils { * @param client The {@link CuratorFramework} ZooKeeper client to use * @param configuration {@link Configuration} object containing the configuration values * @return {@link ZooKeeperLeaderRetrievalService} instance. - * @throws Exception */ public static ZooKeeperLeaderRetrievalService createLeaderRetrievalService( final CuratorFramework client, - final Configuration configuration) throws Exception + final Configuration configuration) { return createLeaderRetrievalService(client, configuration, ""); } @@ -188,12 +186,11 @@ public class ZooKeeperUtils { * @param configuration {@link Configuration} object containing the configuration values * @param pathSuffix The path suffix which we want to append * @return {@link ZooKeeperLeaderRetrievalService} instance. - * @throws Exception */ public static ZooKeeperLeaderRetrievalService createLeaderRetrievalService( final CuratorFramework client, final Configuration configuration, - final String pathSuffix) throws Exception + final String pathSuffix) { String leaderPath = ConfigurationUtil.getStringWithDeprecatedKeys( configuration, @@ -240,12 +237,11 @@ public class ZooKeeperUtils { * @param configuration {@link Configuration} object containing the configuration values * @param pathSuffix The path suffix which we want to append * @return {@link ZooKeeperLeaderElectionService} instance. - * @throws Exception */ public static ZooKeeperLeaderElectionService createLeaderElectionService( final CuratorFramework client, final Configuration configuration, - final String pathSuffix) throws Exception + final String pathSuffix) { final String latchPath = ConfigurationUtil.getStringWithDeprecatedKeys( configuration, http://git-wip-us.apache.org/repos/asf/flink/blob/930334ef/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobSubmitTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobSubmitTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobSubmitTest.java index 260b4d4..3c45ccc 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobSubmitTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobSubmitTest.java @@ -168,6 +168,8 @@ public class JobSubmitTest { JobVertex jobVertex = new JobVertex("Vertex that fails in initializeOnMaster") { + private static final long serialVersionUID = -3540303593784587652L; + @Override public void initializeOnMaster(ClassLoader loader) throws Exception { throw new RuntimeException("test exception"); @@ -217,7 +219,7 @@ public class JobSubmitTest { private JobGraph createSimpleJobGraph() { JobVertex jobVertex = new JobVertex("Vertex"); - jobVertex.setInvokableClass(Tasks.NoOpInvokable.class); + jobVertex.setInvokableClass(NoOpInvokable.class); List<JobVertexID> vertexIdList = Collections.singletonList(jobVertex.getID()); JobGraph jg = new JobGraph("test job", jobVertex); http://git-wip-us.apache.org/repos/asf/flink/blob/930334ef/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/DummyEnvironment.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/DummyEnvironment.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/DummyEnvironment.java index f2616b5..3eba048 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/DummyEnvironment.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/DummyEnvironment.java @@ -39,6 +39,7 @@ import org.apache.flink.runtime.metrics.groups.TaskMetricGroup; import org.apache.flink.runtime.query.KvStateRegistry; import org.apache.flink.runtime.query.TaskKvStateRegistry; import org.apache.flink.runtime.taskmanager.TaskManagerRuntimeInfo; +import org.apache.flink.runtime.util.TestingTaskManagerRuntimeInfo; import java.util.Collections; import java.util.Map; @@ -92,7 +93,7 @@ public class DummyEnvironment implements Environment { @Override public TaskManagerRuntimeInfo getTaskManagerInfo() { - return new TaskManagerRuntimeInfo("foo", new Configuration(), "foo"); + return new TestingTaskManagerRuntimeInfo(); } @Override http://git-wip-us.apache.org/repos/asf/flink/blob/930334ef/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskAsyncCallTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskAsyncCallTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskAsyncCallTest.java index 2a9ff61..7494d7a 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskAsyncCallTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskAsyncCallTest.java @@ -45,10 +45,6 @@ import org.apache.flink.runtime.jobgraph.tasks.StatefulTask; import org.apache.flink.runtime.memory.MemoryManager; import org.apache.flink.runtime.metrics.groups.TaskMetricGroup; import org.apache.flink.runtime.query.TaskKvStateRegistry; -import org.apache.flink.runtime.state.ChainedStateHandle; -import org.apache.flink.runtime.state.KeyGroupsStateHandle; -import org.apache.flink.runtime.state.OperatorStateHandle; -import org.apache.flink.runtime.state.StreamStateHandle; import org.apache.flink.runtime.state.TaskStateHandles; import org.apache.flink.runtime.util.TestingTaskManagerRuntimeInfo; import org.apache.flink.util.SerializedValue; @@ -57,9 +53,7 @@ import org.junit.Before; import org.junit.Test; import java.net.URL; -import java.util.Collection; import java.util.Collections; -import java.util.List; import java.util.concurrent.Executor; import static org.junit.Assert.assertFalse; http://git-wip-us.apache.org/repos/asf/flink/blob/930334ef/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingAlignedProcessingTimeWindowOperatorTest.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingAlignedProcessingTimeWindowOperatorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingAlignedProcessingTimeWindowOperatorTest.java index e96109e..bc62890 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingAlignedProcessingTimeWindowOperatorTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingAlignedProcessingTimeWindowOperatorTest.java @@ -34,6 +34,7 @@ import org.apache.flink.runtime.execution.Environment; import org.apache.flink.runtime.operators.testutils.UnregisteredTaskMetricsGroup; import org.apache.flink.runtime.state.StreamStateHandle; import org.apache.flink.runtime.taskmanager.TaskManagerRuntimeInfo; +import org.apache.flink.runtime.util.TestingTaskManagerRuntimeInfo; import org.apache.flink.streaming.api.functions.windowing.RichWindowFunction; import org.apache.flink.streaming.api.functions.windowing.WindowFunction; import org.apache.flink.streaming.api.graph.StreamConfig; @@ -794,7 +795,7 @@ public class AccumulatingAlignedProcessingTimeWindowOperatorTest { when(env.getTaskInfo()).thenReturn(new TaskInfo("Test task name", 1, 0, 1, 0)); when(env.getUserClassLoader()).thenReturn(AggregatingAlignedProcessingTimeWindowOperatorTest.class.getClassLoader()); when(env.getMetricGroup()).thenReturn(new UnregisteredTaskMetricsGroup()); - when(env.getTaskManagerInfo()).thenReturn(new TaskManagerRuntimeInfo("foo", new Configuration(), "foo")); + when(env.getTaskManagerInfo()).thenReturn(new TestingTaskManagerRuntimeInfo()); when(task.getEnvironment()).thenReturn(env); return task;
