Repository: flink Updated Branches: refs/heads/flip-6 4f891a6c2 -> 6dc228fcf
[hotfix] [cluster management] Remove scala dependencies from MiniCluster.java Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/f90e5ff2 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/f90e5ff2 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/f90e5ff2 Branch: refs/heads/flip-6 Commit: f90e5ff2deaf5eea350ca4a57d8cf77ceb4703b7 Parents: 4f891a6 Author: Till Rohrmann <trohrm...@apache.org> Authored: Mon Oct 17 14:41:52 2016 +0200 Committer: Stephan Ewen <se...@apache.org> Committed: Mon Oct 17 15:51:24 2016 +0200 ---------------------------------------------------------------------- .../flink/runtime/minicluster/MiniCluster.java | 12 ++++-------- .../org/apache/flink/runtime/akka/AkkaUtils.scala | 17 +++++++++++++++++ .../runtime/minicluster/MiniClusterITCase.java | 2 +- 3 files changed, 22 insertions(+), 9 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/f90e5ff2/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java index 1ee38e0..d85234d 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java @@ -35,9 +35,6 @@ import org.apache.flink.runtime.rpc.RpcService; import org.apache.flink.runtime.rpc.akka.AkkaRpcService; import org.apache.flink.util.ExceptionUtils; -import scala.Option; -import scala.Tuple2; - import javax.annotation.concurrent.GuardedBy; import static org.apache.flink.util.ExceptionUtils.firstOrSuppressed; @@ -348,7 +345,7 @@ public class MiniCluster { /** * Factory method to instantiate the RPC service. * - * @param config + * @param configuration * The configuration of the mini cluster * @param askTimeout * The default RPC timeout for asynchronous "ask" requests. @@ -360,17 +357,16 @@ public class MiniCluster { * @return The instantiated RPC service */ protected RpcService createRpcService( - Configuration config, + Configuration configuration, Time askTimeout, boolean remoteEnabled, String bindAddress) { ActorSystem actorSystem; if (remoteEnabled) { - Tuple2<String, Object> remoteSettings = new Tuple2<String, Object>(bindAddress, 0); - actorSystem = AkkaUtils.createActorSystem(config, Option.apply(remoteSettings)); + actorSystem = AkkaUtils.createActorSystem(configuration, bindAddress, 0); } else { - actorSystem = AkkaUtils.createLocalActorSystem(config); + actorSystem = AkkaUtils.createLocalActorSystem(configuration); } return new AkkaRpcService(actorSystem, askTimeout); http://git-wip-us.apache.org/repos/asf/flink/blob/f90e5ff2/flink-runtime/src/main/scala/org/apache/flink/runtime/akka/AkkaUtils.scala ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/akka/AkkaUtils.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/akka/AkkaUtils.scala index 2461340..d98d90a 100644 --- a/flink-runtime/src/main/scala/org/apache/flink/runtime/akka/AkkaUtils.scala +++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/akka/AkkaUtils.scala @@ -55,6 +55,23 @@ object AkkaUtils { } /** + * Creates an actor system bound to the given hostname and port. + * + * @param configuration instance containing the user provided configuration values + * @param hostname of the network interface to bind to + * @param port of to bind to + * @return created actor system + */ + def createActorSystem( + configuration: Configuration, + hostname: String, + port: Int) + : ActorSystem = { + + createActorSystem(configuration, Some((hostname, port))) + } + + /** * Creates an actor system. If a listening address is specified, then the actor system will listen * on that address for messages from a remote actor system. If not, then a local actor system * will be instantiated. http://git-wip-us.apache.org/repos/asf/flink/blob/f90e5ff2/flink-runtime/src/test/java/org/apache/flink/runtime/minicluster/MiniClusterITCase.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/minicluster/MiniClusterITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/minicluster/MiniClusterITCase.java index dd43337..ef53547 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/minicluster/MiniClusterITCase.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/minicluster/MiniClusterITCase.java @@ -31,7 +31,7 @@ import org.junit.Test; */ public class MiniClusterITCase extends TestLogger { -// @Test + @Test public void runJobWithSingleRpcService() throws Exception { MiniClusterConfiguration cfg = new MiniClusterConfiguration();