Repository: flink
Updated Branches:
  refs/heads/release-1.1 7267562bb -> d619f51ac


[FLINK-4788] [streaming api] Fix state backend classloading from configuration


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/d619f51a
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/d619f51a
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/d619f51a

Branch: refs/heads/release-1.1
Commit: d619f51ac8f922c0cf1d1e789c5141076128f04e
Parents: 7267562
Author: Stephan Ewen <[email protected]>
Authored: Mon Oct 10 14:33:57 2016 +0200
Committer: Stephan Ewen <[email protected]>
Committed: Mon Oct 10 14:40:52 2016 +0200

----------------------------------------------------------------------
 .../streaming/runtime/tasks/StreamTask.java     |  3 +-
 .../streaming/runtime/tasks/StreamTaskTest.java | 47 +++++++++++++++++---
 2 files changed, 42 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/d619f51a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
index 940f699..d56c9bf 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
@@ -684,8 +684,7 @@ public abstract class StreamTask<OUT, Operator extends 
StreamOperator<OUT>>
                                backendName = "jobmanager";
                        }
 
-                       backendName = backendName.toLowerCase();
-                       switch (backendName) {
+                       switch (backendName.toLowerCase()) {
                                case "jobmanager":
                                        LOG.info("State backend is set to heap 
memory (checkpoint to jobmanager)");
                                        stateBackend = 
MemoryStateBackend.create();

http://git-wip-us.apache.org/repos/asf/flink/blob/d619f51a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
index e9d583c..83eb4bb 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
@@ -22,8 +22,8 @@ import akka.actor.ActorRef;
 
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.JobID;
+import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.runtime.metrics.groups.TaskMetricGroup;
 import org.apache.flink.runtime.blob.BlobKey;
 import org.apache.flink.runtime.broadcast.BroadcastVariableManager;
 import org.apache.flink.runtime.deployment.InputGateDeploymentDescriptor;
@@ -41,8 +41,12 @@ import 
org.apache.flink.runtime.io.network.partition.ResultPartitionManager;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
 import org.apache.flink.runtime.memory.MemoryManager;
+import 
org.apache.flink.runtime.operators.testutils.UnregisteredTaskMetricsGroup;
+import org.apache.flink.runtime.state.AbstractStateBackend;
+import org.apache.flink.runtime.state.StateBackendFactory;
 import org.apache.flink.runtime.taskmanager.Task;
 import org.apache.flink.runtime.taskmanager.TaskManagerRuntimeInfo;
+import org.apache.flink.streaming.api.TimeCharacteristic;
 import org.apache.flink.streaming.api.functions.source.SourceFunction;
 import org.apache.flink.streaming.api.graph.StreamConfig;
 import org.apache.flink.streaming.api.operators.Output;
@@ -85,7 +89,7 @@ public class StreamTaskTest {
                        StreamConfig cfg = new StreamConfig(new 
Configuration());
                        cfg.setStreamOperator(new 
SlowlyDeserializingOperator());
                        
-                       Task task = createTask(SourceStreamTask.class, cfg);
+                       Task task = createTask(SourceStreamTask.class, cfg, new 
Configuration());
                        task.startTaskThread();
                        
                        // wait until the task thread reached state RUNNING 
@@ -120,14 +124,37 @@ public class StreamTaskTest {
                }
        }
 
+       @Test
+       public void testStateBackendLoading() throws Exception {
+               Configuration taskManagerConfig = new Configuration();
+               taskManagerConfig.setString(ConfigConstants.STATE_BACKEND, 
MockStateBackend.class.getName());
+
+               StreamConfig cfg = new StreamConfig(new Configuration());
+               cfg.setStreamOperator(new StreamSource<>(new 
MockSourceFunction()));
+               cfg.setTimeCharacteristic(TimeCharacteristic.ProcessingTime);
+
+               Task task = createTask(SourceStreamTask.class, cfg, 
taskManagerConfig);
+
+               task.startTaskThread();
+
+               // wait for clean termination
+               task.getExecutingThread().join();
+               assertEquals(ExecutionState.FINISHED, task.getExecutionState());
+       }
+
+
        // 
------------------------------------------------------------------------
        //  Test Utilities
        // 
------------------------------------------------------------------------
 
-       private Task createTask(Class<? extends AbstractInvokable> invokable, 
StreamConfig taskConfig) throws Exception {
+       private Task createTask(
+                       Class<? extends AbstractInvokable> invokable,
+                       StreamConfig taskConfig,
+                       Configuration taskManagerConfig) throws Exception {
+
                LibraryCacheManager libCache = mock(LibraryCacheManager.class);
                
when(libCache.getClassLoader(any(JobID.class))).thenReturn(getClass().getClassLoader());
-               
+
                ResultPartitionManager partitionManager = 
mock(ResultPartitionManager.class);
                ResultPartitionConsumableNotifier consumableNotifier = 
mock(ResultPartitionConsumableNotifier.class);
                NetworkEnvironment network = mock(NetworkEnvironment.class);
@@ -159,8 +186,8 @@ public class StreamTaskTest {
                                new FiniteDuration(60, TimeUnit.SECONDS),
                                libCache,
                                mock(FileCache.class),
-                               new TaskManagerRuntimeInfo("localhost", new 
Configuration(), System.getProperty("java.io.tmpdir")),
-                               mock(TaskMetricGroup.class));
+                               new TaskManagerRuntimeInfo("localhost", 
taskManagerConfig, System.getProperty("java.io.tmpdir")),
+                               new UnregisteredTaskMetricsGroup());
        }
        
        // 
------------------------------------------------------------------------
@@ -255,4 +282,12 @@ public class StreamTaskTest {
                        return null;
                }
        }
+
+       public static final class MockStateBackend implements 
StateBackendFactory<AbstractStateBackend> {
+
+               @Override
+               public AbstractStateBackend createFromConfig(Configuration 
config) throws Exception {
+                       return mock(AbstractStateBackend.class);
+               }
+       }
 }

Reply via email to