zuyanton opened a new issue #1790:
URL: https://github.com/apache/hudi/issues/1790
It looks like Hudi does not handle DecimalType properly. One symptom is that when we try to use a decimal column as the partition column, Hudi creates folders that look like '[0, 0, 0, 0, 0, 0, 0, 0, 27, -63, 109, 103, 78, -56, 0, 0]' instead of the expected '2'. The other symptom is that querying MoR tables fails when the table contains Decimal columns. The failure appears to happen when Hudi tries to read decimals coming from the log (avro) files; when all data is in the parquet file, Spark SQL works just fine.
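As a sanity check on the first symptom: the folder name looks like the raw two's-complement unscaled bytes of the decimal (the on-the-wire form of Avro's decimal logical type) rather than the decoded value. A minimal standalone Java sketch (not Hudi code; the scale of 18 is taken from the `decimal(38,18)` column in the repro below) decodes the exact byte array from the folder name back to the expected value:

```java
import java.math.BigDecimal;
import java.math.BigInteger;

public class DecodePartitionBytes {
    public static void main(String[] args) {
        // The exact byte array Hudi used as the partition folder name.
        byte[] raw = {0, 0, 0, 0, 0, 0, 0, 0, 27, -63, 109, 103, 78, -56, 0, 0};

        // Interpreting the bytes as a big-endian two's-complement unscaled
        // value with scale 18 (matching decimal(38,18)) recovers the value.
        BigDecimal value = new BigDecimal(new BigInteger(raw), 18);
        System.out.println(value); // prints 2.000000000000000000
    }
}
```

This suggests the key generator is stringifying the unconverted fixed/bytes payload instead of the decoded decimal.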
**To Reproduce**
Consider the following example:
Step 1 - create table
```
spark.sql("drop table if exists testTable_ro")
spark.sql("drop table if exists testTable_rt")
var df = Seq((1, 2, 3)).toDF("pk", "partition", "sort_key")
df = df.withColumn("decimal_column", df.col("pk").cast("decimal(38,18)"))
df.write.format("org.apache.hudi").options(hudiOptions).mode(SaveMode.Append).save("s3://someBucket/testTable")
```
Since this is the first write to the table, there are no log files yet; Hudi created only one parquet file. Therefore a simple select query works as
expected:
```
scala> spark.sql("select * from testTable_rt").show
+-------------------+--------------------+------------------+----------------------+--------------------+---+--------+--------------------+---------+
|_hoodie_commit_time|_hoodie_commit_seqno|_hoodie_record_key|_hoodie_partition_path|   _hoodie_file_name| pk|sort_key|      decimal_column|partition|
+-------------------+--------------------+------------------+----------------------+--------------------+---+--------+--------------------+---------+
|     20200703164509|  20200703164509_0_8|              pk:1|                     2|cd1293b4-3876-426...|  1|       3|1.000000000000000000|        2|
+-------------------+--------------------+------------------+----------------------+--------------------+---+--------+--------------------+---------+
```
Step 2 - update the table with the same dataframe
```
df.write.format("org.apache.hudi").options(hudiOptions).mode(SaveMode.Append).save("s3://someBucket/testTable")
```
Since we are updating an existing record, Hudi this time creates a log (avro) file
on top of the existing parquet file. Running the same query as in step one now
results in an exception:
```
scala> spark.sql("select * from testTable_rt").show
java.lang.ClassCastException: org.apache.hadoop.io.BytesWritable cannot be cast to org.apache.hadoop.hive.serde2.io.HiveDecimalWritable
  at org.apache.hadoop.hive.serde2.objectinspector.primitive.WritableHiveDecimalObjectInspector.getPrimitiveWritableObject(WritableHiveDecimalObjectInspector.java:41)
  at org.apache.spark.sql.hive.HiveShim$.toCatalystDecimal(HiveShim.scala:107)
  at org.apache.spark.sql.hive.HadoopTableReader$$anonfun$14$$anonfun$apply$11.apply(TableReader.scala:414)
  at org.apache.spark.sql.hive.HadoopTableReader$$anonfun$14$$anonfun$apply$11.apply(TableReader.scala:413)
  at org.apache.spark.sql.hive.HadoopTableReader$$anonfun$fillObject$2.apply(TableReader.scala:442)
  at org.apache.spark.sql.hive.HadoopTableReader$$anonfun$fillObject$2.apply(TableReader.scala:433)
  at scala.collection.Iterator$$anon$11.next(Iterator.scala:410)
  at scala.collection.Iterator$$anon$11.next(Iterator.scala:410)
  at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
  at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
  at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$13$$anon$1.hasNext(WholeStageCodegenExec.scala:636)
  at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:291)
  at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:283)
  at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(RDD.scala:836)
  at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(RDD.scala:836)
  at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
  at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
  at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
  at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
  at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
  at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
  at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
  at org.apache.spark.scheduler.Task.run(Task.scala:123)
  at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
  at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
  at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
  at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
  at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
  at java.lang.Thread.run(Thread.java:748)
```
Running a query that does not pull the Decimal column works just fine:
```
scala> spark.sql("select pk from testTable_rt").show
+---+
| pk|
+---+
| 1|
+---+
```
Running any query on this table via Hive produces a different exception:
```
hive> select * from testTable_rt;
TaskAttempt 3 failed, info=[Error: Error while running task ( failure ) : attempt_1593458427178_1028_1_00_000000_3:java.lang.RuntimeException: java.lang.RuntimeException: java.io.IOException: org.apache.hudi.exception.HoodieIOException: IOException when reading log file
  at org.apache.hadoop.hive.ql.exec.tez.TezProcessor.initializeAndRunProcessor(TezProcessor.java:211)
  at org.apache.hadoop.hive.ql.exec.tez.TezProcessor.run(TezProcessor.java:168)
  at org.apache.tez.runtime.LogicalIOProcessorRuntimeTask.run(LogicalIOProcessorRuntimeTask.java:374)
  at org.apache.tez.runtime.task.TaskRunner2Callable$1.run(TaskRunner2Callable.java:73)
  at org.apache.tez.runtime.task.TaskRunner2Callable$1.run(TaskRunner2Callable.java:61)
  at java.security.AccessController.doPrivileged(Native Method)
  at javax.security.auth.Subject.doAs(Subject.java:422)
  at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1844)
  at org.apache.tez.runtime.task.TaskRunner2Callable.callInternal(TaskRunner2Callable.java:61)
  at org.apache.tez.runtime.task.TaskRunner2Callable.callInternal(TaskRunner2Callable.java:37)
  at org.apache.tez.common.CallableWithNdc.call(CallableWithNdc.java:36)
  at java.util.concurrent.FutureTask.run(FutureTask.java:266)
  at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
  at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
  at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.RuntimeException: java.io.IOException: org.apache.hudi.exception.HoodieIOException: IOException when reading log file
  at org.apache.hadoop.mapred.split.TezGroupedSplitsInputFormat$TezGroupedSplitsRecordReader.initNextRecordReader(TezGroupedSplitsInputFormat.java:206)
  at org.apache.hadoop.mapred.split.TezGroupedSplitsInputFormat$TezGroupedSplitsRecordReader.<init>(TezGroupedSplitsInputFormat.java:145)
  at org.apache.hadoop.mapred.split.TezGroupedSplitsInputFormat.getRecordReader(TezGroupedSplitsInputFormat.java:111)
  at org.apache.tez.mapreduce.lib.MRReaderMapred.setupOldRecordReader(MRReaderMapred.java:157)
  at org.apache.tez.mapreduce.lib.MRReaderMapred.setSplit(MRReaderMapred.java:83)
  at org.apache.tez.mapreduce.input.MRInput.initFromEventInternal(MRInput.java:703)
  at org.apache.tez.mapreduce.input.MRInput.initFromEvent(MRInput.java:662)
  at org.apache.tez.mapreduce.input.MRInputLegacy.checkAndAwaitRecordReaderInitialization(MRInputLegacy.java:150)
  at org.apache.tez.mapreduce.input.MRInputLegacy.init(MRInputLegacy.java:114)
  at org.apache.hadoop.hive.ql.exec.tez.MapRecordProcessor.getMRInput(MapRecordProcessor.java:525)
  at org.apache.hadoop.hive.ql.exec.tez.MapRecordProcessor.init(MapRecordProcessor.java:171)
  at org.apache.hadoop.hive.ql.exec.tez.TezProcessor.initializeAndRunProcessor(TezProcessor.java:184)
  ... 14 more
Caused by: java.io.IOException: org.apache.hudi.exception.HoodieIOException: IOException when reading log file
  at org.apache.hadoop.hive.io.HiveIOExceptionHandlerChain.handleRecordReaderCreationException(HiveIOExceptionHandlerChain.java:97)
  at org.apache.hadoop.hive.io.HiveIOExceptionHandlerUtil.handleRecordReaderCreationException(HiveIOExceptionHandlerUtil.java:57)
  at org.apache.hadoop.hive.ql.io.HiveInputFormat.getRecordReader(HiveInputFormat.java:379)
  at org.apache.hadoop.mapred.split.TezGroupedSplitsInputFormat$TezGroupedSplitsRecordReader.initNextRecordReader(TezGroupedSplitsInputFormat.java:203)
  ... 25 more
Caused by: org.apache.hudi.exception.HoodieIOException: IOException when reading log file
  at org.apache.hudi.common.table.log.AbstractHoodieLogRecordScanner.scan(AbstractHoodieLogRecordScanner.java:242)
  at org.apache.hudi.common.table.log.HoodieMergedLogRecordScanner.<init>(HoodieMergedLogRecordScanner.java:79)
  at org.apache.hudi.hadoop.realtime.RealtimeCompactedRecordReader.getMergedLogRecordScanner(RealtimeCompactedRecordReader.java:67)
  at org.apache.hudi.hadoop.realtime.RealtimeCompactedRecordReader.<init>(RealtimeCompactedRecordReader.java:50)
  at org.apache.hudi.hadoop.realtime.HoodieRealtimeRecordReader.constructRecordReader(HoodieRealtimeRecordReader.java:67)
  at org.apache.hudi.hadoop.realtime.HoodieRealtimeRecordReader.<init>(HoodieRealtimeRecordReader.java:45)
  at org.apache.hudi.hadoop.realtime.HoodieParquetRealtimeInputFormat.getRecordReader(HoodieParquetRealtimeInputFormat.java:222)
  at org.apache.hadoop.hive.ql.io.HiveInputFormat.getRecordReader(HiveInputFormat.java:376)
  ... 26 more
```
Querying the read optimized table (testTable_ro) works as expected all the time.
Things get even messier when we try to use a decimal column as the primary key.
Full code snippet:
```
import org.apache.spark.sql.functions._
import org.apache.hudi.hive.MultiPartKeysValueExtractor
import org.apache.hudi.QuickstartUtils._
import scala.collection.JavaConversions._
import org.apache.spark.sql.SaveMode
import org.apache.hudi.DataSourceReadOptions._
import org.apache.hudi.DataSourceWriteOptions._
import org.apache.hudi.DataSourceWriteOptions
import org.apache.hudi.config.HoodieWriteConfig._
import org.apache.hudi.config.HoodieWriteConfig
import org.apache.hudi.keygen.ComplexKeyGenerator
import org.apache.hadoop.hive.conf.HiveConf
val hiveConf = new HiveConf()
val hiveMetastoreURI = hiveConf.get("hive.metastore.uris").replaceAll("thrift://", "")
val hiveServer2URI = hiveMetastoreURI.substring(0, hiveMetastoreURI.lastIndexOf(":"))
var hudiOptions = Map[String, String](
  HoodieWriteConfig.TABLE_NAME -> "testTable",
  "hoodie.consistency.check.enabled" -> "true",
  "hoodie.compact.inline.max.delta.commits" -> "100",
  "hoodie.compact.inline" -> "true",
  DataSourceWriteOptions.STORAGE_TYPE_OPT_KEY -> "MERGE_ON_READ",
  DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY -> "pk",
  DataSourceWriteOptions.KEYGENERATOR_CLASS_OPT_KEY -> classOf[ComplexKeyGenerator].getName,
  DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY -> "partition",
  DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY -> "sort_key",
  DataSourceWriteOptions.HIVE_SYNC_ENABLED_OPT_KEY -> "true",
  DataSourceWriteOptions.HIVE_TABLE_OPT_KEY -> "testTable",
  DataSourceWriteOptions.HIVE_PARTITION_FIELDS_OPT_KEY -> "partition",
  DataSourceWriteOptions.HIVE_PARTITION_EXTRACTOR_CLASS_OPT_KEY -> classOf[MultiPartKeysValueExtractor].getName,
  DataSourceWriteOptions.HIVE_URL_OPT_KEY -> s"jdbc:hive2://$hiveServer2URI:10000"
)
spark.sql("drop table if exists testTable_ro")
spark.sql("drop table if exists testTable_rt")
var df = Seq((1, 2, 3)).toDF("pk", "partition", "sort_key")
df = df.withColumn("decimal_column", df.col("pk").cast("decimal(38,18)"))
df.write.format("org.apache.hudi").options(hudiOptions).mode(SaveMode.Append).save("s3://someBucket/testTable")
spark.sql("select * from testTable_rt").show
df.write.format("org.apache.hudi").options(hudiOptions).mode(SaveMode.Append).save("s3://someBucket/testTable")
spark.sql("select * from testTable_rt").show
```
**Environment Description**
* Hudi version : 0.5.3
* Spark version : 2.4.4
* Hive version : 2.3.6
* Hadoop version : 2.8.5
* Storage (HDFS/S3/GCS..) : S3
* Running on Docker? (yes/no) : no