This is an automated email from the ASF dual-hosted git repository.

lixiao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new e71acd9  [SPARK-26630][SQL] Support reading Hive-serde tables whose 
INPUTFORMAT is org.apache.hadoop.mapreduce
e71acd9 is described below

commit e71acd9a23926e6c84e462009f145a80ea24bf85
Author: heguozi <zyzzx...@gmail.com>
AuthorDate: Sat Jan 26 10:17:03 2019 -0800

    [SPARK-26630][SQL] Support reading Hive-serde tables whose INPUTFORMAT is 
org.apache.hadoop.mapreduce
    
    ## What changes were proposed in this pull request?
    
    When we read a hive table and create RDDs in `TableReader`, it'll throw 
exception `java.lang.ClassCastException: 
org.apache.hadoop.mapreduce.lib.input.TextInputFormat cannot be cast to 
org.apache.hadoop.mapred.InputFormat` if the input format class of the table is 
from mapreduce package.
    
    Now we use NewHadoopRDD to deal with the new input format and keep 
HadoopRDD to the old one.
    
    This PR is from #23506. We can reproduce this issue by executing the new 
test with the code in old version. When create a table with 
`org.apache.hadoop.mapreduce.....` input format, we will find the exception 
thrown in `org.apache.spark.rdd.HadoopRDD.getInputFormat(HadoopRDD.scala:190)`
    
    ## How was this patch tested?
    
    Added a new test.
    
    Closes #23559 from Deegue/fix-hadoopRDD.
    
    Lead-authored-by: heguozi <zyzzx...@gmail.com>
    Co-authored-by: Yizhong Zhang <zyzzx...@163.com>
    Signed-off-by: gatorsmile <gatorsm...@gmail.com>
---
 .../org/apache/spark/sql/hive/TableReader.scala    |  61 +++++++++---
 .../spark/sql/hive/execution/HiveDDLSuite.scala    | 102 +++++++++++++++++++++
 2 files changed, 149 insertions(+), 14 deletions(-)

diff --git 
a/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala 
b/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala
index 536bc4a..ad11719 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala
@@ -31,12 +31,13 @@ import org.apache.hadoop.hive.serde2.Deserializer
 import 
org.apache.hadoop.hive.serde2.objectinspector.{ObjectInspectorConverters, 
StructObjectInspector}
 import org.apache.hadoop.hive.serde2.objectinspector.primitive._
 import org.apache.hadoop.io.Writable
-import org.apache.hadoop.mapred.{FileInputFormat, InputFormat, JobConf}
+import org.apache.hadoop.mapred.{FileInputFormat, InputFormat => 
oldInputClass, JobConf}
+import org.apache.hadoop.mapreduce.{InputFormat => newInputClass}
 
 import org.apache.spark.broadcast.Broadcast
 import org.apache.spark.deploy.SparkHadoopUtil
 import org.apache.spark.internal.Logging
-import org.apache.spark.rdd.{EmptyRDD, HadoopRDD, RDD, UnionRDD}
+import org.apache.spark.rdd.{EmptyRDD, HadoopRDD, NewHadoopRDD, RDD, UnionRDD}
 import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.analysis.CastSupport
