Repository: flink
Updated Branches:
  refs/heads/master 1836e08f0 -> 9e17cbd6b


[FLINK-4788] [streaming api] Fix state backend classloading from configuration


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/9e17cbd6
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/9e17cbd6
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/9e17cbd6

Branch: refs/heads/master
Commit: 9e17cbd6b768f73299a6a344fdf44539802fb76c
Parents: 1836e08
Author: Stephan Ewen <[email protected]>
Authored: Mon Oct 10 14:33:57 2016 +0200
Committer: Stephan Ewen <[email protected]>
Committed: Mon Oct 10 20:02:33 2016 +0200

----------------------------------------------------------------------
 .../streaming/runtime/tasks/StreamTask.java     |  3 +-
 .../streaming/runtime/tasks/StreamTaskTest.java | 95 +++++++++-----------
 2 files changed, 43 insertions(+), 55 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/9e17cbd6/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
index 8ada6d3..4893fed 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
@@ -752,8 +752,7 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>>
                                backendName = "jobmanager";
                        }
 
-                       backendName = backendName.toLowerCase();
-                       switch (backendName) {
+                       switch (backendName.toLowerCase()) {
                                case "jobmanager":
                                        LOG.info("State backend is set to heap memory (checkpoint to jobmanager)");
                                        stateBackend = new MemoryStateBackend();

http://git-wip-us.apache.org/repos/asf/flink/blob/9e17cbd6/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
index 47a4090..8aae19f 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
@@ -18,12 +18,12 @@
 
 package org.apache.flink.streaming.runtime.tasks;
 
-import akka.actor.ActorRef;
-
 import akka.dispatch.Futures;
+
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.blob.BlobKey;
 import org.apache.flink.runtime.broadcast.BroadcastVariableManager;
@@ -34,7 +34,6 @@ import org.apache.flink.runtime.execution.ExecutionState;
 import org.apache.flink.runtime.execution.librarycache.LibraryCacheManager;
 import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
 import org.apache.flink.runtime.filecache.FileCache;
-import org.apache.flink.runtime.instance.ActorGateway;
 import org.apache.flink.runtime.io.disk.iomanager.IOManager;
 import org.apache.flink.runtime.io.network.NetworkEnvironment;
 import org.apache.flink.runtime.io.network.netty.PartitionStateChecker;
@@ -44,8 +43,10 @@ import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
 import org.apache.flink.runtime.jobgraph.tasks.InputSplitProvider;
 import org.apache.flink.runtime.memory.MemoryManager;
-import org.apache.flink.runtime.metrics.groups.TaskMetricGroup;
+import org.apache.flink.runtime.operators.testutils.UnregisteredTaskMetricsGroup;
 import org.apache.flink.runtime.query.TaskKvStateRegistry;
+import org.apache.flink.runtime.state.AbstractStateBackend;
+import org.apache.flink.runtime.state.StateBackendFactory;
 import org.apache.flink.runtime.taskmanager.CheckpointResponder;
 import org.apache.flink.runtime.taskmanager.Task;
 import org.apache.flink.runtime.taskmanager.TaskExecutionState;
@@ -60,10 +61,10 @@ import org.apache.flink.streaming.api.operators.StreamSource;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.SerializedValue;
+
 import org.junit.Test;
 
 import scala.concurrent.Await;
-import scala.concurrent.ExecutionContext;
 import scala.concurrent.Future;
 import scala.concurrent.duration.Deadline;
 import scala.concurrent.duration.FiniteDuration;
@@ -75,22 +76,20 @@ import java.net.URL;
 import java.util.Collections;
 import java.util.Comparator;
 import java.util.PriorityQueue;
-import java.util.UUID;
 import java.util.concurrent.Executor;
 import java.util.concurrent.TimeUnit;
 
-import static org.mockito.Matchers.any;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
-
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
 
 public class StreamTaskTest {
 
-               /**
+       /**
         * This test checks that cancel calls that are issued before the operator is
         * instantiated still lead to proper canceling.
         */
@@ -101,7 +100,7 @@ public class StreamTaskTest {
                cfg.setStreamOperator(new SlowlyDeserializingOperator());
                cfg.setTimeCharacteristic(TimeCharacteristic.ProcessingTime);
 
-               Task task = createTask(SourceStreamTask.class, cfg);
+               Task task = createTask(SourceStreamTask.class, cfg, new Configuration());
 
                TestingExecutionStateListener testingExecutionStateListener = new TestingExecutionStateListener();
 
@@ -137,6 +136,24 @@ public class StreamTaskTest {
                assertEquals(ExecutionState.CANCELED, task.getExecutionState());
        }
 
+       @Test
+       public void testStateBackendLoading() throws Exception {
+               Configuration taskManagerConfig = new Configuration();
+               taskManagerConfig.setString(ConfigConstants.STATE_BACKEND, MockStateBackend.class.getName());
+
+               StreamConfig cfg = new StreamConfig(new Configuration());
+               cfg.setStreamOperator(new StreamSource<>(new MockSourceFunction()));
+               cfg.setTimeCharacteristic(TimeCharacteristic.ProcessingTime);
+
+               Task task = createTask(SourceStreamTask.class, cfg, taskManagerConfig);
+
+               task.startTaskThread();
+
+               // wait for clean termination
+               task.getExecutingThread().join();
+               assertEquals(ExecutionState.FINISHED, task.getExecutionState());
+       }
+
 
        // ------------------------------------------------------------------------
        //  Test Utilities
@@ -144,9 +161,9 @@ public class StreamTaskTest {
 
        private static class TestingExecutionStateListener implements TaskExecutionStateListener {
 
-               ExecutionState executionState = null;
+               private ExecutionState executionState = null;
 
-               PriorityQueue<Tuple2<ExecutionState, Promise<ExecutionState>>> priorityQueue = new PriorityQueue<>(
+               private final PriorityQueue<Tuple2<ExecutionState, Promise<ExecutionState>>> priorityQueue = new PriorityQueue<>(
                        1,
                        new Comparator<Tuple2<ExecutionState, Promise<ExecutionState>>>() {
                                @Override
@@ -183,7 +200,11 @@ public class StreamTaskTest {
                }
        }
 
-       private Task createTask(Class<? extends AbstractInvokable> invokable, StreamConfig taskConfig) throws Exception {
+       private Task createTask(
+                       Class<? extends AbstractInvokable> invokable,
+                       StreamConfig taskConfig,
+                       Configuration taskManagerConfig) throws Exception {
+
                LibraryCacheManager libCache = mock(LibraryCacheManager.class);
                when(libCache.getClassLoader(any(JobID.class))).thenReturn(getClass().getClassLoader());
                
@@ -191,6 +212,7 @@ public class StreamTaskTest {
                ResultPartitionConsumableNotifier consumableNotifier = mock(ResultPartitionConsumableNotifier.class);
                PartitionStateChecker partitionStateChecker = mock(PartitionStateChecker.class);
                Executor executor = mock(Executor.class);
+
                NetworkEnvironment network = mock(NetworkEnvironment.class);
                when(network.getResultPartitionManager()).thenReturn(partitionManager);
                when(network.getDefaultIOMode()).thenReturn(IOManager.IOMode.SYNC);
@@ -221,8 +243,8 @@ public class StreamTaskTest {
                        mock(CheckpointResponder.class),
                        libCache,
                        mock(FileCache.class),
-                       new TaskManagerRuntimeInfo("localhost", new Configuration(), System.getProperty("java.io.tmpdir")),
-                       mock(TaskMetricGroup.class),
+                       new TaskManagerRuntimeInfo("localhost", taskManagerConfig, System.getProperty("java.io.tmpdir")),
+                       new UnregisteredTaskMetricsGroup(),
                        consumableNotifier,
                        partitionStateChecker,
                        executor);
@@ -279,45 +301,12 @@ public class StreamTaskTest {
                public void cancel() {}
        }
 
-       // ------------------------------------------------------------------------
-       //  Test JobManager/TaskManager gateways
-       // ------------------------------------------------------------------------
-       
-       private static class DummyGateway implements ActorGateway {
+       public static final class MockStateBackend implements StateBackendFactory<AbstractStateBackend> {
                private static final long serialVersionUID = 1L;
 
                @Override
-               public Future<Object> ask(Object message, FiniteDuration timeout) {
-                       return null;
-               }
-
-               @Override
-               public void tell(Object message) {}
-
-               @Override
-               public void tell(Object message, ActorGateway sender) {}
-
-               @Override
-               public void forward(Object message, ActorGateway sender) {}
-
-               @Override
-               public Future<Object> retry(Object message, int numberRetries, FiniteDuration timeout, ExecutionContext executionContext) {
-                       return null;
-               }
-
-               @Override
-               public String path() {
-                       return null;
-               }
-
-               @Override
-               public ActorRef actor() {
-                       return null;
-               }
-
-               @Override
-               public UUID leaderSessionID() {
-                       return null;
+               public AbstractStateBackend createFromConfig(Configuration config) throws Exception {
+                       return mock(AbstractStateBackend.class);
                }
        }
 }

Reply via email to