Repository: flink
Updated Branches:
  refs/heads/flip-6 4f891a6c2 -> 6dc228fcf


[hotfix] [cluster management] Remove scala dependencies from MiniCluster.java


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/f90e5ff2
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/f90e5ff2
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/f90e5ff2

Branch: refs/heads/flip-6
Commit: f90e5ff2deaf5eea350ca4a57d8cf77ceb4703b7
Parents: 4f891a6
Author: Till Rohrmann <trohrm...@apache.org>
Authored: Mon Oct 17 14:41:52 2016 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Mon Oct 17 15:51:24 2016 +0200

----------------------------------------------------------------------
 .../flink/runtime/minicluster/MiniCluster.java     | 12 ++++--------
 .../org/apache/flink/runtime/akka/AkkaUtils.scala  | 17 +++++++++++++++++
 .../runtime/minicluster/MiniClusterITCase.java     |  2 +-
 3 files changed, 22 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/f90e5ff2/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
index 1ee38e0..d85234d 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
@@ -35,9 +35,6 @@ import org.apache.flink.runtime.rpc.RpcService;
 import org.apache.flink.runtime.rpc.akka.AkkaRpcService;
 import org.apache.flink.util.ExceptionUtils;
 
-import scala.Option;
-import scala.Tuple2;
-
 import javax.annotation.concurrent.GuardedBy;
 
 import static org.apache.flink.util.ExceptionUtils.firstOrSuppressed;
@@ -348,7 +345,7 @@ public class MiniCluster {
        /**
         * Factory method to instantiate the RPC service.
         * 
-        * @param config
+        * @param configuration
         *            The configuration of the mini cluster
         * @param askTimeout
         *            The default RPC timeout for asynchronous "ask" requests.
@@ -360,17 +357,16 @@ public class MiniCluster {
         * @return The instantiated RPC service
         */
        protected RpcService createRpcService(
-                       Configuration config,
+                       Configuration configuration,
                        Time askTimeout,
                        boolean remoteEnabled,
                        String bindAddress) {
 
                ActorSystem actorSystem;
                if (remoteEnabled) {
-                       Tuple2<String, Object> remoteSettings = new Tuple2<String, Object>(bindAddress, 0);
-                       actorSystem = AkkaUtils.createActorSystem(config, Option.apply(remoteSettings));
+                       actorSystem = AkkaUtils.createActorSystem(configuration, bindAddress, 0);
                } else {
-                       actorSystem = AkkaUtils.createLocalActorSystem(config);
+                       actorSystem = AkkaUtils.createLocalActorSystem(configuration);
                }
 
                return new AkkaRpcService(actorSystem, askTimeout);

http://git-wip-us.apache.org/repos/asf/flink/blob/f90e5ff2/flink-runtime/src/main/scala/org/apache/flink/runtime/akka/AkkaUtils.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/akka/AkkaUtils.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/akka/AkkaUtils.scala
index 2461340..d98d90a 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/akka/AkkaUtils.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/akka/AkkaUtils.scala
@@ -55,6 +55,23 @@ object AkkaUtils {
   }
 
   /**
+    * Creates an actor system bound to the given hostname and port.
+    *
+    * @param configuration instance containing the user provided configuration values
+    * @param hostname of the network interface to bind to
+    * @param port to bind to
+    * @return created actor system
+    */
+  def createActorSystem(
+      configuration: Configuration,
+      hostname: String,
+      port: Int)
+    : ActorSystem = {
+
+    createActorSystem(configuration, Some((hostname, port)))
+  }
+
+  /**
    * Creates an actor system. If a listening address is specified, then the actor system will listen
    * on that address for messages from a remote actor system. If not, then a local actor system
    * will be instantiated.

http://git-wip-us.apache.org/repos/asf/flink/blob/f90e5ff2/flink-runtime/src/test/java/org/apache/flink/runtime/minicluster/MiniClusterITCase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/minicluster/MiniClusterITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/minicluster/MiniClusterITCase.java
index dd43337..ef53547 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/minicluster/MiniClusterITCase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/minicluster/MiniClusterITCase.java
@@ -31,7 +31,7 @@ import org.junit.Test;
  */
 public class MiniClusterITCase extends TestLogger {
 
-//     @Test
+       @Test
        public void runJobWithSingleRpcService() throws Exception {
                MiniClusterConfiguration cfg = new MiniClusterConfiguration();
 

Reply via email to