This is an automated email from the ASF dual-hosted git repository.
dongjoon pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-3.0 by this push:
new f513315 Revert "[SPARK-31926][SQL][TEST-HIVE1.2][TEST-MAVEN] Fix
concurrency issue for ThriftCLIService to getPortNumber"
f513315 is described below
commit f5133152578297def5a1066887103338f4f48c11
Author: Dongjoon Hyun <[email protected]>
AuthorDate: Mon Jun 15 19:09:24 2020 -0700
Revert "[SPARK-31926][SQL][TEST-HIVE1.2][TEST-MAVEN] Fix concurrency issue
for ThriftCLIService to getPortNumber"
This reverts commit cd71fbfbb6c8ecc868de756e6b853235417b65ae.
---
project/SparkBuild.scala | 1 +
.../src/test/resources/log4j.properties | 65 ----------------------
.../sql/hive/thriftserver/SharedThriftServer.scala | 50 +++++------------
.../thriftserver/ThriftServerQueryTestSuite.scala | 3 -
.../ThriftServerWithSparkContextSuite.scala | 11 +---
.../service/cli/thrift/ThriftBinaryCLIService.java | 11 +---
.../hive/service/cli/thrift/ThriftCLIService.java | 3 -
.../service/cli/thrift/ThriftHttpCLIService.java | 21 ++-----
.../service/cli/thrift/ThriftBinaryCLIService.java | 11 +---
.../hive/service/cli/thrift/ThriftCLIService.java | 3 -
.../service/cli/thrift/ThriftHttpCLIService.java | 21 ++-----
11 files changed, 30 insertions(+), 170 deletions(-)
diff --git a/project/SparkBuild.scala b/project/SparkBuild.scala
index 6da46bc..50d1527 100644
--- a/project/SparkBuild.scala
+++ b/project/SparkBuild.scala
@@ -479,6 +479,7 @@ object SparkParallelTestGrouping {
"org.apache.spark.sql.hive.thriftserver.SparkSQLEnvSuite",
"org.apache.spark.sql.hive.thriftserver.ui.ThriftServerPageSuite",
"org.apache.spark.sql.hive.thriftserver.ui.HiveThriftServer2ListenerSuite",
+ "org.apache.spark.sql.hive.thriftserver.ThriftServerWithSparkContextSuite",
"org.apache.spark.sql.kafka010.KafkaDelegationTokenSuite"
)
diff --git a/sql/hive-thriftserver/src/test/resources/log4j.properties
b/sql/hive-thriftserver/src/test/resources/log4j.properties
deleted file mode 100644
index 21975ba8..0000000
--- a/sql/hive-thriftserver/src/test/resources/log4j.properties
+++ /dev/null
@@ -1,65 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements. See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-
-# Set everything to be logged to the file
hive-thriftserver/target/unit-tests.log
-log4j.rootLogger=DEBUG, CA, FA
-
-#Console Appender
-log4j.appender.CA=org.apache.log4j.ConsoleAppender
-log4j.appender.CA.layout=org.apache.log4j.PatternLayout
-log4j.appender.CA.layout.ConversionPattern=%d{HH:mm:ss.SSS} %p %c: %m%n
-log4j.appender.CA.Threshold = WARN
-
-
-#File Appender
-log4j.appender.FA=org.apache.log4j.FileAppender
-log4j.appender.FA.append=false
-log4j.appender.FA.file=target/unit-tests.log
-log4j.appender.FA.layout=org.apache.log4j.PatternLayout
-log4j.appender.FA.layout.ConversionPattern=%d{HH:mm:ss.SSS} %t %p %c{1}: %m%n
-
-# Set the logger level of File Appender to WARN
-log4j.appender.FA.Threshold = DEBUG
-
-# Some packages are noisy for no good reason.
-log4j.additivity.org.apache.hadoop.hive.serde2.lazy.LazyStruct=false
-log4j.logger.org.apache.hadoop.hive.serde2.lazy.LazyStruct=OFF
-
-log4j.additivity.org.apache.hadoop.hive.metastore.RetryingHMSHandler=false
-log4j.logger.org.apache.hadoop.hive.metastore.RetryingHMSHandler=OFF
-
-log4j.additivity.hive.log=false
-log4j.logger.hive.log=OFF
-
-log4j.additivity.parquet.hadoop.ParquetRecordReader=false
-log4j.logger.parquet.hadoop.ParquetRecordReader=OFF
-
-log4j.additivity.org.apache.parquet.hadoop.ParquetRecordReader=false
-log4j.logger.org.apache.parquet.hadoop.ParquetRecordReader=OFF
-
-log4j.additivity.org.apache.parquet.hadoop.ParquetOutputCommitter=false
-log4j.logger.org.apache.parquet.hadoop.ParquetOutputCommitter=OFF
-
-log4j.additivity.hive.ql.metadata.Hive=false
-log4j.logger.hive.ql.metadata.Hive=OFF
-
-log4j.additivity.org.apache.hadoop.hive.ql.io.RCFile=false
-log4j.logger.org.apache.hadoop.hive.ql.io.RCFile=ERROR
-
-# Parquet related logging
-log4j.logger.org.apache.parquet.CorruptStatistics=ERROR
-log4j.logger.parquet.CorruptStatistics=ERROR
diff --git
a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/SharedThriftServer.scala
b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/SharedThriftServer.scala
index 1c33abf..e002bc0 100644
---
a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/SharedThriftServer.scala
+++
b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/SharedThriftServer.scala
@@ -24,7 +24,6 @@ import scala.concurrent.duration._
import scala.util.Try
import org.apache.hadoop.hive.conf.HiveConf.ConfVars
-import org.apache.hadoop.hive.ql.session.SessionState
import org.apache.hive.service.cli.thrift.ThriftCLIService
import org.apache.spark.sql.test.SharedSparkSession
@@ -34,8 +33,6 @@ trait SharedThriftServer extends SharedSparkSession {
private var hiveServer2: HiveThriftServer2 = _
private var serverPort: Int = 0
- def mode: ServerMode.Value
-
override def beforeAll(): Unit = {
super.beforeAll()
// Retries up to 3 times with different port numbers if the server fails
to start
@@ -53,21 +50,14 @@ trait SharedThriftServer extends SharedSparkSession {
hiveServer2.stop()
} finally {
super.afterAll()
- SessionState.detachSession()
}
}
- protected def jdbcUri: String = if (mode == ServerMode.http) {
-
s"jdbc:hive2://localhost:$serverPort/default;transportMode=http;httpPath=cliservice"
- } else {
- s"jdbc:hive2://localhost:$serverPort/"
- }
-
protected def withJdbcStatement(fs: (Statement => Unit)*): Unit = {
val user = System.getProperty("user.name")
require(serverPort != 0, "Failed to bind an actual port for
HiveThriftServer2")
val connections =
- fs.map { _ => DriverManager.getConnection(jdbcUri, user, "") }
+ fs.map { _ =>
DriverManager.getConnection(s"jdbc:hive2://localhost:$serverPort", user, "") }
val statements = connections.map(_.createStatement())
try {
@@ -79,35 +69,23 @@ trait SharedThriftServer extends SharedSparkSession {
}
private def startThriftServer(attempt: Int): Unit = {
- logInfo(s"Trying to start HiveThriftServer2: mode=$mode, attempt=$attempt")
+ logInfo(s"Trying to start HiveThriftServer2:, attempt=$attempt")
val sqlContext = spark.newSession().sqlContext
- // Set the HIVE_SERVER2_THRIFT_PORT and HIVE_SERVER2_THRIFT_HTTP_PORT to
0, so it could
- // randomly pick any free port to use.
+ // Set the HIVE_SERVER2_THRIFT_PORT to 0, so it could randomly pick any
free port to use.
// It's much more robust than set a random port generated by ourselves
ahead
sqlContext.setConf(ConfVars.HIVE_SERVER2_THRIFT_PORT.varname, "0")
- sqlContext.setConf(ConfVars.HIVE_SERVER2_THRIFT_HTTP_PORT.varname, "0")
- sqlContext.setConf(ConfVars.HIVE_SERVER2_TRANSPORT_MODE.varname,
mode.toString)
-
- try {
- hiveServer2 = HiveThriftServer2.startWithContext(sqlContext)
- hiveServer2.getServices.asScala.foreach {
- case t: ThriftCLIService =>
- serverPort = t.getPortNumber
- logInfo(s"Started HiveThriftServer2: mode=$mode, port=$serverPort,
attempt=$attempt")
- case _ =>
- }
+ hiveServer2 = HiveThriftServer2.startWithContext(sqlContext)
+ hiveServer2.getServices.asScala.foreach {
+ case t: ThriftCLIService if t.getPortNumber != 0 =>
+ serverPort = t.getPortNumber
+ logInfo(s"Started HiveThriftServer2: port=$serverPort,
attempt=$attempt")
+ case _ =>
+ }
- // Wait for thrift server to be ready to serve the query, via executing
simple query
- // till the query succeeds. See SPARK-30345 for more details.
- eventually(timeout(30.seconds), interval(1.seconds)) {
- withJdbcStatement { _.execute("SELECT 1") }
- }
- } catch {
- case e: Exception =>
- logError("Error start hive server with Context ", e)
- if (hiveServer2 != null) {
- hiveServer2.stop()
- }
+ // Wait for thrift server to be ready to serve the query, via executing
simple query
+ // till the query succeeds. See SPARK-30345 for more details.
+ eventually(timeout(30.seconds), interval(1.seconds)) {
+ withJdbcStatement { _.execute("SELECT 1") }
}
}
}
diff --git
a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/ThriftServerQueryTestSuite.scala
b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/ThriftServerQueryTestSuite.scala
index 553f10a..15cc310 100644
---
a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/ThriftServerQueryTestSuite.scala
+++
b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/ThriftServerQueryTestSuite.scala
@@ -54,9 +54,6 @@ import org.apache.spark.sql.types._
*/
class ThriftServerQueryTestSuite extends SQLQueryTestSuite with
SharedThriftServer {
-
- override def mode: ServerMode.Value = ServerMode.binary
-
override protected def testFile(fileName: String): String = {
val url =
Thread.currentThread().getContextClassLoader.getResource(fileName)
// Copy to avoid URISyntaxException during accessing the resources in
`sql/core`
diff --git
a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/ThriftServerWithSparkContextSuite.scala
b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/ThriftServerWithSparkContextSuite.scala
index d6420de..3e1fce7 100644
---
a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/ThriftServerWithSparkContextSuite.scala
+++
b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/ThriftServerWithSparkContextSuite.scala
@@ -17,7 +17,7 @@
package org.apache.spark.sql.hive.thriftserver
-trait ThriftServerWithSparkContextSuite extends SharedThriftServer {
+class ThriftServerWithSparkContextSuite extends SharedThriftServer {
test("SPARK-29911: Uncache cached tables when session closed") {
val cacheManager = spark.sharedState.cacheManager
@@ -42,12 +42,3 @@ trait ThriftServerWithSparkContextSuite extends
SharedThriftServer {
}
}
}
-
-
-class ThriftServerWithSparkContextInBinarySuite extends
ThriftServerWithSparkContextSuite {
- override def mode: ServerMode.Value = ServerMode.binary
-}
-
-class ThriftServerWithSparkContextInHttpSuite extends
ThriftServerWithSparkContextSuite {
- override def mode: ServerMode.Value = ServerMode.http
-}
diff --git
a/sql/hive-thriftserver/v1.2/src/main/java/org/apache/hive/service/cli/thrift/ThriftBinaryCLIService.java
b/sql/hive-thriftserver/v1.2/src/main/java/org/apache/hive/service/cli/thrift/ThriftBinaryCLIService.java
index 00bdf7e..e1ee503 100644
---
a/sql/hive-thriftserver/v1.2/src/main/java/org/apache/hive/service/cli/thrift/ThriftBinaryCLIService.java
+++
b/sql/hive-thriftserver/v1.2/src/main/java/org/apache/hive/service/cli/thrift/ThriftBinaryCLIService.java
@@ -28,7 +28,6 @@ import java.util.concurrent.TimeUnit;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
import org.apache.hadoop.hive.shims.ShimLoader;
-import org.apache.hive.service.ServiceException;
import org.apache.hive.service.auth.HiveAuthFactory;
import org.apache.hive.service.cli.CLIService;
import org.apache.hive.service.server.ThreadFactoryWithGarbageCleanup;
@@ -46,7 +45,7 @@ public class ThriftBinaryCLIService extends ThriftCLIService {
}
@Override
- protected void initializeServer() {
+ public void run() {
try {
// Server thread pool
String threadPoolName = "HiveServer2-Handler-Pool";
@@ -101,14 +100,6 @@ public class ThriftBinaryCLIService extends
ThriftCLIService {
String msg = "Starting " + ThriftBinaryCLIService.class.getSimpleName()
+ " on port "
+ serverSocket.getServerSocket().getLocalPort() + " with " +
minWorkerThreads + "..." + maxWorkerThreads + " worker threads";
LOG.info(msg);
- } catch (Exception t) {
- throw new ServiceException("Error initializing " + getName(), t);
- }
- }
-
- @Override
- public void run() {
- try {
server.serve();
} catch (Throwable t) {
LOG.fatal(
diff --git
a/sql/hive-thriftserver/v1.2/src/main/java/org/apache/hive/service/cli/thrift/ThriftCLIService.java
b/sql/hive-thriftserver/v1.2/src/main/java/org/apache/hive/service/cli/thrift/ThriftCLIService.java
index 783e579..8fce9d9 100644
---
a/sql/hive-thriftserver/v1.2/src/main/java/org/apache/hive/service/cli/thrift/ThriftCLIService.java
+++
b/sql/hive-thriftserver/v1.2/src/main/java/org/apache/hive/service/cli/thrift/ThriftCLIService.java
@@ -175,7 +175,6 @@ public abstract class ThriftCLIService extends
AbstractService implements TCLISe
public synchronized void start() {
super.start();
if (!isStarted && !isEmbedded) {
- initializeServer();
new Thread(this).start();
isStarted = true;
}
@@ -634,8 +633,6 @@ public abstract class ThriftCLIService extends
AbstractService implements TCLISe
return resp;
}
- protected abstract void initializeServer();
-
@Override
public abstract void run();
diff --git
a/sql/hive-thriftserver/v1.2/src/main/java/org/apache/hive/service/cli/thrift/ThriftHttpCLIService.java
b/sql/hive-thriftserver/v1.2/src/main/java/org/apache/hive/service/cli/thrift/ThriftHttpCLIService.java
index bd64c77..1099a00 100644
---
a/sql/hive-thriftserver/v1.2/src/main/java/org/apache/hive/service/cli/thrift/ThriftHttpCLIService.java
+++
b/sql/hive-thriftserver/v1.2/src/main/java/org/apache/hive/service/cli/thrift/ThriftHttpCLIService.java
@@ -28,7 +28,6 @@ import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
import org.apache.hadoop.hive.shims.ShimLoader;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.util.Shell;
-import org.apache.hive.service.ServiceException;
import org.apache.hive.service.auth.HiveAuthFactory;
import org.apache.hive.service.cli.CLIService;
import org.apache.hive.service.cli.thrift.TCLIService.Iface;
@@ -54,8 +53,13 @@ public class ThriftHttpCLIService extends ThriftCLIService {
super(cliService, ThriftHttpCLIService.class.getSimpleName());
}
+ /**
+ * Configure Jetty to serve http requests. Example of a client connection
URL:
+ * http://localhost:10000/servlets/thrifths2/ A gateway may cause actual
target URL to differ,
+ * e.g. http://gateway:port/hive2/servlets/thrifths2/
+ */
@Override
- protected void initializeServer() {
+ public void run() {
try {
// Server thread pool
// Start with minWorkerThreads, expand till maxWorkerThreads and reject
subsequent requests
@@ -146,19 +150,6 @@ public class ThriftHttpCLIService extends ThriftCLIService
{
+ " mode on port " + connector.getLocalPort()+ " path=" + httpPath +
" with " + minWorkerThreads + "..."
+ maxWorkerThreads + " worker threads";
LOG.info(msg);
- } catch (Exception t) {
- throw new ServiceException("Error initializing " + getName(), t);
- }
- }
-
- /**
- * Configure Jetty to serve http requests. Example of a client connection
URL:
- * http://localhost:10000/servlets/thrifths2/ A gateway may cause actual
target URL to differ,
- * e.g. http://gateway:port/hive2/servlets/thrifths2/
- */
- @Override
- public void run() {
- try {
httpServer.join();
} catch (Throwable t) {
LOG.fatal(
diff --git
a/sql/hive-thriftserver/v2.3/src/main/java/org/apache/hive/service/cli/thrift/ThriftBinaryCLIService.java
b/sql/hive-thriftserver/v2.3/src/main/java/org/apache/hive/service/cli/thrift/ThriftBinaryCLIService.java
index ce79e3c..a7de9c0 100644
---
a/sql/hive-thriftserver/v2.3/src/main/java/org/apache/hive/service/cli/thrift/ThriftBinaryCLIService.java
+++
b/sql/hive-thriftserver/v2.3/src/main/java/org/apache/hive/service/cli/thrift/ThriftBinaryCLIService.java
@@ -29,7 +29,6 @@ import org.apache.hadoop.hive.common.auth.HiveAuthUtils;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
import org.apache.hadoop.hive.shims.ShimLoader;
-import org.apache.hive.service.ServiceException;
import org.apache.hive.service.auth.HiveAuthFactory;
import org.apache.hive.service.cli.CLIService;
import org.apache.hive.service.server.ThreadFactoryWithGarbageCleanup;
@@ -47,7 +46,7 @@ public class ThriftBinaryCLIService extends ThriftCLIService {
}
@Override
- protected void initializeServer() {
+ public void run() {
try {
// Server thread pool
String threadPoolName = "HiveServer2-Handler-Pool";
@@ -102,14 +101,6 @@ public class ThriftBinaryCLIService extends
ThriftCLIService {
String msg = "Starting " + ThriftBinaryCLIService.class.getSimpleName()
+ " on port "
+ portNum + " with " + minWorkerThreads + "..." + maxWorkerThreads +
" worker threads";
LOG.info(msg);
- } catch (Exception t) {
- throw new ServiceException("Error initializing " + getName(), t);
- }
- }
-
- @Override
- public void run() {
- try {
server.serve();
} catch (Throwable t) {
LOG.error(
diff --git
a/sql/hive-thriftserver/v2.3/src/main/java/org/apache/hive/service/cli/thrift/ThriftCLIService.java
b/sql/hive-thriftserver/v2.3/src/main/java/org/apache/hive/service/cli/thrift/ThriftCLIService.java
index e46799a..d41c3b4 100644
---
a/sql/hive-thriftserver/v2.3/src/main/java/org/apache/hive/service/cli/thrift/ThriftCLIService.java
+++
b/sql/hive-thriftserver/v2.3/src/main/java/org/apache/hive/service/cli/thrift/ThriftCLIService.java
@@ -176,7 +176,6 @@ public abstract class ThriftCLIService extends
AbstractService implements TCLISe
public synchronized void start() {
super.start();
if (!isStarted && !isEmbedded) {
- initializeServer();
new Thread(this).start();
isStarted = true;
}
@@ -671,8 +670,6 @@ public abstract class ThriftCLIService extends
AbstractService implements TCLISe
return resp;
}
- protected abstract void initializeServer();
-
@Override
public abstract void run();
diff --git
a/sql/hive-thriftserver/v2.3/src/main/java/org/apache/hive/service/cli/thrift/ThriftHttpCLIService.java
b/sql/hive-thriftserver/v2.3/src/main/java/org/apache/hive/service/cli/thrift/ThriftHttpCLIService.java
index ab9ed5b..73d5f84 100644
---
a/sql/hive-thriftserver/v2.3/src/main/java/org/apache/hive/service/cli/thrift/ThriftHttpCLIService.java
+++
b/sql/hive-thriftserver/v2.3/src/main/java/org/apache/hive/service/cli/thrift/ThriftHttpCLIService.java
@@ -28,7 +28,6 @@ import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
import org.apache.hadoop.hive.shims.ShimLoader;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.util.Shell;
-import org.apache.hive.service.ServiceException;
import org.apache.hive.service.auth.HiveAuthFactory;
import org.apache.hive.service.cli.CLIService;
import org.apache.hive.service.rpc.thrift.TCLIService;
@@ -55,8 +54,13 @@ public class ThriftHttpCLIService extends ThriftCLIService {
super(cliService, ThriftHttpCLIService.class.getSimpleName());
}
+ /**
+ * Configure Jetty to serve http requests. Example of a client connection
URL:
+ * http://localhost:10000/servlets/thrifths2/ A gateway may cause actual
target URL to differ,
+ * e.g. http://gateway:port/hive2/servlets/thrifths2/
+ */
@Override
- protected void initializeServer() {
+ public void run() {
try {
// Server thread pool
// Start with minWorkerThreads, expand till maxWorkerThreads and reject
subsequent requests
@@ -147,19 +151,6 @@ public class ThriftHttpCLIService extends ThriftCLIService
{
+ " mode on port " + portNum + " path=" + httpPath + " with " +
minWorkerThreads + "..."
+ maxWorkerThreads + " worker threads";
LOG.info(msg);
- } catch (Exception t) {
- throw new ServiceException("Error initializing " + getName(), t);
- }
- }
-
- /**
- * Configure Jetty to serve http requests. Example of a client connection
URL:
- * http://localhost:10000/servlets/thrifths2/ A gateway may cause actual
target URL to differ,
- * e.g. http://gateway:port/hive2/servlets/thrifths2/
- */
- @Override
- public void run() {
- try {
httpServer.join();
} catch (Throwable t) {
LOG.error(
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]