zuyanton opened a new issue #1790:
URL: https://github.com/apache/hudi/issues/1790


   It looks like Hudi does not handle DecimalType properly. One symptom is that when we try to use a decimal column as the partition column, Hudi creates folders that look like '[0, 0, 0, 0, 0, 0, 0, 0, 27, -63, 109, 103, 78, -56, 0, 0]' instead of the expected '2'. The other symptom is that querying MoR tables fails when the table contains Decimal columns. The failure appears to happen when Hudi tries to digest decimals coming from the log (avro) files; when all data is in the parquet file, Spark SQL works just fine.
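   For what it's worth, the folder name above looks like the raw unscaled bytes of the decimal rather than its string form: decoding those 16 bytes as a big-endian two's-complement integer and re-applying the scale of decimal(38,18) gives back 2. A small sketch (plain Scala, nothing Hudi-specific, just to show what the bytes are):
   ```
   import java.math.{BigDecimal => JBigDecimal, BigInteger}

   // Partition folder name from above, interpreted as signed bytes.
   val partitionBytes = Array[Byte](0, 0, 0, 0, 0, 0, 0, 0, 27, -63, 109, 103, 78, -56, 0, 0)

   // Avro's decimal logical type stores the unscaled value in big-endian two's-complement form;
   // re-applying the declared scale (18 for decimal(38,18)) recovers the original number.
   val decoded = new JBigDecimal(new BigInteger(partitionBytes), 18)
   println(decoded)   // 2.000000000000000000
   ```
   So it looks like the partition path is built from the encoded decimal bytes instead of the decimal's string representation.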
   
   **To Reproduce**
   Consider the following example:
   Step 1 - create table
   ```
       spark.sql("drop table if exists testTable_ro")
       spark.sql("drop table if exists testTable_rt")
       var df = Seq((1, 2, 3)).toDF("pk", "partition", "sort_key")
        df = df.withColumn("decimal_column", df.col("pk").cast("decimal(38,18)"))
        df.write.format("org.apache.hudi").options(hudiOptions).mode(SaveMode.Append).save("s3://someBucket/testTable")
   ```
   Since this is the first write to the table, there are no log files yet; Hudi created only one parquet file. Therefore a simple select query works as expected:
   ```
   scala> spark.sql("select * from testTable_rt").show

   +-------------------+--------------------+------------------+----------------------+--------------------+---+--------+--------------------+---------+
   |_hoodie_commit_time|_hoodie_commit_seqno|_hoodie_record_key|_hoodie_partition_path|   _hoodie_file_name| pk|sort_key|      decimal_column|partition|
   +-------------------+--------------------+------------------+----------------------+--------------------+---+--------+--------------------+---------+
   |     20200703164509|  20200703164509_0_8|              pk:1|                     2|cd1293b4-3876-426...|  1|       3|1.000000000000000000|        2|
   +-------------------+--------------------+------------------+----------------------+--------------------+---+--------+--------------------+---------+
   ```
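   (To double-check the file layout at this point, listing the partition directory shows a single base .parquet file and no .log files. A quick sketch, with the table path and partition value taken from this example:)
   ```
   import org.apache.hadoop.fs.Path

   // List the partition directory; after step 1 only a base .parquet file should be there.
   val partitionPath = new Path("s3://someBucket/testTable/2")
   val fs = partitionPath.getFileSystem(spark.sparkContext.hadoopConfiguration)
   fs.listStatus(partitionPath).foreach(s => println(s.getPath.getName))
   ```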
   Step 2 - update the table with the same dataframe
   ```
   df.write.format("org.apache.hudi").options(hudiOptions).mode(SaveMode.Append).save("s3://someBucket/testTable")
   ```
   Since we are updating an existing record, Hudi this time creates a log (avro) file on top of the existing parquet file. Running the same query as in step one now results in an exception:
   ```
   scala> spark.sql("select * from testTable_rt").show
   
   java.lang.ClassCastException: org.apache.hadoop.io.BytesWritable cannot be cast to org.apache.hadoop.hive.serde2.io.HiveDecimalWritable
        at org.apache.hadoop.hive.serde2.objectinspector.primitive.WritableHiveDecimalObjectInspector.getPrimitiveWritableObject(WritableHiveDecimalObjectInspector.java:41)
        at org.apache.spark.sql.hive.HiveShim$.toCatalystDecimal(HiveShim.scala:107)
        at org.apache.spark.sql.hive.HadoopTableReader$$anonfun$14$$anonfun$apply$11.apply(TableReader.scala:414)
        at org.apache.spark.sql.hive.HadoopTableReader$$anonfun$14$$anonfun$apply$11.apply(TableReader.scala:413)
        at org.apache.spark.sql.hive.HadoopTableReader$$anonfun$fillObject$2.apply(TableReader.scala:442)
        at org.apache.spark.sql.hive.HadoopTableReader$$anonfun$fillObject$2.apply(TableReader.scala:433)
        at scala.collection.Iterator$$anon$11.next(Iterator.scala:410)
        at scala.collection.Iterator$$anon$11.next(Iterator.scala:410)
        at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
        at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
        at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$13$$anon$1.hasNext(WholeStageCodegenExec.scala:636)
        at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:291)
        at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:283)
        at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(RDD.scala:836)
        at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(RDD.scala:836)
        at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
        at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
        at org.apache.spark.scheduler.Task.run(Task.scala:123)
        at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
        at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)
   ```   
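   The ClassCastException suggests the realtime reader hands the log-file decimal to Hive as a raw BytesWritable, while the decimal ObjectInspector expects a HiveDecimalWritable. Just to illustrate the gap (this is not how Hudi's reader is actually structured), a minimal sketch of the conversion that appears to be missing, assuming the bytes hold Avro's big-endian unscaled value:
   ```
   import java.math.{BigDecimal => JBigDecimal, BigInteger}
   import org.apache.hadoop.io.BytesWritable
   import org.apache.hadoop.hive.common.type.HiveDecimal
   import org.apache.hadoop.hive.serde2.io.HiveDecimalWritable

   // Hypothetical helper: turn the raw Avro-encoded decimal bytes into the
   // HiveDecimalWritable that WritableHiveDecimalObjectInspector expects.
   def toHiveDecimalWritable(raw: BytesWritable, scale: Int): HiveDecimalWritable = {
     val unscaled = new BigInteger(raw.copyBytes())   // big-endian two's complement
     val value = new JBigDecimal(unscaled, scale)     // re-apply the declared scale
     new HiveDecimalWritable(HiveDecimal.create(value))
   }
   ```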
   Running a query that does not pull the Decimal column works just fine:
   ```
      scala> spark.sql("select pk from testTable_rt").show
       +---+
       | pk|
       +---+
       |  1|
       +---+
   ```  
   Running any query on this table via Hive produces a different exception:
   ```
   hive> select * from testTable_rt;
   
    TaskAttempt 3 failed, info=[Error: Error while running task ( failure ) : attempt_1593458427178_1028_1_00_000000_3:java.lang.RuntimeException: java.lang.RuntimeException: java.io.IOException: org.apache.hudi.exception.HoodieIOException: IOException when reading log file
        at org.apache.hadoop.hive.ql.exec.tez.TezProcessor.initializeAndRunProcessor(TezProcessor.java:211)
        at org.apache.hadoop.hive.ql.exec.tez.TezProcessor.run(TezProcessor.java:168)
        at org.apache.tez.runtime.LogicalIOProcessorRuntimeTask.run(LogicalIOProcessorRuntimeTask.java:374)
        at org.apache.tez.runtime.task.TaskRunner2Callable$1.run(TaskRunner2Callable.java:73)
        at org.apache.tez.runtime.task.TaskRunner2Callable$1.run(TaskRunner2Callable.java:61)
        at java.security.AccessController.doPrivileged(Native Method)
        at javax.security.auth.Subject.doAs(Subject.java:422)
        at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1844)
        at org.apache.tez.runtime.task.TaskRunner2Callable.callInternal(TaskRunner2Callable.java:61)
        at org.apache.tez.runtime.task.TaskRunner2Callable.callInternal(TaskRunner2Callable.java:37)
        at org.apache.tez.common.CallableWithNdc.call(CallableWithNdc.java:36)
        at java.util.concurrent.FutureTask.run(FutureTask.java:266)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)
   Caused by: java.lang.RuntimeException: java.io.IOException: org.apache.hudi.exception.HoodieIOException: IOException when reading log file
        at org.apache.hadoop.mapred.split.TezGroupedSplitsInputFormat$TezGroupedSplitsRecordReader.initNextRecordReader(TezGroupedSplitsInputFormat.java:206)
        at org.apache.hadoop.mapred.split.TezGroupedSplitsInputFormat$TezGroupedSplitsRecordReader.<init>(TezGroupedSplitsInputFormat.java:145)
        at org.apache.hadoop.mapred.split.TezGroupedSplitsInputFormat.getRecordReader(TezGroupedSplitsInputFormat.java:111)
        at org.apache.tez.mapreduce.lib.MRReaderMapred.setupOldRecordReader(MRReaderMapred.java:157)
        at org.apache.tez.mapreduce.lib.MRReaderMapred.setSplit(MRReaderMapred.java:83)
        at org.apache.tez.mapreduce.input.MRInput.initFromEventInternal(MRInput.java:703)
        at org.apache.tez.mapreduce.input.MRInput.initFromEvent(MRInput.java:662)
        at org.apache.tez.mapreduce.input.MRInputLegacy.checkAndAwaitRecordReaderInitialization(MRInputLegacy.java:150)
        at org.apache.tez.mapreduce.input.MRInputLegacy.init(MRInputLegacy.java:114)
        at org.apache.hadoop.hive.ql.exec.tez.MapRecordProcessor.getMRInput(MapRecordProcessor.java:525)
        at org.apache.hadoop.hive.ql.exec.tez.MapRecordProcessor.init(MapRecordProcessor.java:171)
        at org.apache.hadoop.hive.ql.exec.tez.TezProcessor.initializeAndRunProcessor(TezProcessor.java:184)
        ... 14 more
   Caused by: java.io.IOException: org.apache.hudi.exception.HoodieIOException: IOException when reading log file
        at org.apache.hadoop.hive.io.HiveIOExceptionHandlerChain.handleRecordReaderCreationException(HiveIOExceptionHandlerChain.java:97)
        at org.apache.hadoop.hive.io.HiveIOExceptionHandlerUtil.handleRecordReaderCreationException(HiveIOExceptionHandlerUtil.java:57)
        at org.apache.hadoop.hive.ql.io.HiveInputFormat.getRecordReader(HiveInputFormat.java:379)
        at org.apache.hadoop.mapred.split.TezGroupedSplitsInputFormat$TezGroupedSplitsRecordReader.initNextRecordReader(TezGroupedSplitsInputFormat.java:203)
        ... 25 more
   Caused by: org.apache.hudi.exception.HoodieIOException: IOException when reading log file
        at org.apache.hudi.common.table.log.AbstractHoodieLogRecordScanner.scan(AbstractHoodieLogRecordScanner.java:242)
        at org.apache.hudi.common.table.log.HoodieMergedLogRecordScanner.<init>(HoodieMergedLogRecordScanner.java:79)
        at org.apache.hudi.hadoop.realtime.RealtimeCompactedRecordReader.getMergedLogRecordScanner(RealtimeCompactedRecordReader.java:67)
        at org.apache.hudi.hadoop.realtime.RealtimeCompactedRecordReader.<init>(RealtimeCompactedRecordReader.java:50)
        at org.apache.hudi.hadoop.realtime.HoodieRealtimeRecordReader.constructRecordReader(HoodieRealtimeRecordReader.java:67)
        at org.apache.hudi.hadoop.realtime.HoodieRealtimeRecordReader.<init>(HoodieRealtimeRecordReader.java:45)
        at org.apache.hudi.hadoop.realtime.HoodieParquetRealtimeInputFormat.getRecordReader(HoodieParquetRealtimeInputFormat.java:222)
        at org.apache.hadoop.hive.ql.io.HiveInputFormat.getRecordReader(HiveInputFormat.java:376)
        ... 26 more
   
   ```
   Querying the read optimized table (testTable_ro) works as expected all the time.
   Things get even messier when we try to use the decimal column as the primary key.
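   As a stop-gap we can keep DecimalType out of Hudi's key/partition handling entirely, e.g. by writing a string copy of the column and pointing the record-key / partition-path options at it (the extra column name below is made up for illustration). This only sidesteps the key/partition symptom; the MoR read failure for decimal columns would still need a fix in Hudi itself.
   ```
   // Workaround sketch: keep the decimal value, but give Hudi a string shadow column
   // to use for record key / partition path so no DecimalType reaches the key generator.
   val dfForWrite = df.withColumn("decimal_column_str", df.col("decimal_column").cast("string"))

   dfForWrite.write
     .format("org.apache.hudi")
     .options(hudiOptions)   // point RECORDKEY_FIELD_OPT_KEY / PARTITIONPATH_FIELD_OPT_KEY at decimal_column_str
     .mode(SaveMode.Append)
     .save("s3://someBucket/testTable")
   ```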
   
   Full code snippet:
   ```
       import org.apache.spark.sql.functions._
       import org.apache.hudi.hive.MultiPartKeysValueExtractor
       import org.apache.hudi.QuickstartUtils._
       import scala.collection.JavaConversions._
       import org.apache.spark.sql.SaveMode
       import org.apache.hudi.DataSourceReadOptions._
       import org.apache.hudi.DataSourceWriteOptions._
       import org.apache.hudi.DataSourceWriteOptions
       import org.apache.hudi.config.HoodieWriteConfig._
       import org.apache.hudi.config.HoodieWriteConfig
       import org.apache.hudi.keygen.ComplexKeyGenerator
       import org.apache.hadoop.hive.conf.HiveConf
       val hiveConf = new HiveConf()
        val hiveMetastoreURI = hiveConf.get("hive.metastore.uris").replaceAll("thrift://", "")
        val hiveServer2URI = hiveMetastoreURI.substring(0, hiveMetastoreURI.lastIndexOf(":"))
       var hudiOptions = Map[String,String](
         HoodieWriteConfig.TABLE_NAME → "testTable",
         "hoodie.consistency.check.enabled"->"true",
         "hoodie.compact.inline.max.delta.commits"->"100",
         "hoodie.compact.inline"->"true",
         DataSourceWriteOptions.STORAGE_TYPE_OPT_KEY -> "MERGE_ON_READ",
         DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY -> "pk",
          DataSourceWriteOptions.KEYGENERATOR_CLASS_OPT_KEY -> classOf[ComplexKeyGenerator].getName,
         DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY ->"partition",
         DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY -> "sort_key",
         DataSourceWriteOptions.HIVE_SYNC_ENABLED_OPT_KEY → "true",
         DataSourceWriteOptions.HIVE_TABLE_OPT_KEY → "testTable",
         DataSourceWriteOptions.HIVE_PARTITION_FIELDS_OPT_KEY → "partition",
          DataSourceWriteOptions.HIVE_PARTITION_EXTRACTOR_CLASS_OPT_KEY → classOf[MultiPartKeysValueExtractor].getName,
          DataSourceWriteOptions.HIVE_URL_OPT_KEY -> s"jdbc:hive2://$hiveServer2URI:10000"
       )
   
       spark.sql("drop table if exists testTable_ro")
       spark.sql("drop table if exists testTable_rt")
       var df = Seq((1, 2, 3)).toDF("pk", "partition", "sort_key")
        df = df.withColumn("decimal_column", df.col("pk").cast("decimal(38,18)"))
        df.write.format("org.apache.hudi").options(hudiOptions).mode(SaveMode.Append).save("s3://someBucket/testTable")
        spark.sql("select * from testTable_rt").show

        df.write.format("org.apache.hudi").options(hudiOptions).mode(SaveMode.Append).save("s3://someBucket/testTable")
       spark.sql("select * from testTable_rt").show
   ```
   
   **Environment Description**
   
   * Hudi version : 0.5.3

   * Spark version : 2.4.4

   * Hive version : 2.3.6

   * Hadoop version : 2.8.5

   * Storage (HDFS/S3/GCS..) : S3

   * Running on Docker? (yes/no) : no
   
   
   
   

