diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveUtils.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveUtils.scala
index 597eef129f63e..688376e634f9d 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveUtils.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveUtils.scala
@@ -128,6 +128,12 @@ private[spark] object HiveUtils extends Logging {
     .toSequence
     .createWithDefault(jdbcPrefixes)
 
+  val HIVE_INPUT_FORMAT_OPTIMIZER_ENABLED =
+    buildConf("spark.sql.hive.inputFormat.optimizer.enabled")
+      .doc("When true, enable the input format optimizer when reading Hive tables.")
+      .booleanConf
+      .createWithDefault(false)
+
   private def jdbcPrefixes = Seq(
     "com.mysql.jdbc", "org.postgresql", "com.microsoft.sqlserver", "oracle.jdbc")
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala
index 536bc4a3f4ec4..ffc76bffa5bcd 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala
@@ -123,8 +123,7 @@ class HadoopTableReader(
     val inputPathStr = applyFilterIfNeeded(tablePath, filterOpt)
 
     // logDebug("Table input: %s".format(tablePath))
-    val ifc = hiveTable.getInputFormatClass
-      .asInstanceOf[java.lang.Class[InputFormat[Writable, Writable]]]
+    val ifc = getAndOptimizeInput(hiveTable.getInputFormatClass.getName)
     val hadoopRDD = createHadoopRdd(localTableDesc, inputPathStr, ifc)
 
     val attrsWithIndex = attributes.zipWithIndex
@@ -164,7 +163,7 @@ class HadoopTableReader(
     def verifyPartitionPath(
         partitionToDeserializer: Map[HivePartition, Class[_ <: Deserializer]]):
         Map[HivePartition, Class[_ <: Deserializer]] = {
-      if (!sparkSession.sessionState.conf.verifyPartitionPath) {
+      if (!conf.verifyPartitionPath) {
         partitionToDeserializer
       } else {
         val existPathSet = collection.mutable.Set[String]()
@@ -202,8 +201,7 @@ class HadoopTableReader(
       val partDesc = Utilities.getPartitionDesc(partition)
       val partPath = partition.getDataLocation
       val inputPathStr = applyFilterIfNeeded(partPath, filterOpt)
-      val ifc = partDesc.getInputFileFormatClass
-        .asInstanceOf[java.lang.Class[InputFormat[Writable, Writable]]]
+      val ifc = getAndOptimizeInput(partDesc.getInputFileFormatClassName)
       // Get partition field info
       val partSpec = partDesc.getPartSpec
       val partProps = partDesc.getProperties
@@ -311,6 +309,32 @@ class HadoopTableReader(
     // Only take the value (skip the key) because Hive works only with values.
     rdd.map(_._2)
   }
+
+  /**
+   * If `spark.sql.hive.inputFormat.optimizer.enabled` is true, this method optimizes how the
+   * input is read (the input format and, as a result, the size of splits) for Hive tables.
+   */
+  private def getAndOptimizeInput(
+      inputClassName: String): Class[InputFormat[Writable, Writable]] = {
+
+    var ifc = Utils.classForName(inputClassName)
+      .asInstanceOf[java.lang.Class[InputFormat[Writable, Writable]]]
+    if (conf.getConf(HiveUtils.HIVE_INPUT_FORMAT_OPTIMIZER_ENABLED)) {
+      if ("org.apache.hadoop.mapreduce.lib.input.TextInputFormat"
+        .equals(inputClassName)) {
+        ifc = Utils.classForName(
+          "org.apache.hadoop.mapreduce.lib.input.CombineTextInputFormat")
+          .asInstanceOf[java.lang.Class[InputFormat[Writable, Writable]]]
+      }
+      if ("org.apache.hadoop.mapred.TextInputFormat"
+        .equals(inputClassName)) {
+        ifc = Utils.classForName(
+          "org.apache.hadoop.mapred.lib.CombineTextInputFormat")
+          .asInstanceOf[java.lang.Class[InputFormat[Writable, Writable]]]
+      }
+    }
+    ifc
+  }
 }
 
 private[hive] object HiveTableUtil {
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveTableScanSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveTableScanSuite.scala
index 3f9bb8de42e09..3371bce7087e5 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveTableScanSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveTableScanSuite.scala
@@ -192,4 +192,47 @@ class HiveTableScanSuite extends HiveComparisonTest with SQLTestUtils with TestH
       case p: HiveTableScanExec => p
     }.get
   }
-}
+
+  test("Test the InputFormat optimizer") {
+    withTable("table_old", "table_pt_old", "table_new", "table_pt_new") {
+      sql("set spark.sql.hive.inputFormat.optimizer.enabled=true")
+      sql("set spark.sql.hive.fileInputFormat.split.maxsize=134217728")
+      sql("set spark.sql.hive.fileInputFormat.split.minsize=134217728")
+      sql(
+        s"""
+           |CREATE TABLE table_old (id int)
+           |STORED AS
+           |INPUTFORMAT 'org.apache.hadoop.mapred.TextInputFormat'
+           |OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat'
+         """.stripMargin)
+      sql(
+        s"""
+           |CREATE TABLE table_pt_old (id int)
+           |PARTITIONED BY (a int, b int)
+           |STORED AS
+           |INPUTFORMAT 'org.apache.hadoop.mapred.TextInputFormat'
+           |OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat'
+         """.stripMargin)
+      sql(
+        s"""
+           |CREATE TABLE table_new (id int)
+           |STORED AS
+           |INPUTFORMAT 'org.apache.hadoop.mapreduce.lib.input.TextInputFormat'
+           |OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat'
+         """.stripMargin)
+      sql(
+        s"""
+           |CREATE TABLE table_pt_new (id int)
+           |PARTITIONED BY (a int, b int)
+           |STORED AS
+           |INPUTFORMAT 'org.apache.hadoop.mapreduce.lib.input.TextInputFormat'
+           |OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat'
+         """.stripMargin)
+
+      sql("SELECT count(1) FROM table_old").show()
+      sql("SELECT count(1) FROM table_pt_old").show()
+      sql("SELECT count(1) FROM table_new").show()
+      sql("SELECT count(1) FROM table_pt_new").show()
+    }
+  }
+}
\ No newline at end of file
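For reviewers, a minimal usage sketch of the flag introduced above, assuming the patch is applied. The table name `src_text`, the application object, and the explicit Hadoop split-size setting are illustrative placeholders and not part of the patch itself.

import org.apache.spark.sql.SparkSession

object InputFormatOptimizerExample {
  def main(args: Array[String]): Unit = {
    // Hive support is required so that table scans go through HadoopTableReader.
    val spark = SparkSession.builder()
      .appName("InputFormatOptimizerExample")
      .enableHiveSupport()
      .getOrCreate()

    // Opt in to the optimizer added by this patch; it defaults to false.
    spark.sql("SET spark.sql.hive.inputFormat.optimizer.enabled=true")

    // Assumption (not defined by this patch): CombineTextInputFormat groups small files
    // into larger splits, and the resulting split size can be bounded through Hadoop's
    // own configuration rather than a Spark SQL conf.
    spark.sparkContext.hadoopConfiguration
      .setLong("mapreduce.input.fileinputformat.split.maxsize", 128L * 1024 * 1024)

    // A scan of a table stored with org.apache.hadoop.mapred.TextInputFormat now goes
    // through getAndOptimizeInput, which substitutes the CombineTextInputFormat variant.
    spark.sql("SELECT count(1) FROM src_text").show()

    spark.stop()
  }
}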