This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.0 by this push:
     new 940b1d8  [SPARK-29492][SQL] Reset HiveSession's SessionState conf's 
ClassLoader when sync mode
940b1d8 is described below

commit 940b1d860310ec7fe5484e73bb8a9d597af7a12a
Author: angerszhu <[email protected]>
AuthorDate: Wed Apr 29 06:48:46 2020 +0000

    [SPARK-29492][SQL] Reset HiveSession's SessionState conf's ClassLoader when 
sync mode
    
    ### What changes were proposed in this pull request?
    Run sql in spark thrift server, each session 's thrift server about method 
will be called in one thread, but when running query statement,  we have two 
mode:
     1. sync
     2. async
     
https://github.com/apache/spark/blob/5a482e72091c8db940408905e8c044f7f5d7814f/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala#L205-L238
    
    In sync mode, we just submit query in current session's corresponding 
thread and wait Spark to running query and return result,  and the query method 
will always wait for query return.
    In async mode, in SparkExecuteStatementOperation, we will submit query in a 
backend thread pool, and update operation state,  after submitted to backend 
thread poll, ExecuteStatement method will return a OperationHandle to client 
side, and client side will request operation status continuously. after backend 
thread running sql and return , it will update corresponding  operation status, 
when client got operation status is final status, it will got error or start 
fetching result of thi [...]
    
    When we use pyhive connect to SparkThriftServer, it will run statement in 
sync mode.
    When we query data of hive table , it will check serde class in 
HiveTableScanExec#addColumnMetadataToConf
    
    
https://github.com/apache/spark/blob/5a482e72091c8db940408905e8c044f7f5d7814f/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveTableScanExec.scala#L123
    
    ```
      public Class<? extends Deserializer> getDeserializerClass() {
        try {
          return Class.forName(this.getSerdeClassName(), true, 
Utilities.getSessionSpecifiedClassLoader());
        } catch (ClassNotFoundException var2) {
          throw new RuntimeException(var2);
        }
      }
    
     public static ClassLoader getSessionSpecifiedClassLoader() {
        SessionState state = SessionState.get();
        if (state != null && state.getConf() != null) {
          ClassLoader sessionCL = state.getConf().getClassLoader();
          if (sessionCL != null) {
            if (LOG.isTraceEnabled()) {
              LOG.trace("Use session specified class loader");
            }
    
            return sessionCL;
          } else {
            if (LOG.isDebugEnabled()) {
              LOG.debug("Session specified class loader not found, use thread 
based class loader");
            }
    
            return JavaUtils.getClassLoader();
          }
        } else {
          if (LOG.isDebugEnabled()) {
            LOG.debug("Hive Conf not found or Session not initiated, use thread 
based class loader instead");
          }
    
          return JavaUtils.getClassLoader();
        }
      }
    ```
    Since we run statement in sync mode, it will use HiveSession's 
SessionState,  and use it's conf's classLoader. then error happened.
    ```
    Current operation state RUNNING_STATE,
    java.lang.RuntimeException: java.lang.ClassNotFoundException:
    xxx.xxx.xxxJsonSerDe
      at 
org.apache.hadoop.hive.ql.plan.TableDesc.getDeserializerClass(TableDesc.java:74)
      at 
org.apache.spark.sql.hive.execution.HiveTableScanExec.addColumnMetadataToConf(HiveTableScanExec.scala:123)
      at 
org.apache.spark.sql.hive.execution.HiveTableScanExec.hadoopConf$lzycompute(HiveTableScanExec.scala:101)
          at 
org.apache.spark.sql.hive.execution.HiveTableScanExec.hadoopConf(HiveTableScanExec.scala:98)
          at 
org.apache.spark.sql.hive.execution.HiveTableScanExec.org$apache$spark$sql$hive$execution$HiveTableScanExec$$hadoopReader$lzycompute(HiveTableScanExec.scala:110)
      at 
org.apache.spark.sql.hive.execution.HiveTableScanExec.org$apache$spark$sql$hive$execution$HiveTableScanExec$$hadoopReader(HiveTableScanExec.scala:105)
          at 
org.apache.spark.sql.hive.execution.HiveTableScanExec$$anonfun$11.apply(HiveTableScanExec.scala:192)
          at 
org.apache.spark.sql.hive.execution.HiveTableScanExec$$anonfun$11.apply(HiveTableScanExec.scala:192)
    ```
    We should reset it when we start run sql in sync mode.
    ### Why are the changes needed?
    Fix bug
    
    ### Does this PR introduce any user-facing change?
    NO
    
    ### How was this patch tested?
    UT
    
    Closes #26141 from AngersZhuuuu/add_jar_in_sync_mode.
    
    Lead-authored-by: angerszhu <[email protected]>
    Co-authored-by: AngersZhuuuu <[email protected]>
    Signed-off-by: Wenchen Fan <[email protected]>
    (cherry picked from commit 6bc8d84130ae2dcbcd8f5c3361577b19f2eb5d8b)
    Signed-off-by: Wenchen Fan <[email protected]>
