Repository: spark
Updated Branches:
  refs/heads/master 08dc89361 -> ea017b557
[SPARK-14949][SQL] Remove HiveConf dependency from InsertIntoHiveTable

## What changes were proposed in this pull request?
This patch removes the use of HiveConf from InsertIntoHiveTable. I think this is the last major use of HiveConf and after this we can try to remove the execution HiveConf.

## How was this patch tested?
Internal refactoring and should be covered by existing tests.

Author: Reynold Xin <[email protected]>

Closes #12728 from rxin/SPARK-14949.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/ea017b55
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/ea017b55
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/ea017b55

Branch: refs/heads/master
Commit: ea017b5574cd26b90f586a3f5bf8ccab3fe02e4b
Parents: 08dc893
Author: Reynold Xin <[email protected]>
Authored: Wed Apr 27 09:30:57 2016 -0700
Committer: Reynold Xin <[email protected]>
Committed: Wed Apr 27 09:30:57 2016 -0700

----------------------------------------------------------------------
 .../hive/execution/InsertIntoHiveTable.scala   | 30 +++++++++-----------
 1 file changed, 14 insertions(+), 16 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/ea017b55/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala
index 3cb6081..cba10ca 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala
@@ -21,8 +21,6 @@ import java.util
 
 import scala.collection.JavaConverters._
 
-import org.apache.hadoop.hive.conf.HiveConf
-import org.apache.hadoop.hive.conf.HiveConf.ConfVars
 import org.apache.hadoop.hive.ql.{Context, ErrorMsg}
 import org.apache.hadoop.mapred.{FileOutputFormat, JobConf}
 
@@ -35,7 +33,7 @@ import org.apache.spark.sql.hive.HiveShim.{ShimFileSinkDesc => FileSinkDesc}
 import org.apache.spark.SparkException
 import org.apache.spark.util.SerializableJobConf
 
-private[hive]
+
 case class InsertIntoHiveTable(
     table: MetastoreRelation,
     partition: Map[String, Option[String]],
@@ -45,8 +43,6 @@ case class InsertIntoHiveTable(
 
   @transient private val sessionState = sqlContext.sessionState.asInstanceOf[HiveSessionState]
   @transient private val client = sessionState.metadataHive
-  @transient private val hiveconf = sessionState.hiveconf
-  @transient private lazy val hiveContext = new Context(hiveconf)
 
   def output: Seq[Attribute] = Seq.empty
 
@@ -70,7 +66,6 @@ case class InsertIntoHiveTable(
     writerContainer.driverSideSetup()
     sqlContext.sparkContext.runJob(rdd, writerContainer.writeToFile _)
     writerContainer.commitJob()
-
   }
 
   /**
@@ -85,19 +80,20 @@ case class InsertIntoHiveTable(
     // instances within the closure, since Serializer is not serializable while TableDesc is.
     val tableDesc = table.tableDesc
     val tableLocation = table.hiveQlTable.getDataLocation
-    val tmpLocation = hiveContext.getExternalTmpPath(tableLocation)
+    val hadoopConf = sessionState.newHadoopConf()
+    val tmpLocation = new Context(hadoopConf).getExternalTmpPath(tableLocation)
     val fileSinkConf = new FileSinkDesc(tmpLocation.toString, tableDesc, false)
-    val isCompressed = hiveconf.getBoolean(
-      ConfVars.COMPRESSRESULT.varname, ConfVars.COMPRESSRESULT.defaultBoolVal)
+    val isCompressed =
+      sessionState.conf.getConfString("hive.exec.compress.output", "false").toBoolean
 
     if (isCompressed) {
       // Please note that isCompressed, "mapred.output.compress", "mapred.output.compression.codec",
       // and "mapred.output.compression.type" have no impact on ORC because it uses table properties
       // to store compression information.
-      hiveconf.set("mapred.output.compress", "true")
+      hadoopConf.set("mapred.output.compress", "true")
       fileSinkConf.setCompressed(true)
-      fileSinkConf.setCompressCodec(hiveconf.get("mapred.output.compression.codec"))
-      fileSinkConf.setCompressType(hiveconf.get("mapred.output.compression.type"))
+      fileSinkConf.setCompressCodec(hadoopConf.get("mapred.output.compression.codec"))
+      fileSinkConf.setCompressType(hadoopConf.get("mapred.output.compression.type"))
     }
 
     val numDynamicPartitions = partition.values.count(_.isEmpty)
@@ -114,13 +110,15 @@ case class InsertIntoHiveTable(
     // Validate partition spec if there exist any dynamic partitions
     if (numDynamicPartitions > 0) {
       // Report error if dynamic partitioning is not enabled
-      if (!hiveconf.getBoolVar(HiveConf.ConfVars.DYNAMICPARTITIONING)) {
+      if (!sessionState.conf.getConfString("hive.exec.dynamic.partition", "true").toBoolean) {
         throw new SparkException(ErrorMsg.DYNAMIC_PARTITION_DISABLED.getMsg)
       }
 
       // Report error if dynamic partition strict mode is on but no static partition is found
-      if (numStaticPartitions == 0 && hiveconf.getVar(
-        HiveConf.ConfVars.DYNAMICPARTITIONINGMODE).equalsIgnoreCase("strict")) {
+      if (numStaticPartitions == 0 &&
+        sessionState.conf.getConfString(
+          "hive.exec.dynamic.partition.mode", "strict").equalsIgnoreCase("strict"))
+      {
         throw new SparkException(ErrorMsg.DYNAMIC_PARTITION_STRICT_MODE.getMsg)
       }
 
@@ -131,7 +129,7 @@ case class InsertIntoHiveTable(
       }
     }
 
-    val jobConf = new JobConf(hiveconf)
+    val jobConf = new JobConf(hadoopConf)
     val jobConfSer = new SerializableJobConf(jobConf)
 
     // When speculation is on and output committer class name contains "Direct", we should warn

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
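
A minimal, self-contained Scala sketch of the lookup pattern this patch switches to: Hive settings are read as plain string properties with explicit defaults and parsed from strings, rather than going through HiveConf/ConfVars. SqlConfLike and HiveSettingSketch are hypothetical names introduced only for illustration; the property keys and defaults come from the diff above, while the real code calls sessionState.conf.getConfString.

// Hypothetical stand-in for the getConfString(key, default) lookup used in the
// patch; SQLConf is Spark-internal, so a plain Map backs it here.
trait SqlConfLike {
  def getConfString(key: String, default: String): String
}

object HiveSettingSketch {
  // Mirrors the diff: "hive.exec.compress.output" is read as a string,
  // parsed to a Boolean, and defaults to "false".
  def isCompressOutput(conf: SqlConfLike): Boolean =
    conf.getConfString("hive.exec.compress.output", "false").toBoolean

  // Mirrors the diff: dynamic partitioning must be enabled...
  def dynamicPartitioningEnabled(conf: SqlConfLike): Boolean =
    conf.getConfString("hive.exec.dynamic.partition", "true").toBoolean

  // ...and in strict mode at least one static partition must be present.
  def strictDynamicPartitionMode(conf: SqlConfLike): Boolean =
    conf.getConfString("hive.exec.dynamic.partition.mode", "strict")
      .equalsIgnoreCase("strict")

  def main(args: Array[String]): Unit = {
    val conf = new SqlConfLike {
      private val settings = Map("hive.exec.compress.output" -> "true")
      def getConfString(key: String, default: String): String =
        settings.getOrElse(key, default)
    }
    println(isCompressOutput(conf))            // true
    println(dynamicPartitioningEnabled(conf))  // true (default)
    println(strictDynamicPartitionMode(conf))  // true (default is "strict")
  }
}

The same shift applies on the Hadoop side of the patch: the mapred compression flags are set on a plain Configuration obtained from sessionState.newHadoopConf(), and the JobConf is then built from that Configuration (new JobConf(hadoopConf)) instead of from a HiveConf.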
