This is an automated email from the ASF dual-hosted git repository. wenchen pushed a commit to branch branch-3.0 in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-3.0 by this push: new d2fb6e4 [SPARK-31833][SQL][TEST-HIVE1.2] Set HiveThriftServer2 with actual port while configured 0 d2fb6e4 is described below commit d2fb6e408538e89ac5cc3b3c71925dbaca387bbc Author: Kent Yao <yaooq...@hotmail.com> AuthorDate: Fri May 29 04:13:23 2020 +0000 [SPARK-31833][SQL][TEST-HIVE1.2] Set HiveThriftServer2 with actual port while configured 0 ### What changes were proposed in this pull request? When I was developing some stuff based on the `DeveloperAPI ` `org.apache.spark.sql.hive.thriftserver.HiveThriftServer2#startWithContext`, I need to use thrift port randomly to avoid race on ports. But the `org.apache.hive.service.cli.thrift.ThriftCLIService#getPortNumber` do not respond to me with the actual bound port but always 0. And the server log is not right too, after starting the server, it's hard to form to the right JDBC connection string. ``` INFO ThriftCLIService: Starting ThriftBinaryCLIService on port 0 with 5...500 worker threads ``` Indeed, the `53742` is the right port ```shell lsof -nP -p `cat ./pid/spark-kentyao-org.apache.spark.sql.hive.thriftserver.HiveThriftServer2-1.pid` | grep LISTEN java 18990 kentyao 288u IPv6 0x22858e3e60d6a0a7 0t0 TCP 10.242.189.214:53723 (LISTEN) java 18990 kentyao 290u IPv6 0x22858e3e60d68827 0t0 TCP *:4040 (LISTEN) java 18990 kentyao 366u IPv6 0x22858e3e60d66987 0t0 TCP 10.242.189.214:53724 (LISTEN) java 18990 kentyao 438u IPv6 0x22858e3e60d65d47 0t0 TCP *:53742 (LISTEN) ``` In the PR, when the port is configured 0, the `portNum` will be set to the real used port during the start process. Also use 0 in thrift related tests to avoid potential flakiness. ### Why are the changes needed? 1 fix API bug 2 reduce test flakiness ### Does this PR introduce _any_ user-facing change? yes, `org.apache.hive.service.cli.thrift.ThriftCLIService#getPortNumber` will always give you the actual port when it is configured 0. ### How was this patch tested? modified unit tests Closes #28651 from yaooqinn/SPARK-31833. Authored-by: Kent Yao <yaooq...@hotmail.com> Signed-off-by: Wenchen Fan <wenc...@databricks.com> (cherry picked from commit fe1da296da152528fc159c23e3026a0c19343780) Signed-off-by: Wenchen Fan <wenc...@databricks.com> --- .../sql/hive/thriftserver/SharedThriftServer.scala | 33 ++++++++++++---------- .../service/cli/thrift/ThriftBinaryCLIService.java | 4 +++ .../service/cli/thrift/ThriftHttpCLIService.java | 3 ++ .../service/cli/thrift/ThriftBinaryCLIService.java | 4 +++ .../service/cli/thrift/ThriftHttpCLIService.java | 3 ++ 5 files changed, 32 insertions(+), 15 deletions(-) diff --git a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/SharedThriftServer.scala b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/SharedThriftServer.scala index ce61009..e002bc0 100644 --- a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/SharedThriftServer.scala +++ b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/SharedThriftServer.scala @@ -19,29 +19,25 @@ package org.apache.spark.sql.hive.thriftserver import java.sql.{DriverManager, Statement} +import scala.collection.JavaConverters._ import scala.concurrent.duration._ -import scala.util.{Random, Try} +import scala.util.Try import org.apache.hadoop.hive.conf.HiveConf.ConfVars +import org.apache.hive.service.cli.thrift.ThriftCLIService -import org.apache.spark.sql.QueryTest import org.apache.spark.sql.test.SharedSparkSession trait SharedThriftServer extends SharedSparkSession { private var hiveServer2: HiveThriftServer2 = _ + private var serverPort: Int = 0 override def beforeAll(): Unit = { super.beforeAll() - // Chooses a random port between 10000 and 19999 - var listeningPort = 10000 + Random.nextInt(10000) - // Retries up to 3 times with different port numbers if the server fails to start - (1 to 3).foldLeft(Try(startThriftServer(listeningPort, 0))) { case (started, attempt) => - started.orElse { - listeningPort += 1 - Try(startThriftServer(listeningPort, attempt)) - } + (1 to 3).foldLeft(Try(startThriftServer(0))) { case (started, attempt) => + started.orElse(Try(startThriftServer(attempt))) }.recover { case cause: Throwable => throw cause @@ -59,8 +55,7 @@ trait SharedThriftServer extends SharedSparkSession { protected def withJdbcStatement(fs: (Statement => Unit)*): Unit = { val user = System.getProperty("user.name") - - val serverPort = hiveServer2.getHiveConf.get(ConfVars.HIVE_SERVER2_THRIFT_PORT.varname) + require(serverPort != 0, "Failed to bind an actual port for HiveThriftServer2") val connections = fs.map { _ => DriverManager.getConnection(s"jdbc:hive2://localhost:$serverPort", user, "") } val statements = connections.map(_.createStatement()) @@ -73,11 +68,19 @@ trait SharedThriftServer extends SharedSparkSession { } } - private def startThriftServer(port: Int, attempt: Int): Unit = { - logInfo(s"Trying to start HiveThriftServer2: port=$port, attempt=$attempt") + private def startThriftServer(attempt: Int): Unit = { + logInfo(s"Trying to start HiveThriftServer2:, attempt=$attempt") val sqlContext = spark.newSession().sqlContext - sqlContext.setConf(ConfVars.HIVE_SERVER2_THRIFT_PORT.varname, port.toString) + // Set the HIVE_SERVER2_THRIFT_PORT to 0, so it could randomly pick any free port to use. + // It's much more robust than set a random port generated by ourselves ahead + sqlContext.setConf(ConfVars.HIVE_SERVER2_THRIFT_PORT.varname, "0") hiveServer2 = HiveThriftServer2.startWithContext(sqlContext) + hiveServer2.getServices.asScala.foreach { + case t: ThriftCLIService if t.getPortNumber != 0 => + serverPort = t.getPortNumber + logInfo(s"Started HiveThriftServer2: port=$serverPort, attempt=$attempt") + case _ => + } // Wait for thrift server to be ready to serve the query, via executing simple query // till the query succeeds. See SPARK-30345 for more details. diff --git a/sql/hive-thriftserver/v1.2/src/main/java/org/apache/hive/service/cli/thrift/ThriftBinaryCLIService.java b/sql/hive-thriftserver/v1.2/src/main/java/org/apache/hive/service/cli/thrift/ThriftBinaryCLIService.java index 21b8bf7..e1ee503 100644 --- a/sql/hive-thriftserver/v1.2/src/main/java/org/apache/hive/service/cli/thrift/ThriftBinaryCLIService.java +++ b/sql/hive-thriftserver/v1.2/src/main/java/org/apache/hive/service/cli/thrift/ThriftBinaryCLIService.java @@ -76,6 +76,10 @@ public class ThriftBinaryCLIService extends ThriftCLIService { keyStorePassword, sslVersionBlacklist); } + // In case HIVE_SERVER2_THRIFT_PORT or hive.server2.thrift.port is configured with 0 which + // represents any free port, we should set it to the actual one + portNum = serverSocket.getServerSocket().getLocalPort(); + // Server args int maxMessageSize = hiveConf.getIntVar(HiveConf.ConfVars.HIVE_SERVER2_THRIFT_MAX_MESSAGE_SIZE); int requestTimeout = (int) hiveConf.getTimeVar( diff --git a/sql/hive-thriftserver/v1.2/src/main/java/org/apache/hive/service/cli/thrift/ThriftHttpCLIService.java b/sql/hive-thriftserver/v1.2/src/main/java/org/apache/hive/service/cli/thrift/ThriftHttpCLIService.java index 504e63d..1099a00 100644 --- a/sql/hive-thriftserver/v1.2/src/main/java/org/apache/hive/service/cli/thrift/ThriftHttpCLIService.java +++ b/sql/hive-thriftserver/v1.2/src/main/java/org/apache/hive/service/cli/thrift/ThriftHttpCLIService.java @@ -143,6 +143,9 @@ public class ThriftHttpCLIService extends ThriftCLIService { // TODO: check defaults: maxTimeout, keepalive, maxBodySize, bodyRecieveDuration, etc. // Finally, start the server httpServer.start(); + // In case HIVE_SERVER2_THRIFT_HTTP_PORT or hive.server2.thrift.http.port is configured with + // 0 which represents any free port, we should set it to the actual one + portNum = connector.getLocalPort(); String msg = "Started " + ThriftHttpCLIService.class.getSimpleName() + " in " + schemeName + " mode on port " + connector.getLocalPort()+ " path=" + httpPath + " with " + minWorkerThreads + "..." + maxWorkerThreads + " worker threads"; diff --git a/sql/hive-thriftserver/v2.3/src/main/java/org/apache/hive/service/cli/thrift/ThriftBinaryCLIService.java b/sql/hive-thriftserver/v2.3/src/main/java/org/apache/hive/service/cli/thrift/ThriftBinaryCLIService.java index fc19c65..a7de9c0 100644 --- a/sql/hive-thriftserver/v2.3/src/main/java/org/apache/hive/service/cli/thrift/ThriftBinaryCLIService.java +++ b/sql/hive-thriftserver/v2.3/src/main/java/org/apache/hive/service/cli/thrift/ThriftBinaryCLIService.java @@ -77,6 +77,10 @@ public class ThriftBinaryCLIService extends ThriftCLIService { keyStorePassword, sslVersionBlacklist); } + // In case HIVE_SERVER2_THRIFT_PORT or hive.server2.thrift.port is configured with 0 which + // represents any free port, we should set it to the actual one + portNum = serverSocket.getServerSocket().getLocalPort(); + // Server args int maxMessageSize = hiveConf.getIntVar(HiveConf.ConfVars.HIVE_SERVER2_THRIFT_MAX_MESSAGE_SIZE); int requestTimeout = (int) hiveConf.getTimeVar( diff --git a/sql/hive-thriftserver/v2.3/src/main/java/org/apache/hive/service/cli/thrift/ThriftHttpCLIService.java b/sql/hive-thriftserver/v2.3/src/main/java/org/apache/hive/service/cli/thrift/ThriftHttpCLIService.java index 08626e7..73d5f84 100644 --- a/sql/hive-thriftserver/v2.3/src/main/java/org/apache/hive/service/cli/thrift/ThriftHttpCLIService.java +++ b/sql/hive-thriftserver/v2.3/src/main/java/org/apache/hive/service/cli/thrift/ThriftHttpCLIService.java @@ -144,6 +144,9 @@ public class ThriftHttpCLIService extends ThriftCLIService { // TODO: check defaults: maxTimeout, keepalive, maxBodySize, bodyRecieveDuration, etc. // Finally, start the server httpServer.start(); + // In case HIVE_SERVER2_THRIFT_HTTP_PORT or hive.server2.thrift.http.port is configured with + // 0 which represents any free port, we should set it to the actual one + portNum = connector.getLocalPort(); String msg = "Started " + ThriftHttpCLIService.class.getSimpleName() + " in " + schemeName + " mode on port " + portNum + " path=" + httpPath + " with " + minWorkerThreads + "..." + maxWorkerThreads + " worker threads"; --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org