---
 .../SparkExecuteStatementOperation.scala           |  5 +++
 .../thriftserver/HiveThriftServer2Suites.scala     | 40 ++++++++++++++++++++++
 2 files changed, 45 insertions(+)

diff --git 
a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala
 
b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala
index cf0e5eb..d14d70f 100644
--- 
a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala
+++ 
b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala
@@ -274,6 +274,11 @@ private[hive] class SparkExecuteStatementOperation(
       val executionHiveClassLoader = sqlContext.sharedState.jarClassLoader
       Thread.currentThread().setContextClassLoader(executionHiveClassLoader)
 
+      // Always set the session state classloader to 
`executionHiveClassLoader` even for sync mode
+      if (!runInBackground) {
+        
parentSession.getSessionState.getConf.setClassLoader(executionHiveClassLoader)
+      }
+
       sqlContext.sparkContext.setJobGroup(statementId, statement)
       result = sqlContext.sql(statement)
       logDebug(result.queryExecution.toString())
diff --git 
a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2Suites.scala
 
b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2Suites.scala
index 28f67cb..639dc4d 100644
--- 
a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2Suites.scala
+++ 
b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2Suites.scala
@@ -771,6 +771,46 @@ class HiveThriftBinaryServerSuite extends 
HiveThriftJdbcTest {
       client.closeSession(sessionHandle)
     }
   }
+
+  test("SPARK-29492: use add jar in sync mode") {
+    withCLIServiceClient { client =>
+      val user = System.getProperty("user.name")
+      val sessionHandle = client.openSession(user, "")
+      withJdbcStatement("smallKV", "addJar") { statement =>
+        val confOverlay = new java.util.HashMap[java.lang.String, 
java.lang.String]
+        val jarFile = HiveTestJars.getHiveHcatalogCoreJar().getCanonicalPath
+
+        Seq(s"ADD JAR $jarFile",
+          "CREATE TABLE smallKV(key INT, val STRING) USING hive",
+          s"LOAD DATA LOCAL INPATH '${TestData.smallKv}' OVERWRITE INTO TABLE 
smallKV")
+          .foreach(query => client.executeStatement(sessionHandle, query, 
confOverlay))
+
+        client.executeStatement(sessionHandle,
+          """CREATE TABLE addJar(key string)
+            |ROW FORMAT SERDE 'org.apache.hive.hcatalog.data.JsonSerDe'
+          """.stripMargin, confOverlay)
+
+        client.executeStatement(sessionHandle,
+          "INSERT INTO TABLE addJar SELECT 'k1' as key FROM smallKV limit 1", 
confOverlay)
+
+        val operationHandle = client.executeStatement(
+          sessionHandle,
+          "SELECT key FROM addJar",
+          confOverlay)
+
+        // Fetch result first time
+        assertResult(1, "Fetching result first time from next row") {
+
+          val rows_next = client.fetchResults(
+            operationHandle,
+            FetchOrientation.FETCH_NEXT,
+            1000,
+            FetchType.QUERY_OUTPUT)
+          rows_next.numRows()
+        }
+      }
+    }
+  }
 }
 
 class SingleSessionSuite extends HiveThriftJdbcTest {


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to