Repository: spark
Updated Branches:
  refs/heads/master ec63e2d07 -> f051f8340


[SPARK-13983][SQL] Fix HiveThriftServer2 can not get "--hiveconf" and 
"--hivevar" variables since 2.0

## What changes were proposed in this pull request?

`--hiveconf` and `--hivevar` variables no longer work since Spark 2.0. The 
`spark-sql` client has been fixed by 
[SPARK-15730](https://issues.apache.org/jira/browse/SPARK-15730) and 
[SPARK-18086](https://issues.apache.org/jira/browse/SPARK-18086). But 
`beeline`/[`Spark SQL 
HiveThriftServer2`](https://github.com/apache/spark/blob/v2.1.1/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2.scala)
 is still broken. This pull request fixes it.

This pull request works for both `JDBC client` and `beeline`.

## How was this patch tested?

unit tests for `JDBC client`
manual tests for `beeline`:
```
git checkout origin/pr/17886

dev/make-distribution.sh --mvn mvn  --tgz -Phive -Phive-thriftserver 
-Phadoop-2.6 -DskipTests

tar -zxf spark-2.3.0-SNAPSHOT-bin-2.6.5.tgz && cd spark-2.3.0-SNAPSHOT-bin-2.6.5

sbin/start-thriftserver.sh
```
```
cat <<EOF > test.sql
select '\${a}', '\${b}';
EOF

beeline -u jdbc:hive2://localhost:10000 --hiveconf a=avalue --hivevar b=bvalue 
-f test.sql

```

Author: Yuming Wang <wgy...@gmail.com>

Closes #17886 from wangyum/SPARK-13983-dev.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/f051f834
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/f051f834
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/f051f834

Branch: refs/heads/master
Commit: f051f834036e63d5e480d86440ce39924f979e82
Parents: ec63e2d
Author: Yuming Wang <wgy...@gmail.com>
Authored: Thu Feb 1 10:36:31 2018 -0800
Committer: gatorsmile <gatorsm...@gmail.com>
Committed: Thu Feb 1 10:36:31 2018 -0800

----------------------------------------------------------------------
 .../service/cli/session/HiveSessionImpl.java    | 74 +++++++++++++++++++-
 .../server/SparkSQLOperationManager.scala       | 12 ++++
 .../thriftserver/HiveThriftServer2Suites.scala  | 23 +++++-
 3 files changed, 105 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/f051f834/sql/hive-thriftserver/src/main/java/org/apache/hive/service/cli/session/HiveSessionImpl.java
----------------------------------------------------------------------
diff --git 
a/sql/hive-thriftserver/src/main/java/org/apache/hive/service/cli/session/HiveSessionImpl.java
 
b/sql/hive-thriftserver/src/main/java/org/apache/hive/service/cli/session/HiveSessionImpl.java
index 108074c..fc818bc 100644
--- 
a/sql/hive-thriftserver/src/main/java/org/apache/hive/service/cli/session/HiveSessionImpl.java
+++ 
b/sql/hive-thriftserver/src/main/java/org/apache/hive/service/cli/session/HiveSessionImpl.java
@@ -44,7 +44,7 @@ import org.apache.hadoop.hive.ql.exec.Utilities;
 import org.apache.hadoop.hive.ql.history.HiveHistory;
 import org.apache.hadoop.hive.ql.metadata.Hive;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
-import org.apache.hadoop.hive.ql.processors.SetProcessor;
+import org.apache.hadoop.hive.ql.parse.VariableSubstitution;
 import org.apache.hadoop.hive.ql.session.SessionState;
 import org.apache.hadoop.hive.shims.ShimLoader;
 import org.apache.hive.common.util.HiveVersionInfo;
@@ -71,6 +71,12 @@ import 
org.apache.hive.service.cli.operation.OperationManager;
 import org.apache.hive.service.cli.thrift.TProtocolVersion;
 import org.apache.hive.service.server.ThreadWithGarbageCleanup;
 
+import static org.apache.hadoop.hive.conf.SystemVariables.ENV_PREFIX;
+import static org.apache.hadoop.hive.conf.SystemVariables.HIVECONF_PREFIX;
+import static org.apache.hadoop.hive.conf.SystemVariables.HIVEVAR_PREFIX;
+import static org.apache.hadoop.hive.conf.SystemVariables.METACONF_PREFIX;
+import static org.apache.hadoop.hive.conf.SystemVariables.SYSTEM_PREFIX;
+
 /**
  * HiveSession
  *
@@ -209,7 +215,7 @@ public class HiveSessionImpl implements HiveSession {
       String key = entry.getKey();
       if (key.startsWith("set:")) {
         try {
-          SetProcessor.setVariable(key.substring(4), entry.getValue());
+          setVariable(key.substring(4), entry.getValue());
         } catch (Exception e) {
           throw new HiveSQLException(e);
         }
@@ -221,6 +227,70 @@ public class HiveSessionImpl implements HiveSession {
     }
   }
 
+  // Copy from org.apache.hadoop.hive.ql.processors.SetProcessor, only change:
+  // setConf(varname, propName, varvalue, true) when 
varname.startsWith(HIVECONF_PREFIX)
+  public static int setVariable(String varname, String varvalue) throws 
Exception {
+    SessionState ss = SessionState.get();
+    if (varvalue.contains("\n")){
+      ss.err.println("Warning: Value had a \\n character in it.");
+    }
+    varname = varname.trim();
+    if (varname.startsWith(ENV_PREFIX)){
+      ss.err.println("env:* variables can not be set.");
+      return 1;
+    } else if (varname.startsWith(SYSTEM_PREFIX)){
+      String propName = varname.substring(SYSTEM_PREFIX.length());
+      System.getProperties().setProperty(propName,
+              new VariableSubstitution().substitute(ss.getConf(),varvalue));
+    } else if (varname.startsWith(HIVECONF_PREFIX)){
+      String propName = varname.substring(HIVECONF_PREFIX.length());
+      setConf(varname, propName, varvalue, true);
+    } else if (varname.startsWith(HIVEVAR_PREFIX)) {
+      String propName = varname.substring(HIVEVAR_PREFIX.length());
+      ss.getHiveVariables().put(propName,
+              new VariableSubstitution().substitute(ss.getConf(),varvalue));
+    } else if (varname.startsWith(METACONF_PREFIX)) {
+      String propName = varname.substring(METACONF_PREFIX.length());
+      Hive hive = Hive.get(ss.getConf());
+      hive.setMetaConf(propName, new 
VariableSubstitution().substitute(ss.getConf(), varvalue));
+    } else {
+      setConf(varname, varname, varvalue, true);
+    }
+    return 0;
+  }
+
+  // returns non-null string for validation fail
+  private static void setConf(String varname, String key, String varvalue, 
boolean register)
+          throws IllegalArgumentException {
+    HiveConf conf = SessionState.get().getConf();
+    String value = new VariableSubstitution().substitute(conf, varvalue);
+    if (conf.getBoolVar(HiveConf.ConfVars.HIVECONFVALIDATION)) {
+      HiveConf.ConfVars confVars = HiveConf.getConfVars(key);
+      if (confVars != null) {
+        if (!confVars.isType(value)) {
+          StringBuilder message = new StringBuilder();
+          message.append("'SET ").append(varname).append('=').append(varvalue);
+          message.append("' FAILED because ").append(key).append(" expects ");
+          message.append(confVars.typeString()).append(" type value.");
+          throw new IllegalArgumentException(message.toString());
+        }
+        String fail = confVars.validate(value);
+        if (fail != null) {
+          StringBuilder message = new StringBuilder();
+          message.append("'SET ").append(varname).append('=').append(varvalue);
+          message.append("' FAILED in validation : ").append(fail).append('.');
+          throw new IllegalArgumentException(message.toString());
+        }
+      } else if (key.startsWith("hive.")) {
+        throw new IllegalArgumentException("hive configuration " + key + " 
does not exists.");
+      }
+    }
+    conf.verifyAndSet(key, value);
+    if (register) {
+      SessionState.get().getOverriddenConfigurations().put(key, value);
+    }
+  }
+
   @Override
   public void setOperationLogSessionDir(File operationLogRootDir) {
     if (!operationLogRootDir.exists()) {

http://git-wip-us.apache.org/repos/asf/spark/blob/f051f834/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/server/SparkSQLOperationManager.scala
----------------------------------------------------------------------
diff --git 
a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/server/SparkSQLOperationManager.scala
 
b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/server/SparkSQLOperationManager.scala
index a0e5012..bf7c01f 100644
--- 
a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/server/SparkSQLOperationManager.scala
+++ 
b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/server/SparkSQLOperationManager.scala
@@ -28,6 +28,7 @@ import org.apache.spark.internal.Logging
 import org.apache.spark.sql.SQLContext
 import org.apache.spark.sql.hive.HiveUtils
 import org.apache.spark.sql.hive.thriftserver.{ReflectionUtils, 
SparkExecuteStatementOperation}
+import org.apache.spark.sql.internal.SQLConf
 
 /**
  * Executes queries using Spark SQL, and maintains a list of handles to active 
queries.
@@ -50,6 +51,9 @@ private[thriftserver] class SparkSQLOperationManager()
     require(sqlContext != null, s"Session handle: 
${parentSession.getSessionHandle} has not been" +
       s" initialized or had already closed.")
     val conf = sqlContext.sessionState.conf
+    val hiveSessionState = parentSession.getSessionState
+    setConfMap(conf, hiveSessionState.getOverriddenConfigurations)
+    setConfMap(conf, hiveSessionState.getHiveVariables)
     val runInBackground = async && 
conf.getConf(HiveUtils.HIVE_THRIFT_SERVER_ASYNC)
     val operation = new SparkExecuteStatementOperation(parentSession, 
statement, confOverlay,
       runInBackground)(sqlContext, sessionToActivePool)
@@ -58,4 +62,12 @@ private[thriftserver] class SparkSQLOperationManager()
       s"runInBackground=$runInBackground")
     operation
   }
+
+  def setConfMap(conf: SQLConf, confMap: java.util.Map[String, String]): Unit 
= {
+    val iterator = confMap.entrySet().iterator()
+    while (iterator.hasNext) {
+      val kv = iterator.next()
+      conf.setConfString(kv.getKey, kv.getValue)
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/f051f834/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2Suites.scala
----------------------------------------------------------------------
diff --git 
a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2Suites.scala
 
b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2Suites.scala
index 7289da7..496f8c8 100644
--- 
a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2Suites.scala
+++ 
b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2Suites.scala
@@ -135,6 +135,22 @@ class HiveThriftBinaryServerSuite extends 
HiveThriftJdbcTest {
     }
   }
 
+  test("Support beeline --hiveconf and --hivevar") {
+    withJdbcStatement() { statement =>
+      executeTest(hiveConfList)
+      executeTest(hiveVarList)
+      def executeTest(hiveList: String): Unit = {
+        hiveList.split(";").foreach{ m =>
+          val kv = m.split("=")
+          // select "${a}"; ---> avalue
+          val resultSet = statement.executeQuery("select \"${" + kv(0) + "}\"")
+          resultSet.next()
+          assert(resultSet.getString(1) === kv(1))
+        }
+      }
+    }
+  }
+
   test("JDBC query execution") {
     withJdbcStatement("test") { statement =>
       val queries = Seq(
@@ -740,10 +756,11 @@ abstract class HiveThriftJdbcTest extends 
HiveThriftServer2Test {
     s"""jdbc:hive2://localhost:$serverPort/
        |default?
        |hive.server2.transport.mode=http;
-       |hive.server2.thrift.http.path=cliservice
+       |hive.server2.thrift.http.path=cliservice;
+       |${hiveConfList}#${hiveVarList}
      """.stripMargin.split("\n").mkString.trim
   } else {
-    s"jdbc:hive2://localhost:$serverPort/"
+    s"jdbc:hive2://localhost:$serverPort/?${hiveConfList}#${hiveVarList}"
   }
 
   def withMultipleConnectionJdbcStatement(tableNames: String*)(fs: (Statement 
=> Unit)*) {
@@ -779,6 +796,8 @@ abstract class HiveThriftServer2Test extends SparkFunSuite 
with BeforeAndAfterAl
   private var listeningPort: Int = _
   protected def serverPort: Int = listeningPort
 
+  protected val hiveConfList = "a=avalue;b=bvalue"
+  protected val hiveVarList = "c=cvalue;d=dvalue"
   protected def user = System.getProperty("user.name")
 
   protected var warehousePath: File = _


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to