[ 
https://issues.apache.org/jira/browse/HUDI-83?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17060915#comment-17060915
 ] 

Cosmin Iordache edited comment on HUDI-83 at 3/17/20, 1:40 PM:
---------------------------------------------------------------

I'm going to add here some more info regarding backwards compatibility. 
 Continuing investigation : I created a data-set with previous version ( 
spark-2.3.3 - hudi 0.5.0). The columns would be a StructType - TimestampType in 
the dataframe
{code:java}
15:28:22.469 [DataLakeSystem-akka.actor.default-dispatcher-4] INFO 
org.apache.hudi.HoodieSparkSqlWriter$ - Registered avro schema : {
 "type" : "record",
 "name" : "test_record",
 "namespace" : "hoodie.test",
 "fields" : [ 
 ...
{ "name" : "other_date", "type" : [ "long", "null" ] }
,
{ "name" : "timestamp_1", "type" : [ "long", "null" ] }
, 
 ...
 ]
 }
{code}
Above is a timestamp saved as a long ( expected ).
 I then created a new upsert with the same dataframe in (spark 2.4.4 , hudi 
0.5.1) :
{code:java}
 {
 "type" : "record",
 "name" : "test_record",
 "namespace" : "hoodie.test",
 "fields": {
 ...
 "name" : "other_date",
 "type" : [
{ "type" : "long", "logicalType" : "timestamp-micros" }
, "null" ]
 }, {
 "name" : "timestamp_1",
 "type" : [
{ "type" : "long", "logicalType" : "timestamp-micros" }
, "null" ]
 }
 ...
{code}
Ingestion worked. But on reading :
  
{code:java}
scala> q1.select("other_date","timestamp_1").show()
 53277 [Executor task launch worker for task 3] ERROR 
org.apache.spark.executor.Executor - Exception in task 1.0 in stage 2.0 (TID 3)
 org.apache.spark.sql.execution.QueryExecutionException: Parquet column cannot 
be converted in file 
[hdfs://namenode:8020/data/lake/3aa1f8f7-63a4-4aa4-986e-0ed835601999/converted/io_yields_hashkey/-2/8bddbe9e-d0d5-492a-b96d-9b196322a317-0_0-49-79_20200316152824.parquet]
 . Column: [other_date], Expected: timestamp, Found: INT64
 at 
org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.nextIterator(FileScanRDD.scala:187)
 at 
org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext(FileScanRDD.scala:101)
 at 
org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.scan_nextBatch_0$(Unknown
 Source)
 at 
org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown
 Source)
 at 
org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
 at 
org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$2.hasNext(WholeStageCodegenExec.scala:636)
 at 
org.apache.spark.sql.execution.SparkPlan.$anonfun$getByteArrayRdd$1(SparkPlan.scala:255)
 at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2(RDD.scala:836)
 at 
org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2$adapted(RDD.scala:836)
 at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
 at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
 at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
 at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
 at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
 at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
 at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
 at org.apache.spark.scheduler.Task.run(Task.scala:123)
 at 
org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:411)
 at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
 at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
 at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
 at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
 at java.lang.Thread.run(Thread.java:748)
 Caused by: 
org.apache.spark.sql.execution.datasources.SchemaColumnConvertNotSupportedException
 at 
org.apache.spark.sql.execution.datasources.parquet.VectorizedColumnReader.constructConvertNotSupportedException(VectorizedColumnReader.java:250)
 at 
org.apache.spark.sql.execution.datasources.parquet.VectorizedColumnReader.readLongBatch(VectorizedColumnReader.java:440)
 at 
org.apache.spark.sql.execution.datasources.parquet.VectorizedColumnReader.readBatch(VectorizedColumnReader.java:208)
 at 
org.apache.spark.sql.execution.datasources.parquet.VectorizedParquetRecordReader.nextBatch(VectorizedParquetRecordReader.java:261)
 at 
org.apache.spark.sql.execution.datasources.parquet.VectorizedParquetRecordReader.nextKeyValue(VectorizedParquetRecordReader.java:159)
 at 
org.apache.spark.sql.execution.datasources.RecordReaderIterator.hasNext(RecordReaderIterator.scala:39)
 at 
org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext(FileScanRDD.scala:101)
 at 
org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.nextIterator(FileScanRDD.scala:181)
 ... 22 more
  
{code}
 


