hseagle opened a new issue #2544:
URL: https://github.com/apache/hudi/issues/2544
Failed to read timestamp column after Hive sync is enabled
Here is the list of versions used for testing:
```ini
hive = 3.1.2
hadoop = 3.2.2
spark = 3.0.1
hudi = 0.7.0
```
Here is the test application code snippet:
```scala
import org.apache.spark.sql._
import org.apache.spark.sql.SaveMode._
import org.apache.spark.sql.functions._
import org.apache.spark.sql.streaming._
import org.apache.hudi.QuickstartUtils._
import org.apache.hudi.DataSourceReadOptions
import org.apache.hudi.DataSourceReadOptions._
import org.apache.hudi.DataSourceWriteOptions
import org.apache.hudi.DataSourceWriteOptions._
import org.apache.hudi.config.HoodieWriteConfig
import org.apache.hudi.config.HoodieWriteConfig._
import org.apache.hudi.hive.MultiPartKeysValueExtractor
import org.apache.hudi.keygen._
import scala.collection.JavaConversions._
// Run in spark-shell, where spark.implicits._ is already in scope for .toDF.
case class Person(firstname: String, age: Int, gender: Int)

// ts is a plain long (unix_timestamp yields epoch seconds);
// insert_time is a proper timestamp column.
val personDF = List(Person("tom", 45, 1), Person("iris", 44, 0)).toDF.
  withColumn("ts", unix_timestamp()).
  withColumn("insert_time", current_timestamp())

// val personDF2 = List(Person("peng", 56, 1), Person("iris", 51, 0), Person("jacky", 25, 1)).toDF.
//   withColumn("ts", unix_timestamp()).withColumn("insert_time", current_timestamp())
// personDF.write.mode(SaveMode.Overwrite).format("hudi").saveAsTable("employee")
val tableName = "employee"
val hudiCommonOptions = Map(
  "hoodie.compact.inline" -> "true",
  "hoodie.compact.inline.max.delta.commits" -> "5",
  "hoodie.base.path" -> s"/tmp/$tableName",
  "hoodie.table.name" -> tableName,
  "hoodie.datasource.write.table.type" -> "MERGE_ON_READ",
  "hoodie.datasource.write.operation" -> "upsert",
  "hoodie.clean.async" -> "true"
)
val hudiHiveOptions = Map(
  DataSourceWriteOptions.HIVE_SYNC_ENABLED_OPT_KEY -> "true",
  DataSourceWriteOptions.HIVE_URL_OPT_KEY -> "jdbc:hive2://localhost:10000",
  DataSourceWriteOptions.HIVE_PARTITION_FIELDS_OPT_KEY -> "gender",
  DataSourceWriteOptions.HIVE_STYLE_PARTITIONING_OPT_KEY -> "true",
  "hoodie.datasource.hive_sync.support_timestamp" -> "true",
  DataSourceWriteOptions.HIVE_TABLE_OPT_KEY -> tableName,
  DataSourceWriteOptions.HIVE_PARTITION_EXTRACTOR_CLASS_OPT_KEY -> classOf[MultiPartKeysValueExtractor].getName
)
val basePath = s"/tmp/$tableName"

personDF.write.format("hudi").
  option(PRECOMBINE_FIELD_OPT_KEY, "ts").
  option(RECORDKEY_FIELD_OPT_KEY, "firstname").
  option(PARTITIONPATH_FIELD_OPT_KEY, "gender").
  options(hudiCommonOptions).
  options(hudiHiveOptions).
  mode(SaveMode.Overwrite).
  save(basePath)

// Query the real-time (_rt) view that Hive sync created for the MOR table.
sql("select * from employee_rt").show(false)
```
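Before looking at the failure, it is worth checking which column types Hive sync registered. A minimal diagnostic sketch (same spark-shell session assumed; for a MERGE_ON_READ table, Hive sync creates the employee_ro and employee_rt views):

```scala
// Diagnostic sketch: list the column types registered in the Hive metastore
// for the real-time view. With hoodie.datasource.hive_sync.support_timestamp
// set to "true", the expectation is that time columns show up as `timestamp`.
sql("describe employee_rt").show(100, false)
```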
The final query fails with the following error message:
```log
174262 [Executor task launch worker for task 12017] ERROR org.apache.spark.executor.Executor - Exception in task 0.0 in stage 31.0 (TID 12017)
java.lang.ClassCastException: org.apache.hadoop.io.LongWritable cannot be cast to org.apache.hadoop.hive.serde2.io.TimestampWritable
    at org.apache.hadoop.hive.serde2.objectinspector.primitive.WritableTimestampObjectInspector.getPrimitiveJavaObject(WritableTimestampObjectInspector.java:39)
    at org.apache.spark.sql.hive.HadoopTableReader$.$anonfun$fillObject$14(TableReader.scala:468)
    at org.apache.spark.sql.hive.HadoopTableReader$.$anonfun$fillObject$14$adapted(TableReader.scala:467)
    at org.apache.spark.sql.hive.HadoopTableReader$.$anonfun$fillObject$18(TableReader.scala:493)
    at scala.collection.Iterator$$anon$10.next(Iterator.scala:459)
    at scala.collection.Iterator$$anon$10.next(Iterator.scala:459)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
    at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
    at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:729)
    at org.apache.spark.sql.execution.SparkPlan.$anonfun$getByteArrayRdd$1(SparkPlan.scala:340)
    at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2(RDD.scala:872)
    at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2$adapted(RDD.scala:872)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:349)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:313)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
    at org.apache.spark.scheduler.Task.run(Task.scala:127)
    at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:446)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1377)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:449)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
```
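Reading the stack trace, the cast fails inside Hive's WritableTimestampObjectInspector, which suggests the synced Hive schema declares a timestamp column while the value read from storage is still a LongWritable. To narrow down which column triggers it, the time columns can be queried one at a time (a sketch, same session assumed):

```scala
// Diagnostic sketch: query through the Hive table without the time columns,
// then each time column on its own, to see which one triggers the
// LongWritable -> TimestampWritable cast failure.
sql("select firstname, age, gender from employee_rt").show(false)
sql("select firstname, ts from employee_rt").show(false)
sql("select firstname, insert_time from employee_rt").show(false)
```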
If the Hudi files are read directly with Spark, bypassing the Hive schema, the result is correct, as expected:
```scala
val employeeDF = spark.read.format("hudi").load("/tmp/employee")
employeeDF.show(false)
```
The result looks like this:
```log
+-------------------+--------------------+------------------+----------------------+------------------------------------------------------------------------+---------+---+------+----------+-----------------------+
|_hoodie_commit_time|_hoodie_commit_seqno|_hoodie_record_key|_hoodie_partition_path|_hoodie_file_name                                                       |firstname|age|gender|ts        |insert_time            |
+-------------------+--------------------+------------------+----------------------+------------------------------------------------------------------------+---------+---+------+----------+-----------------------+
|20210206160718     |20210206160718_0_1  |iris              |gender=0              |4fd7d48f-7828-4e77-97a5-a5202e32ad08-0_0-21-12008_20210206160718.parquet|iris     |44 |0     |1612598839|2021-02-06 16:07:19.251|
|20210206160718     |20210206160718_1_2  |tom               |gender=1              |c0014e5c-66d2-49fa-8af2-9a0b3df9bcf7-0_1-21-12009_20210206160718.parquet|tom      |45 |1     |1612598839|2021-02-06 16:07:19.251|
+-------------------+--------------------+------------------+----------------------+------------------------------------------------------------------------+---------+---+------+----------+-----------------------+
```
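For completeness, the Spark-side types of the two time columns can be confirmed as well (a sketch against the employeeDF loaded above):

```scala
// Sketch: print the Spark-side types of the time columns. Given how
// personDF was built, ts should print as `long` (unix_timestamp yields
// epoch seconds) and insert_time as `timestamp`.
employeeDF.select("ts", "insert_time").printSchema()
```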