@@ -123,9 +124,7 @@ class HadoopTableReader(
     val inputPathStr = applyFilterIfNeeded(tablePath, filterOpt)
 
     // logDebug("Table input: %s".format(tablePath))
-    val ifc = hiveTable.getInputFormatClass
-      .asInstanceOf[java.lang.Class[InputFormat[Writable, Writable]]]
-    val hadoopRDD = createHadoopRdd(localTableDesc, inputPathStr, ifc)
+    val hadoopRDD = createHadoopRDD(localTableDesc, inputPathStr)
 
     val attrsWithIndex = attributes.zipWithIndex
     val mutableRow = new SpecificInternalRow(attributes.map(_.dataType))
@@ -164,7 +163,7 @@ class HadoopTableReader(
     def verifyPartitionPath(
         partitionToDeserializer: Map[HivePartition, Class[_ <: Deserializer]]):
         Map[HivePartition, Class[_ <: Deserializer]] = {
-      if (!sparkSession.sessionState.conf.verifyPartitionPath) {
+      if (!conf.verifyPartitionPath) {
         partitionToDeserializer
       } else {
         val existPathSet = collection.mutable.Set[String]()
@@ -202,8 +201,6 @@ class HadoopTableReader(
       val partDesc = Utilities.getPartitionDesc(partition)
       val partPath = partition.getDataLocation
       val inputPathStr = applyFilterIfNeeded(partPath, filterOpt)
-      val ifc = partDesc.getInputFileFormatClass
-        .asInstanceOf[java.lang.Class[InputFormat[Writable, Writable]]]
       // Get partition field info
       val partSpec = partDesc.getPartSpec
       val partProps = partDesc.getProperties
@@ -243,7 +240,7 @@ class HadoopTableReader(
 
       // Create local references so that the outer object isn't serialized.
       val localTableDesc = tableDesc
-      createHadoopRdd(localTableDesc, inputPathStr, ifc).mapPartitions { iter 
=>
+      createHadoopRDD(localTableDesc, inputPathStr).mapPartitions { iter =>
         val hconf = broadcastedHiveConf.value.value
         val deserializer = localDeserializer.getConstructor().newInstance()
         // SPARK-13709: For SerDes like AvroSerDe, some essential information 
(e.g. Avro schema
@@ -289,15 +286,28 @@ class HadoopTableReader(
   }
 
   /**
+   * The entry of creating a RDD.
+   * [SPARK-26630] Using which HadoopRDD will be decided by the input format 
of tables.
+   * The input format of NewHadoopRDD is from `org.apache.hadoop.mapreduce` 
package while
+   * the input format of HadoopRDD is from `org.apache.hadoop.mapred` package.
+   */
+  private def createHadoopRDD(localTableDesc: TableDesc, inputPathStr: 
String): RDD[Writable] = {
+    val inputFormatClazz = localTableDesc.getInputFileFormatClass
+    if (classOf[newInputClass[_, _]].isAssignableFrom(inputFormatClazz)) {
+      createNewHadoopRDD(localTableDesc, inputPathStr)
+    } else {
+      createOldHadoopRDD(localTableDesc, inputPathStr)
+    }
+  }
+
+  /**
    * Creates a HadoopRDD based on the broadcasted HiveConf and other job 
properties that will be
    * applied locally on each slave.
    */
-  private def createHadoopRdd(
-    tableDesc: TableDesc,
-    path: String,
-    inputFormatClass: Class[InputFormat[Writable, Writable]]): RDD[Writable] = 
{
-
+  private def createOldHadoopRDD(tableDesc: TableDesc, path: String): 
RDD[Writable] = {
     val initializeJobConfFunc = 
HadoopTableReader.initializeLocalJobConfFunc(path, tableDesc) _
+    val inputFormatClass = tableDesc.getInputFileFormatClass
+      .asInstanceOf[Class[oldInputClass[Writable, Writable]]]
 
     val rdd = new HadoopRDD(
       sparkSession.sparkContext,
@@ -311,6 +321,29 @@ class HadoopTableReader(
     // Only take the value (skip the key) because Hive works only with values.
     rdd.map(_._2)
   }
+
+  /**
+   * Creates a NewHadoopRDD based on the broadcasted HiveConf and other job 
properties that will be
+   * applied locally on each slave.
+   */
+  private def createNewHadoopRDD(tableDesc: TableDesc, path: String): 
RDD[Writable] = {
+    val newJobConf = new JobConf(hadoopConf)
+    HadoopTableReader.initializeLocalJobConfFunc(path, tableDesc)(newJobConf)
+    val inputFormatClass = tableDesc.getInputFileFormatClass
+      .asInstanceOf[Class[newInputClass[Writable, Writable]]]
+
+    val rdd = new NewHadoopRDD(
+      sparkSession.sparkContext,
+      inputFormatClass,
+      classOf[Writable],
+      classOf[Writable],
+      newJobConf
+    )
+
+    // Only take the value (skip the key) because Hive works only with values.
+    rdd.map(_._2)
+  }
+
 }
 
 private[hive] object HiveTableUtil {
diff --git 
a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
 
b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
index 6abdc40..5e97a05 100644
--- 
a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
+++ 
b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
@@ -260,6 +260,108 @@ class HiveCatalogedDDLSuite extends DDLSuite with 
TestHiveSingleton with BeforeA
       assert(err.contains("Cannot recognize hive type string:"))
    }
   }
+
+  test("SPARK-26630: table with old input format and without partitioned will 
use HadoopRDD") {
+    withTable("table_old", "table_ctas_old") {
+      sql(
+        """
+          |CREATE TABLE table_old (col1 LONG, col2 STRING, col3 DOUBLE, col4 
BOOLEAN)
+          |STORED AS
+          |INPUTFORMAT 'org.apache.hadoop.mapred.TextInputFormat'
+          |OUTPUTFORMAT 
'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat'
+        """.stripMargin)
+      sql(
+        """
+          |INSERT INTO table_old
+          |VALUES (2147483648, 'AAA', 3.14, false), (2147483649, 'BBB', 3.142, 
true)
+        """.stripMargin)
+      checkAnswer(
+        sql("SELECT col1, col2, col3, col4 FROM table_old"),
+        Row(2147483648L, "AAA", 3.14, false) :: Row(2147483649L, "BBB", 3.142, 
true) :: Nil)
+
+      sql("CREATE TABLE table_ctas_old AS SELECT col1, col2, col3, col4 FROM 
table_old")
+      checkAnswer(
+        sql("SELECT col1, col2, col3, col4 from table_ctas_old"),
+        Row(2147483648L, "AAA", 3.14, false) :: Row(2147483649L, "BBB", 3.142, 
true) :: Nil)
+    }
+  }
+
+  test("SPARK-26630: table with old input format and partitioned will use 
HadoopRDD") {
+    withTable("table_pt_old", "table_ctas_pt_old") {
+      sql(
+        """
+          |CREATE TABLE table_pt_old (col1 LONG, col2 STRING, col3 DOUBLE, 
col4 BOOLEAN)
+          |PARTITIONED BY (pt INT)
+          |STORED AS
+          |INPUTFORMAT 'org.apache.hadoop.mapred.TextInputFormat'
+          |OUTPUTFORMAT 
'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat'
+        """.stripMargin)
+      sql(
+        """
+          |INSERT INTO table_pt_old PARTITION (pt = 1)
+          |VALUES (2147483648, 'AAA', 3.14, false), (2147483649, 'BBB', 3.142, 
true)
+        """.stripMargin)
+      checkAnswer(
+        sql("SELECT col1, col2, col3, col4 FROM table_pt_old WHERE pt = 1"),
+        Row(2147483648L, "AAA", 3.14, false) :: Row(2147483649L, "BBB", 3.142, 
true) :: Nil)
+
+      sql("CREATE TABLE table_ctas_pt_old AS SELECT col1, col2, col3, col4 
FROM table_pt_old")
+      checkAnswer(
+        sql("SELECT col1, col2, col3, col4 from table_ctas_pt_old"),
+        Row(2147483648L, "AAA", 3.14, false) :: Row(2147483649L, "BBB", 3.142, 
true) :: Nil)
+    }
+  }
+
+  test("SPARK-26630: table with new input format and without partitioned will 
use NewHadoopRDD") {
+    withTable("table_new", "table_ctas_new") {
+      sql(
+        """
+          |CREATE TABLE table_new (col1 LONG, col2 STRING, col3 DOUBLE, col4 
BOOLEAN)
+          |STORED AS
+          |INPUTFORMAT 'org.apache.hadoop.mapreduce.lib.input.TextInputFormat'
+          |OUTPUTFORMAT 
'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat'
+        """.stripMargin)
+      sql(
+        """
+          |INSERT INTO table_new
+          |VALUES (2147483648, 'AAA', 3.14, false), (2147483649, 'BBB', 3.142, 
true)
+        """.stripMargin)
+      checkAnswer(
+        sql("SELECT col1, col2, col3, col4 FROM table_new"),
+        Row(2147483648L, "AAA", 3.14, false) :: Row(2147483649L, "BBB", 3.142, 
true) :: Nil)
+
+      sql("CREATE TABLE table_ctas_new AS SELECT col1, col2, col3, col4 FROM 
table_new")
+      checkAnswer(
+        sql("SELECT col1, col2, col3, col4 from table_ctas_new"),
+        Row(2147483648L, "AAA", 3.14, false) :: Row(2147483649L, "BBB", 3.142, 
true) :: Nil)
+    }
+  }
+
+  test("SPARK-26630: table with new input format and partitioned will use 
NewHadoopRDD") {
+    withTable("table_pt_new", "table_ctas_pt_new") {
+      sql(
+        """
+          |CREATE TABLE table_pt_new (col1 LONG, col2 STRING, col3 DOUBLE, 
col4 BOOLEAN)
+          |PARTITIONED BY (pt INT)
+          |STORED AS
+          |INPUTFORMAT 'org.apache.hadoop.mapreduce.lib.input.TextInputFormat'
+          |OUTPUTFORMAT 
'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat'
+        """.stripMargin)
+      sql(
+        """
+          |INSERT INTO table_pt_new PARTITION (pt = 1)
+          |VALUES (2147483648, 'AAA', 3.14, false), (2147483649, 'BBB', 3.142, 
true)
+        """.stripMargin)
+      checkAnswer(
+        sql("SELECT col1, col2, col3, col4 FROM table_pt_new WHERE pt = 1"),
+        Row(2147483648L, "AAA", 3.14, false) :: Row(2147483649L, "BBB", 3.142, 
true) :: Nil)
+
+      sql("CREATE TABLE table_ctas_pt_new AS SELECT col1, col2, col3, col4 
FROM table_pt_new")
+      checkAnswer(
+        sql("SELECT col1, col2, col3, col4 from table_ctas_pt_new"),
+        Row(2147483648L, "AAA", 3.14, false) :: Row(2147483649L, "BBB", 3.142, 
true) :: Nil)
+    }
+  }
 }
 
 class HiveDDLSuite


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to