Repository: spark Updated Branches: refs/heads/branch-2.1 4af82d56f -> 29f59c733
[SPARK-18086] Add support for Hive session vars. ## What changes were proposed in this pull request? This adds support for Hive variables: * Makes values set via `spark-sql --hivevar name=value` accessible * Adds `getHiveVar` and `setHiveVar` to the `HiveClient` interface * Adds a SessionVariables trait for sessions like Hive that support variables (including Hive vars) * Adds SessionVariables support to variable substitution * Adds SessionVariables support to the SET command ## How was this patch tested? * Adds a test to all supported Hive versions for accessing Hive variables * Adds HiveVariableSubstitutionSuite (Note: the branch-2.1 diff below touches only four files: SetCommand.scala, VariableSubstitution.scala, SparkSQLCLIDriver.scala and the new HiveVariableSubstitutionSuite.scala. It does not add `getHiveVar`/`setHiveVar` to `HiveClient`, a SessionVariables trait, or per-Hive-version tests; instead, `SET hivevar:name=value` and `--hivevar name=value` store the value as an ordinary SQL conf entry under `name`, and `${hivevar:name}` is substituted from that same conf.) Author: Ryan Blue <b...@apache.org> Closes #15738 from rdblue/SPARK-18086-add-hivevar-support. (cherry picked from commit 9b0593d5e99bb919c4abb8d0836a126ec2eaf1d5) Signed-off-by: Reynold Xin <r...@databricks.com> Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/29f59c73 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/29f59c73 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/29f59c73 Branch: refs/heads/branch-2.1 Commit: 29f59c73301628fb63086660f64fdb5272a312fe Parents: 4af82d5 Author: Ryan Blue <b...@apache.org> Authored: Mon Nov 7 17:36:15 2016 -0800 Committer: Reynold Xin <r...@databricks.com> Committed: Mon Nov 7 17:36:22 2016 -0800 ---------------------------------------------------------------------- .../sql/execution/command/SetCommand.scala | 11 +++++ .../sql/internal/VariableSubstitution.scala | 5 +- .../hive/thriftserver/SparkSQLCLIDriver.scala | 6 ++- .../hive/HiveVariableSubstitutionSuite.scala | 50 ++++++++++++++++++++ 4 files changed, 67 insertions(+), 5 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/29f59c73/sql/core/src/main/scala/org/apache/spark/sql/execution/command/SetCommand.scala ---------------------------------------------------------------------- diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/SetCommand.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/SetCommand.scala index af6def5..dc8d975 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/SetCommand.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/SetCommand.scala @@ -60,6 +60,13 @@ case class SetCommand(kv: Option[(String, Option[String])]) extends RunnableComm } (keyValueOutput, runFunc) + case Some((key @ SetCommand.VariableName(name), Some(value))) => + val runFunc = (sparkSession: SparkSession) => { + sparkSession.conf.set(name, value) + Seq(Row(key, value)) + } + (keyValueOutput, runFunc) + // Configures a single property. case Some((key, Some(value))) => val runFunc = (sparkSession: SparkSession) => { @@ -117,6 +124,10 @@ case class SetCommand(kv: Option[(String, Option[String])]) extends RunnableComm } +object SetCommand { + val VariableName = """hivevar:([^=]+)""".r +} + /** * This command is for resetting SQLConf to the default values. 
Command that runs * {{{ http://git-wip-us.apache.org/repos/asf/spark/blob/29f59c73/sql/core/src/main/scala/org/apache/spark/sql/internal/VariableSubstitution.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/VariableSubstitution.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/VariableSubstitution.scala index 50725a0..791a9cf 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/internal/VariableSubstitution.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/VariableSubstitution.scala @@ -17,10 +17,7 @@ package org.apache.spark.sql.internal -import java.util.regex.Pattern - import org.apache.spark.internal.config._ -import org.apache.spark.sql.AnalysisException /** * A helper class that enables substitution using syntax like @@ -37,6 +34,7 @@ class VariableSubstitution(conf: SQLConf) { private val reader = new ConfigReader(provider) .bind("spark", provider) .bind("sparkconf", provider) + .bind("hivevar", provider) .bind("hiveconf", provider) /** @@ -49,5 +47,4 @@ class VariableSubstitution(conf: SQLConf) { input } } - } http://git-wip-us.apache.org/repos/asf/spark/blob/29f59c73/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLCLIDriver.scala ---------------------------------------------------------------------- diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLCLIDriver.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLCLIDriver.scala index 5dafec1..0c79b6f 100644 --- a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLCLIDriver.scala +++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLCLIDriver.scala @@ -38,7 +38,7 @@ import org.apache.thrift.transport.TSocket import org.apache.spark.internal.Logging import org.apache.spark.sql.AnalysisException 
-import org.apache.spark.sql.hive.HiveUtils +import org.apache.spark.sql.hive.{HiveSessionState, HiveUtils} import org.apache.spark.util.ShutdownHookManager /** @@ -291,6 +291,10 @@ private[hive] class SparkSQLCLIDriver extends CliDriver with Logging { throw new RuntimeException("Remote operations not supported") } + override def setHiveVariables(hiveVariables: java.util.Map[String, String]): Unit = { + hiveVariables.asScala.foreach(kv => SparkSQLEnv.sqlContext.conf.setConfString(kv._1, kv._2)) + } + override def processCmd(cmd: String): Int = { val cmd_trimmed: String = cmd.trim() val cmd_lower = cmd_trimmed.toLowerCase(Locale.ENGLISH) http://git-wip-us.apache.org/repos/asf/spark/blob/29f59c73/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveVariableSubstitutionSuite.scala ---------------------------------------------------------------------- diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveVariableSubstitutionSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveVariableSubstitutionSuite.scala new file mode 100644 index 0000000..84d3946 --- /dev/null +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveVariableSubstitutionSuite.scala @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.hive + +import org.apache.spark.sql.{QueryTest, Row} +import org.apache.spark.sql.hive.test.TestHiveSingleton + +class HiveVariableSubstitutionSuite extends QueryTest with TestHiveSingleton { + test("SET hivevar with prefix") { + spark.sql("SET hivevar:county=gram") + assert(spark.conf.getOption("county") === Some("gram")) + } + + test("SET hivevar with dotted name") { + spark.sql("SET hivevar:eloquent.mosquito.alphabet=zip") + assert(spark.conf.getOption("eloquent.mosquito.alphabet") === Some("zip")) + } + + test("hivevar substitution") { + spark.conf.set("pond", "bus") + checkAnswer(spark.sql("SELECT '${hivevar:pond}'"), Row("bus") :: Nil) + } + + test("variable substitution without a prefix") { + spark.sql("SET hivevar:flask=plaid") + checkAnswer(spark.sql("SELECT '${flask}'"), Row("plaid") :: Nil) + } + + test("variable substitution precedence") { + spark.conf.set("turn.aloof", "questionable") + spark.sql("SET hivevar:turn.aloof=dime") + // hivevar clobbers the conf setting + checkAnswer(spark.sql("SELECT '${turn.aloof}'"), Row("dime") :: Nil) + } +} --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org