hseagle opened a new issue #2544:
URL: https://github.com/apache/hudi/issues/2544


   Failed to read timestamp column after the hive sync is enabled
   
   Here are the versions used for testing:
   
   ```ini
   hive = 3.1.2
   hadoop = 3.2.2
   spark = 3.0.1
   hudi = 0.7.0
   ```
   Here is the code snippet of the test application:
   
   ```scala
   import org.apache.spark.sql._
   import org.apache.hudi.QuickstartUtils._
   import scala.collection.JavaConversions._
   import org.apache.spark.sql.SaveMode._
   import org.apache.hudi.DataSourceReadOptions._
   import org.apache.hudi.DataSourceWriteOptions._
   import org.apache.hudi.config.HoodieWriteConfig._
   import org.apache.spark.sql.functions._
   import org.apache.hudi.QuickstartUtils._
   import scala.collection.JavaConversions._
   import org.apache.spark.sql.SaveMode._
   import org.apache.hudi.DataSourceReadOptions
   import org.apache.hudi.DataSourceWriteOptions
   import org.apache.hudi.config.HoodieWriteConfig
   import org.apache.hudi.hive.MultiPartKeysValueExtractor
   import org.apache.spark.sql.functions._
   import org.apache.hudi.keygen._
   import org.apache.spark.sql.streaming._
   
   case class Person(firstname:String, age:Int, gender:Int)
   val personDF = List(Person("tom",45,1), 
Person("iris",44,0)).toDF.withColumn("ts",unix_timestamp).withColumn("insert_time",current_timestamp)
   //val personDF2 = List(Person("peng",56,1), 
Person("iris",51,0),Person("jacky",25,1)).toDF.withColumn("ts",unix_timestamp).withColumn("insert_time",current_timestamp)
   
   
//personDF.write.mode(SaveMode.Overwrite).format("hudi").saveAsTable("employee")
   
   val tableName = "employee"
   val hudiCommonOptions = Map(
     "hoodie.compact.inline" -> "true",
     "hoodie.compact.inline.max.delta.commits" ->"5",
     "hoodie.base.path" -> s"/tmp/$tableName",
     "hoodie.table.name" -> tableName,
     "hoodie.datasource.write.table.type"->"MERGE_ON_READ",
     "hoodie.datasource.write.operation" -> "upsert",
     "hoodie.clean.async" -> "true"
   )
   
   val hudiHiveOptions = Map(
       DataSourceWriteOptions.HIVE_SYNC_ENABLED_OPT_KEY -> "true",
       DataSourceWriteOptions.HIVE_URL_OPT_KEY -> 
"jdbc:hive2://localhost:10000",
       DataSourceWriteOptions.HIVE_PARTITION_FIELDS_OPT_KEY -> "gender",
       DataSourceWriteOptions.HIVE_STYLE_PARTITIONING_OPT_KEY -> "true",
       "hoodie.datasource.hive_sync.support_timestamp"->"true",
       DataSourceWriteOptions.HIVE_TABLE_OPT_KEY -> tableName,
       DataSourceWriteOptions.HIVE_PARTITION_EXTRACTOR_CLASS_OPT_KEY -> 
classOf[MultiPartKeysValueExtractor].getName
   )
   
   val basePath = s"/tmp/$tableName"
   personDF.write.format("hudi").
     option(PRECOMBINE_FIELD_OPT_KEY, "ts").
     option(RECORDKEY_FIELD_OPT_KEY, "firstname").
     option(PARTITIONPATH_FIELD_OPT_KEY, "gender").
     options(hudiCommonOptions).
     options(hudiHiveOptions).
     mode(SaveMode.Overwrite).
     save(basePath)
   
   sql("select * from employee_rt").show(false)
   ```
   The final query failed with the following error message:
   ```log
   174262 [Executor task launch worker for task 12017] ERROR 
org.apache.spark.executor.Executor  - Exception in task 0.0 in stage 31.0 (TID 
12017)
   java.lang.ClassCastException: org.apache.hadoop.io.LongWritable cannot be 
cast to org.apache.hadoop.hive.serde2.io.TimestampWritable
        at 
org.apache.hadoop.hive.serde2.objectinspector.primitive.WritableTimestampObjectInspector.getPrimitiveJavaObject(WritableTimestampObjectInspector.java:39)
        at 
org.apache.spark.sql.hive.HadoopTableReader$.$anonfun$fillObject$14(TableReader.scala:468)
        at 
org.apache.spark.sql.hive.HadoopTableReader$.$anonfun$fillObject$14$adapted(TableReader.scala:467)
        at 
org.apache.spark.sql.hive.HadoopTableReader$.$anonfun$fillObject$18(TableReader.scala:493)
        at scala.collection.Iterator$$anon$10.next(Iterator.scala:459)
        at scala.collection.Iterator$$anon$10.next(Iterator.scala:459)
        at 
org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown
 Source)
        at 
org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
        at 
org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:729)
        at 
org.apache.spark.sql.execution.SparkPlan.$anonfun$getByteArrayRdd$1(SparkPlan.scala:340)
        at 
org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2(RDD.scala:872)
        at 
org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2$adapted(RDD.scala:872)
        at 
org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:349)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:313)
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
        at org.apache.spark.scheduler.Task.run(Task.scala:127)
        at 
org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:446)
        at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1377)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:449)
        at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)
   ```
   
   If the Hudi files are read directly, the result is correct, as expected.
   
   ```scala
   val employeeDF = spark.read.format("hudi").load("/tmp/employee")
   employeeDF.show(false)
   ```
   The result looks like this:
   
   ```log
   
+-------------------+--------------------+------------------+----------------------+------------------------------------------------------------------------+---------+---+------+----------+-----------------------+
   
|_hoodie_commit_time|_hoodie_commit_seqno|_hoodie_record_key|_hoodie_partition_path|_hoodie_file_name
                                                       |firstname|age|gender|ts 
       |insert_time            |
   
+-------------------+--------------------+------------------+----------------------+------------------------------------------------------------------------+---------+---+------+----------+-----------------------+
   |20210206160718     |20210206160718_0_1  |iris              |gender=0        
      
|4fd7d48f-7828-4e77-97a5-a5202e32ad08-0_0-21-12008_20210206160718.parquet|iris  
   |44 |0     |1612598839|2021-02-06 16:07:19.251|
   |20210206160718     |20210206160718_1_2  |tom               |gender=1        
      
|c0014e5c-66d2-49fa-8af2-9a0b3df9bcf7-0_1-21-12009_20210206160718.parquet|tom   
   |45 |1     |1612598839|2021-02-06 16:07:19.251|
   
+-------------------+--------------------+------------------+----------------------+------------------------------------------------------------------------+---------+---+------+----------+-----------------------+
   ```
   
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to