Repository: flink Updated Branches: refs/heads/master 1836e08f0 -> 9e17cbd6b
[FLINK-4788] [streaming api] Fix state backend classloading from configuration Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/9e17cbd6 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/9e17cbd6 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/9e17cbd6 Branch: refs/heads/master Commit: 9e17cbd6b768f73299a6a344fdf44539802fb76c Parents: 1836e08 Author: Stephan Ewen <[email protected]> Authored: Mon Oct 10 14:33:57 2016 +0200 Committer: Stephan Ewen <[email protected]> Committed: Mon Oct 10 20:02:33 2016 +0200 ---------------------------------------------------------------------- .../streaming/runtime/tasks/StreamTask.java | 3 +- .../streaming/runtime/tasks/StreamTaskTest.java | 95 +++++++++----------- 2 files changed, 43 insertions(+), 55 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/9e17cbd6/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java index 8ada6d3..4893fed 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java @@ -752,8 +752,7 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>> backendName = "jobmanager"; } - backendName = backendName.toLowerCase(); - switch (backendName) { + switch (backendName.toLowerCase()) { case "jobmanager": LOG.info("State backend is set to heap memory (checkpoint to jobmanager)"); stateBackend = new MemoryStateBackend(); http://git-wip-us.apache.org/repos/asf/flink/blob/9e17cbd6/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java index 47a4090..8aae19f 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java @@ -18,12 +18,12 @@ package org.apache.flink.streaming.runtime.tasks; -import akka.actor.ActorRef; - import akka.dispatch.Futures; + import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.api.common.JobID; import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.configuration.ConfigConstants; import org.apache.flink.configuration.Configuration; import org.apache.flink.runtime.blob.BlobKey; import org.apache.flink.runtime.broadcast.BroadcastVariableManager; @@ -34,7 +34,6 @@ import org.apache.flink.runtime.execution.ExecutionState; import org.apache.flink.runtime.execution.librarycache.LibraryCacheManager; import org.apache.flink.runtime.executiongraph.ExecutionAttemptID; import org.apache.flink.runtime.filecache.FileCache; -import org.apache.flink.runtime.instance.ActorGateway; import org.apache.flink.runtime.io.disk.iomanager.IOManager; import org.apache.flink.runtime.io.network.NetworkEnvironment; import org.apache.flink.runtime.io.network.netty.PartitionStateChecker; @@ -44,8 +43,10 @@ import org.apache.flink.runtime.jobgraph.JobVertexID; import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable; import org.apache.flink.runtime.jobgraph.tasks.InputSplitProvider; import org.apache.flink.runtime.memory.MemoryManager; -import org.apache.flink.runtime.metrics.groups.TaskMetricGroup; +import org.apache.flink.runtime.operators.testutils.UnregisteredTaskMetricsGroup; import org.apache.flink.runtime.query.TaskKvStateRegistry; +import org.apache.flink.runtime.state.AbstractStateBackend; +import org.apache.flink.runtime.state.StateBackendFactory; import org.apache.flink.runtime.taskmanager.CheckpointResponder; import org.apache.flink.runtime.taskmanager.Task; import org.apache.flink.runtime.taskmanager.TaskExecutionState; @@ -60,10 +61,10 @@ import org.apache.flink.streaming.api.operators.StreamSource; import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; import org.apache.flink.util.ExceptionUtils; import org.apache.flink.util.SerializedValue; + import org.junit.Test; import scala.concurrent.Await; -import scala.concurrent.ExecutionContext; import scala.concurrent.Future; import scala.concurrent.duration.Deadline; import scala.concurrent.duration.FiniteDuration; @@ -75,22 +76,20 @@ import java.net.URL; import java.util.Collections; import java.util.Comparator; import java.util.PriorityQueue; -import java.util.UUID; import java.util.concurrent.Executor; import java.util.concurrent.TimeUnit; -import static org.mockito.Matchers.any; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.when; - import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; +import static org.mockito.Matchers.any; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; public class StreamTaskTest { - /** + /** * This test checks that cancel calls that are issued before the operator is * instantiated still lead to proper canceling. */ @@ -101,7 +100,7 @@ public class StreamTaskTest { cfg.setStreamOperator(new SlowlyDeserializingOperator()); cfg.setTimeCharacteristic(TimeCharacteristic.ProcessingTime); - Task task = createTask(SourceStreamTask.class, cfg); + Task task = createTask(SourceStreamTask.class, cfg, new Configuration()); TestingExecutionStateListener testingExecutionStateListener = new TestingExecutionStateListener(); @@ -137,6 +136,24 @@ public class StreamTaskTest { assertEquals(ExecutionState.CANCELED, task.getExecutionState()); } + @Test + public void testStateBackendLoading() throws Exception { + Configuration taskManagerConfig = new Configuration(); + taskManagerConfig.setString(ConfigConstants.STATE_BACKEND, MockStateBackend.class.getName()); + + StreamConfig cfg = new StreamConfig(new Configuration()); + cfg.setStreamOperator(new StreamSource<>(new MockSourceFunction())); + cfg.setTimeCharacteristic(TimeCharacteristic.ProcessingTime); + + Task task = createTask(SourceStreamTask.class, cfg, taskManagerConfig); + + task.startTaskThread(); + + // wait for clean termination + task.getExecutingThread().join(); + assertEquals(ExecutionState.FINISHED, task.getExecutionState()); + } + // ------------------------------------------------------------------------ // Test Utilities @@ -144,9 +161,9 @@ public class StreamTaskTest { private static class TestingExecutionStateListener implements TaskExecutionStateListener { - ExecutionState executionState = null; + private ExecutionState executionState = null; - PriorityQueue<Tuple2<ExecutionState, Promise<ExecutionState>>> priorityQueue = new PriorityQueue<>( + private final PriorityQueue<Tuple2<ExecutionState, Promise<ExecutionState>>> priorityQueue = new PriorityQueue<>( 1, new Comparator<Tuple2<ExecutionState, Promise<ExecutionState>>>() { @Override @@ -183,7 +200,11 @@ public class StreamTaskTest { } } - private Task createTask(Class<? extends AbstractInvokable> invokable, StreamConfig taskConfig) throws Exception { + private Task createTask( + Class<? extends AbstractInvokable> invokable, + StreamConfig taskConfig, + Configuration taskManagerConfig) throws Exception { + LibraryCacheManager libCache = mock(LibraryCacheManager.class); when(libCache.getClassLoader(any(JobID.class))).thenReturn(getClass().getClassLoader()); @@ -191,6 +212,7 @@ public class StreamTaskTest { ResultPartitionConsumableNotifier consumableNotifier = mock(ResultPartitionConsumableNotifier.class); PartitionStateChecker partitionStateChecker = mock(PartitionStateChecker.class); Executor executor = mock(Executor.class); + NetworkEnvironment network = mock(NetworkEnvironment.class); when(network.getResultPartitionManager()).thenReturn(partitionManager); when(network.getDefaultIOMode()).thenReturn(IOManager.IOMode.SYNC); @@ -221,8 +243,8 @@ public class StreamTaskTest { mock(CheckpointResponder.class), libCache, mock(FileCache.class), - new TaskManagerRuntimeInfo("localhost", new Configuration(), System.getProperty("java.io.tmpdir")), - mock(TaskMetricGroup.class), + new TaskManagerRuntimeInfo("localhost", taskManagerConfig, System.getProperty("java.io.tmpdir")), + new UnregisteredTaskMetricsGroup(), consumableNotifier, partitionStateChecker, executor); @@ -279,45 +301,12 @@ public class StreamTaskTest { public void cancel() {} } - // ------------------------------------------------------------------------ - // Test JobManager/TaskManager gateways - // ------------------------------------------------------------------------ - - private static class DummyGateway implements ActorGateway { + public static final class MockStateBackend implements StateBackendFactory<AbstractStateBackend> { private static final long serialVersionUID = 1L; @Override - public Future<Object> ask(Object message, FiniteDuration timeout) { - return null; - } - - @Override - public void tell(Object message) {} - - @Override - public void tell(Object message, ActorGateway sender) {} - - @Override - public void forward(Object message, ActorGateway sender) {} - - @Override - public Future<Object> retry(Object message, int numberRetries, FiniteDuration timeout, ExecutionContext executionContext) { - return null; - } - - @Override - public String path() { - return null; - } - - @Override - public ActorRef actor() { - return null; - } - - @Override - public UUID leaderSessionID() { - return null; + public AbstractStateBackend createFromConfig(Configuration config) throws Exception { + return mock(AbstractStateBackend.class); } } }
