Repository: flink Updated Branches: refs/heads/release-1.1 7267562bb -> d619f51ac
[FLINK-4788] [streaming api] Fix state backend classloading from configuration Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/d619f51a Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/d619f51a Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/d619f51a Branch: refs/heads/release-1.1 Commit: d619f51ac8f922c0cf1d1e789c5141076128f04e Parents: 7267562 Author: Stephan Ewen <[email protected]> Authored: Mon Oct 10 14:33:57 2016 +0200 Committer: Stephan Ewen <[email protected]> Committed: Mon Oct 10 14:40:52 2016 +0200 ---------------------------------------------------------------------- .../streaming/runtime/tasks/StreamTask.java | 3 +- .../streaming/runtime/tasks/StreamTaskTest.java | 47 +++++++++++++++++--- 2 files changed, 42 insertions(+), 8 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/d619f51a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java index 940f699..d56c9bf 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java @@ -684,8 +684,7 @@ public abstract class StreamTask<OUT, Operator extends StreamOperator<OUT>> backendName = "jobmanager"; } - backendName = backendName.toLowerCase(); - switch (backendName) { + switch (backendName.toLowerCase()) { case "jobmanager": LOG.info("State backend is set to heap memory (checkpoint to jobmanager)"); stateBackend = MemoryStateBackend.create(); http://git-wip-us.apache.org/repos/asf/flink/blob/d619f51a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java index e9d583c..83eb4bb 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java @@ -22,8 +22,8 @@ import akka.actor.ActorRef; import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.api.common.JobID; +import org.apache.flink.configuration.ConfigConstants; import org.apache.flink.configuration.Configuration; -import org.apache.flink.runtime.metrics.groups.TaskMetricGroup; import org.apache.flink.runtime.blob.BlobKey; import org.apache.flink.runtime.broadcast.BroadcastVariableManager; import org.apache.flink.runtime.deployment.InputGateDeploymentDescriptor; @@ -41,8 +41,12 @@ import org.apache.flink.runtime.io.network.partition.ResultPartitionManager; import org.apache.flink.runtime.jobgraph.JobVertexID; import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable; import org.apache.flink.runtime.memory.MemoryManager; +import org.apache.flink.runtime.operators.testutils.UnregisteredTaskMetricsGroup; +import org.apache.flink.runtime.state.AbstractStateBackend; +import org.apache.flink.runtime.state.StateBackendFactory; import org.apache.flink.runtime.taskmanager.Task; import org.apache.flink.runtime.taskmanager.TaskManagerRuntimeInfo; +import org.apache.flink.streaming.api.TimeCharacteristic; import org.apache.flink.streaming.api.functions.source.SourceFunction; import org.apache.flink.streaming.api.graph.StreamConfig; import org.apache.flink.streaming.api.operators.Output; @@ -85,7 +89,7 @@ public class StreamTaskTest { StreamConfig cfg = new StreamConfig(new Configuration()); cfg.setStreamOperator(new SlowlyDeserializingOperator()); - Task task = createTask(SourceStreamTask.class, cfg); + Task task = createTask(SourceStreamTask.class, cfg, new Configuration()); task.startTaskThread(); // wait until the task thread reached state RUNNING @@ -120,14 +124,37 @@ public class StreamTaskTest { } } + @Test + public void testStateBackendLoading() throws Exception { + Configuration taskManagerConfig = new Configuration(); + taskManagerConfig.setString(ConfigConstants.STATE_BACKEND, MockStateBackend.class.getName()); + + StreamConfig cfg = new StreamConfig(new Configuration()); + cfg.setStreamOperator(new StreamSource<>(new MockSourceFunction())); + cfg.setTimeCharacteristic(TimeCharacteristic.ProcessingTime); + + Task task = createTask(SourceStreamTask.class, cfg, taskManagerConfig); + + task.startTaskThread(); + + // wait for clean termination + task.getExecutingThread().join(); + assertEquals(ExecutionState.FINISHED, task.getExecutionState()); + } + + // ------------------------------------------------------------------------ // Test Utilities // ------------------------------------------------------------------------ - private Task createTask(Class<? extends AbstractInvokable> invokable, StreamConfig taskConfig) throws Exception { + private Task createTask( + Class<? extends AbstractInvokable> invokable, + StreamConfig taskConfig, + Configuration taskManagerConfig) throws Exception { + LibraryCacheManager libCache = mock(LibraryCacheManager.class); when(libCache.getClassLoader(any(JobID.class))).thenReturn(getClass().getClassLoader()); - + ResultPartitionManager partitionManager = mock(ResultPartitionManager.class); ResultPartitionConsumableNotifier consumableNotifier = mock(ResultPartitionConsumableNotifier.class); NetworkEnvironment network = mock(NetworkEnvironment.class); @@ -159,8 +186,8 @@ public class StreamTaskTest { new FiniteDuration(60, TimeUnit.SECONDS), libCache, mock(FileCache.class), - new TaskManagerRuntimeInfo("localhost", new Configuration(), System.getProperty("java.io.tmpdir")), - mock(TaskMetricGroup.class)); + new TaskManagerRuntimeInfo("localhost", taskManagerConfig, System.getProperty("java.io.tmpdir")), + new UnregisteredTaskMetricsGroup()); } // ------------------------------------------------------------------------ @@ -255,4 +282,12 @@ public class StreamTaskTest { return null; } } + + public static final class MockStateBackend implements StateBackendFactory<AbstractStateBackend> { + + @Override + public AbstractStateBackend createFromConfig(Configuration config) throws Exception { + return mock(AbstractStateBackend.class); + } + } }
