Repository: carbondata Updated Branches: refs/heads/master 2c095542b -> d1139330f
[CARBONDATA-2422] Search mode Master port should be dynamic. In SDV tests, the search mode testcase sometimes failed because the Master port was occupied. This PR adds support for a dynamic Master port to avoid port binding failures. This closes #2256. Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/d1139330 Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/d1139330 Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/d1139330 Branch: refs/heads/master Commit: d1139330f0225950b664b37447eb48ae5662ebb4 Parents: 2c09554 Author: Jacky Li <[email protected]> Authored: Tue May 1 20:25:16 2018 +0800 Committer: ravipesala <[email protected]> Committed: Thu May 3 16:45:47 2018 +0530 ---------------------------------------------------------------------- .../core/constants/CarbonCommonConstants.java | 4 +- .../scala/org/apache/spark/rpc/Master.scala | 39 ++++++++++++++------ 2 files changed, 31 insertions(+), 12 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/carbondata/blob/d1139330/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java b/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java index 553698a..648f08e 100644 --- a/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java +++ b/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java @@ -1666,7 +1666,9 @@ public final class CarbonCommonConstants { public static final String CARBON_SEARCH_MODE_SCAN_THREAD = "carbon.search.scan.thread"; /** - * In search mode, Master will listen on this port for worker 
registration. + * If Master failed to start service with this port, it will try to increment the port number + * and try to bind again, until it is success */ @CarbonProperty @InterfaceStability.Unstable http://git-wip-us.apache.org/repos/asf/carbondata/blob/d1139330/store/search/src/main/scala/org/apache/spark/rpc/Master.scala ---------------------------------------------------------------------- diff --git a/store/search/src/main/scala/org/apache/spark/rpc/Master.scala b/store/search/src/main/scala/org/apache/spark/rpc/Master.scala index e98a780..df793b4 100644 --- a/store/search/src/main/scala/org/apache/spark/rpc/Master.scala +++ b/store/search/src/main/scala/org/apache/spark/rpc/Master.scala @@ -18,7 +18,7 @@ package org.apache.spark.rpc import java.io.IOException -import java.net.InetAddress +import java.net.{BindException, InetAddress} import java.util.{List => JList, Map => JMap, Objects, Random, UUID} import scala.collection.JavaConverters._ @@ -49,11 +49,11 @@ import org.apache.carbondata.store.worker.Status /** * Master of CarbonSearch. - * It listens to [[Master.port]] to wait for worker to register. + * It provides a Registry service for worker to register. * And it provides search API to fire RPC call to workers. 
*/ @InterfaceAudience.Internal -class Master(sparkConf: SparkConf, port: Int) { +class Master(sparkConf: SparkConf) { private val LOG = LogServiceFactory.getLogService(this.getClass.getCanonicalName) // worker host address map to EndpointRef @@ -64,22 +64,39 @@ class Master(sparkConf: SparkConf, port: Int) { private val scheduler: Scheduler = new Scheduler - def this(sparkConf: SparkConf) = { - this(sparkConf, CarbonProperties.getSearchMasterPort) - } - /** start service and listen on port passed in constructor */ def startService(): Unit = { if (rpcEnv == null) { new Thread(new Runnable { override def run(): Unit = { val hostAddress = InetAddress.getLocalHost.getHostAddress - val config = RpcEnvConfig( - sparkConf, "registry-service", hostAddress, "", CarbonProperties.getSearchMasterPort, - new SecurityManager(sparkConf), clientMode = false) - rpcEnv = new NettyRpcEnvFactory().create(config) + var port = CarbonProperties.getSearchMasterPort + var exception: BindException = null + var numTry = 100 // we will try to create service at worse case 100 times + do { + try { + LOG.info(s"starting registry-service on $hostAddress:$port") + val config = RpcEnvConfig( + sparkConf, "registry-service", hostAddress, "", port, + new SecurityManager(sparkConf), clientMode = false) + rpcEnv = new NettyRpcEnvFactory().create(config) + numTry = 0 + } catch { + case e: BindException => + // port is occupied, increase the port number and try again + exception = e + LOG.error(s"start registry-service failed: ${e.getMessage}") + port = port + 1 + numTry = numTry - 1 + } + } while (numTry > 0) + if (rpcEnv == null) { + // we have tried many times, but still failed to find an available port + throw exception + } val registryEndpoint: RpcEndpoint = new Registry(rpcEnv, Master.this) rpcEnv.setupEndpoint("registry-service", registryEndpoint) + LOG.info("registry-service started") rpcEnv.awaitTermination() } }).start()
