This is an automated email from the ASF dual-hosted git repository.
wenchen pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-3.0 by this push:
new 940b1d8 [SPARK-29492][SQL] Reset HiveSession's SessionState conf's
ClassLoader when sync mode
940b1d8 is described below
commit 940b1d860310ec7fe5484e73bb8a9d597af7a12a
Author: angerszhu <[email protected]>
AuthorDate: Wed Apr 29 06:48:46 2020 +0000
[SPARK-29492][SQL] Reset HiveSession's SessionState conf's ClassLoader when
sync mode
### What changes were proposed in this pull request?
When running SQL in the Spark Thrift Server, each session's Thrift server
methods are called in a single thread, but when running a query statement
there are two modes:
1. sync
2. async
https://github.com/apache/spark/blob/5a482e72091c8db940408905e8c044f7f5d7814f/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala#L205-L238
In sync mode, we just submit the query in the current session's corresponding
thread and wait for Spark to run the query and return the result; the query
method always blocks until the query returns.
In async mode, in SparkExecuteStatementOperation, we submit the query to a
backend thread pool and update the operation state. After the query has been
submitted to the backend thread pool, the ExecuteStatement method returns an
OperationHandle to the client side, and the client side requests the operation
status continuously. After the backend thread has run the SQL and returned, it
updates the corresponding operation status. When the client sees that the
operation status is a final status, it will get an error or start
fetching result of thi [...]
When we use PyHive to connect to the Spark Thrift Server, it runs statements
in sync mode.
When we query data from a Hive table, the SerDe class is checked in
HiveTableScanExec#addColumnMetadataToConf
https://github.com/apache/spark/blob/5a482e72091c8db940408905e8c044f7f5d7814f/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveTableScanExec.scala#L123
```
public Class<? extends Deserializer> getDeserializerClass() {
try {
return Class.forName(this.getSerdeClassName(), true,
Utilities.getSessionSpecifiedClassLoader());
} catch (ClassNotFoundException var2) {
throw new RuntimeException(var2);
}
}
public static ClassLoader getSessionSpecifiedClassLoader() {
SessionState state = SessionState.get();
if (state != null && state.getConf() != null) {
ClassLoader sessionCL = state.getConf().getClassLoader();
if (sessionCL != null) {
if (LOG.isTraceEnabled()) {
LOG.trace("Use session specified class loader");
}
return sessionCL;
} else {
if (LOG.isDebugEnabled()) {
LOG.debug("Session specified class loader not found, use thread
based class loader");
}
return JavaUtils.getClassLoader();
}
} else {
if (LOG.isDebugEnabled()) {
LOG.debug("Hive Conf not found or Session not initiated, use thread
based class loader instead");
}
return JavaUtils.getClassLoader();
}
}
```
Since we run the statement in sync mode, it uses the HiveSession's
SessionState and its conf's classLoader, and then the following error occurs.
```
Current operation state RUNNING_STATE,
java.lang.RuntimeException: java.lang.ClassNotFoundException:
xxx.xxx.xxxJsonSerDe
at
org.apache.hadoop.hive.ql.plan.TableDesc.getDeserializerClass(TableDesc.java:74)
at
org.apache.spark.sql.hive.execution.HiveTableScanExec.addColumnMetadataToConf(HiveTableScanExec.scala:123)
at
org.apache.spark.sql.hive.execution.HiveTableScanExec.hadoopConf$lzycompute(HiveTableScanExec.scala:101)
at
org.apache.spark.sql.hive.execution.HiveTableScanExec.hadoopConf(HiveTableScanExec.scala:98)
at
org.apache.spark.sql.hive.execution.HiveTableScanExec.org$apache$spark$sql$hive$execution$HiveTableScanExec$$hadoopReader$lzycompute(HiveTableScanExec.scala:110)
at
org.apache.spark.sql.hive.execution.HiveTableScanExec.org$apache$spark$sql$hive$execution$HiveTableScanExec$$hadoopReader(HiveTableScanExec.scala:105)
at
org.apache.spark.sql.hive.execution.HiveTableScanExec$$anonfun$11.apply(HiveTableScanExec.scala:192)
at
org.apache.spark.sql.hive.execution.HiveTableScanExec$$anonfun$11.apply(HiveTableScanExec.scala:192)
```
We should reset it when we start running SQL in sync mode.
### Why are the changes needed?
Fix bug
### Does this PR introduce any user-facing change?
NO
### How was this patch tested?
UT
Closes #26141 from AngersZhuuuu/add_jar_in_sync_mode.
Lead-authored-by: angerszhu <[email protected]>
Co-authored-by: AngersZhuuuu <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>
(cherry picked from commit 6bc8d84130ae2dcbcd8f5c3361577b19f2eb5d8b)
Signed-off-by: Wenchen Fan <[email protected]>
---
.../SparkExecuteStatementOperation.scala | 5 +++
.../thriftserver/HiveThriftServer2Suites.scala | 40 ++++++++++++++++++++++
2 files changed, 45 insertions(+)
diff --git
a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala
b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala
index cf0e5eb..d14d70f 100644
---
a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala
+++
b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala
@@ -274,6 +274,11 @@ private[hive] class SparkExecuteStatementOperation(
val executionHiveClassLoader = sqlContext.sharedState.jarClassLoader
Thread.currentThread().setContextClassLoader(executionHiveClassLoader)
+ // Always set the session state classloader to
`executionHiveClassLoader` even for sync mode
+ if (!runInBackground) {
+
parentSession.getSessionState.getConf.setClassLoader(executionHiveClassLoader)
+ }
+
sqlContext.sparkContext.setJobGroup(statementId, statement)
result = sqlContext.sql(statement)
logDebug(result.queryExecution.toString())
diff --git
a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2Suites.scala
b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2Suites.scala
index 28f67cb..639dc4d 100644
---
a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2Suites.scala
+++
b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2Suites.scala
@@ -771,6 +771,46 @@ class HiveThriftBinaryServerSuite extends
HiveThriftJdbcTest {
client.closeSession(sessionHandle)
}
}
+
+ test("SPARK-29492: use add jar in sync mode") {
+ withCLIServiceClient { client =>
+ val user = System.getProperty("user.name")
+ val sessionHandle = client.openSession(user, "")
+ withJdbcStatement("smallKV", "addJar") { statement =>
+ val confOverlay = new java.util.HashMap[java.lang.String,
java.lang.String]
+ val jarFile = HiveTestJars.getHiveHcatalogCoreJar().getCanonicalPath
+
+ Seq(s"ADD JAR $jarFile",
+ "CREATE TABLE smallKV(key INT, val STRING) USING hive",
+ s"LOAD DATA LOCAL INPATH '${TestData.smallKv}' OVERWRITE INTO TABLE
smallKV")
+ .foreach(query => client.executeStatement(sessionHandle, query,
confOverlay))
+
+ client.executeStatement(sessionHandle,
+ """CREATE TABLE addJar(key string)
+ |ROW FORMAT SERDE 'org.apache.hive.hcatalog.data.JsonSerDe'
+ """.stripMargin, confOverlay)
+
+ client.executeStatement(sessionHandle,
+ "INSERT INTO TABLE addJar SELECT 'k1' as key FROM smallKV limit 1",
confOverlay)
+
+ val operationHandle = client.executeStatement(
+ sessionHandle,
+ "SELECT key FROM addJar",
+ confOverlay)
+
+ // Fetch result first time
+ assertResult(1, "Fetching result first time from next row") {
+
+ val rows_next = client.fetchResults(
+ operationHandle,
+ FetchOrientation.FETCH_NEXT,
+ 1000,
+ FetchType.QUERY_OUTPUT)
+ rows_next.numRows()
+ }
+ }
+ }
+ }
}
class SingleSessionSuite extends HiveThriftJdbcTest {
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]