was (Author: arw357):
I'm going to add here some more info regarding backwards compatibility. 
 Continuing investigation : I created a data-set with previous version ( 
spark-2.3.3 - hudi 0.5.0)
{code:java}
15:28:22.469 [DataLakeSystem-akka.actor.default-dispatcher-4] INFO 
org.apache.hudi.HoodieSparkSqlWriter$ - Registered avro schema : {
 "type" : "record",
 "name" : "test_record",
 "namespace" : "hoodie.test",
 "fields" : [ 
 ...
{ "name" : "other_date", "type" : [ "long", "null" ] }
,
{ "name" : "timestamp_1", "type" : [ "long", "null" ] }
, 
 ...
 ]
 }
{code}

 Above is a timestamp saved as a long ( expected ).
 I then created a new upsert with the same dataframe in (spark 2.4.4 , hudi 
0.5.1) :
{code:java}
 {
 "type" : "record",
 "name" : "test_record",
 "namespace" : "hoodie.test",
 "fields": {
 ...
 "name" : "other_date",
 "type" : [
{ "type" : "long", "logicalType" : "timestamp-micros" }
, "null" ]
 }, {
 "name" : "timestamp_1",
 "type" : [
{ "type" : "long", "logicalType" : "timestamp-micros" }
, "null" ]
 }
 ...
{code}

 Ingestion worked. But on reading :
  
{code:java}
scala> q1.select("other_date","timestamp_1").show()
 53277 [Executor task launch worker for task 3] ERROR 
org.apache.spark.executor.Executor - Exception in task 1.0 in stage 2.0 (TID 3)
 org.apache.spark.sql.execution.QueryExecutionException: Parquet column cannot 
be converted in file 
[hdfs://namenode:8020/data/lake/3aa1f8f7-63a4-4aa4-986e-0ed835601999/converted/io_yields_hashkey/-2/8bddbe9e-d0d5-492a-b96d-9b196322a317-0_0-49-79_20200316152824.parquet]
 . Column: [other_date], Expected: timestamp, Found: INT64
 at 
org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.nextIterator(FileScanRDD.scala:187)
 at 
org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext(FileScanRDD.scala:101)
 at 
org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.scan_nextBatch_0$(Unknown
 Source)
 at 
org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown
 Source)
 at 
org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
 at 
org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$2.hasNext(WholeStageCodegenExec.scala:636)
 at 
org.apache.spark.sql.execution.SparkPlan.$anonfun$getByteArrayRdd$1(SparkPlan.scala:255)
 at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2(RDD.scala:836)
 at 
org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2$adapted(RDD.scala:836)
 at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
 at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
 at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
 at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
 at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
 at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
 at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
 at org.apache.spark.scheduler.Task.run(Task.scala:123)
 at 
org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:411)
 at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
 at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
 at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
 at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
 at java.lang.Thread.run(Thread.java:748)
 Caused by: 
org.apache.spark.sql.execution.datasources.SchemaColumnConvertNotSupportedException
 at 
org.apache.spark.sql.execution.datasources.parquet.VectorizedColumnReader.constructConvertNotSupportedException(VectorizedColumnReader.java:250)
 at 
org.apache.spark.sql.execution.datasources.parquet.VectorizedColumnReader.readLongBatch(VectorizedColumnReader.java:440)
 at 
org.apache.spark.sql.execution.datasources.parquet.VectorizedColumnReader.readBatch(VectorizedColumnReader.java:208)
 at 
org.apache.spark.sql.execution.datasources.parquet.VectorizedParquetRecordReader.nextBatch(VectorizedParquetRecordReader.java:261)
 at 
org.apache.spark.sql.execution.datasources.parquet.VectorizedParquetRecordReader.nextKeyValue(VectorizedParquetRecordReader.java:159)
 at 
org.apache.spark.sql.execution.datasources.RecordReaderIterator.hasNext(RecordReaderIterator.scala:39)
 at 
org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext(FileScanRDD.scala:101)
 at 
org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.nextIterator(FileScanRDD.scala:181)
 ... 22 more
  
{code}

  

> Map Timestamp type in spark to corresponding Timestamp type in Hive during 
> Hive sync
> ------------------------------------------------------------------------------------
>
>                 Key: HUDI-83
>                 URL: https://issues.apache.org/jira/browse/HUDI-83
>             Project: Apache Hudi (incubating)
>          Issue Type: Bug
>          Components: Hive Integration, Usability
>            Reporter: Vinoth Chandar
>            Priority: Major
>             Fix For: 0.6.0
>
>
> [https://github.com/apache/incubator-hudi/issues/543] &; related issues 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

Reply via email to