[hotfix] [cluster management] Remove scala dependencies from MiniCluster.java
Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/7ba580bc Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/7ba580bc Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/7ba580bc Branch: refs/heads/flip-6 Commit: 7ba580bc07ff5b1d2ae9be1a738f51ffdf6b03db Parents: dbf4b99 Author: Till Rohrmann <[email protected]> Authored: Mon Oct 17 14:41:52 2016 +0200 Committer: Till Rohrmann <[email protected]> Committed: Thu Oct 20 19:50:36 2016 +0200 ---------------------------------------------------------------------- .../flink/runtime/minicluster/MiniCluster.java | 12 ++++-------- .../org/apache/flink/runtime/akka/AkkaUtils.scala | 17 +++++++++++++++++ .../runtime/minicluster/MiniClusterITCase.java | 2 +- 3 files changed, 22 insertions(+), 9 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/7ba580bc/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java index 1ee38e0..d85234d 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java @@ -35,9 +35,6 @@ import org.apache.flink.runtime.rpc.RpcService; import org.apache.flink.runtime.rpc.akka.AkkaRpcService; import org.apache.flink.util.ExceptionUtils; -import scala.Option; -import scala.Tuple2; - import javax.annotation.concurrent.GuardedBy; import static org.apache.flink.util.ExceptionUtils.firstOrSuppressed; @@ -348,7 +345,7 @@ public class MiniCluster { /** * Factory method to instantiate the RPC service. 
* - * @param config + * @param configuration * The configuration of the mini cluster * @param askTimeout * The default RPC timeout for asynchronous "ask" requests. @@ -360,17 +357,16 @@ public class MiniCluster { * @return The instantiated RPC service */ protected RpcService createRpcService( - Configuration config, + Configuration configuration, Time askTimeout, boolean remoteEnabled, String bindAddress) { ActorSystem actorSystem; if (remoteEnabled) { - Tuple2<String, Object> remoteSettings = new Tuple2<String, Object>(bindAddress, 0); - actorSystem = AkkaUtils.createActorSystem(config, Option.apply(remoteSettings)); + actorSystem = AkkaUtils.createActorSystem(configuration, bindAddress, 0); } else { - actorSystem = AkkaUtils.createLocalActorSystem(config); + actorSystem = AkkaUtils.createLocalActorSystem(configuration); } return new AkkaRpcService(actorSystem, askTimeout); http://git-wip-us.apache.org/repos/asf/flink/blob/7ba580bc/flink-runtime/src/main/scala/org/apache/flink/runtime/akka/AkkaUtils.scala ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/akka/AkkaUtils.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/akka/AkkaUtils.scala index 9463bfe..45c7418 100644 --- a/flink-runtime/src/main/scala/org/apache/flink/runtime/akka/AkkaUtils.scala +++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/akka/AkkaUtils.scala @@ -56,6 +56,23 @@ object AkkaUtils { } /** + * Creates an actor system bound to the given hostname and port. + * + * @param configuration instance containing the user provided configuration values + * @param hostname of the network interface to bind to + * @param port to bind to + * @return created actor system + */ + def createActorSystem( + configuration: Configuration, + hostname: String, + port: Int) + : ActorSystem = { + + createActorSystem(configuration, Some((hostname, port))) + } + + /** * Creates an actor system. 
If a listening address is specified, then the actor system will listen * on that address for messages from a remote actor system. If not, then a local actor system * will be instantiated. http://git-wip-us.apache.org/repos/asf/flink/blob/7ba580bc/flink-runtime/src/test/java/org/apache/flink/runtime/minicluster/MiniClusterITCase.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/minicluster/MiniClusterITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/minicluster/MiniClusterITCase.java index dd43337..ef53547 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/minicluster/MiniClusterITCase.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/minicluster/MiniClusterITCase.java @@ -31,7 +31,7 @@ import org.junit.Test; */ public class MiniClusterITCase extends TestLogger { -// @Test + @Test public void runJobWithSingleRpcService() throws Exception { MiniClusterConfiguration cfg = new MiniClusterConfiguration();
