Wenning Ding created HUDI-934:
---------------------------------

             Summary: Hive query does not work with realtime table which 
contains a decimal type
                 Key: HUDI-934
                 URL: https://issues.apache.org/jira/browse/HUDI-934
             Project: Apache Hudi
          Issue Type: Bug
            Reporter: Wenning Ding


h3. Issue

After updating a MOR table that has a decimal-type column, Hive queries fail 
with a type cast exception.

The bug appears to be caused by the decimal type not being handled correctly 
here: 
https://github.com/apache/hudi/blob/master/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/AbstractRealtimeRecordReader.java#L303
h3. Reproduction steps

Create a MOR table with decimal type
{code:java}
import org.apache.spark.sql.types._
import spark.implicits._
import org.apache.hudi.DataSourceWriteOptions
import org.apache.hudi.common.model.HoodieTableType
import org.apache.hudi.config.HoodieWriteConfig
import org.apache.spark.sql.SaveMode
import org.apache.spark.sql.types.{StructType, StructField, StringType, 
IntegerType}
import org.apache.spark.sql.Row 
import org.apache.spark.sql.functions
import java.util.Date
import org.apache.spark.sql.DataFrame

var df = Seq(
  (100, "event_name_16", "2015-01-01T13:51:39.340396Z", 1.32, "type1"),
  (101, "event_name_546", "2015-01-01T12:14:58.597216Z", 2.57, "type2"),
  (104, "event_name_123", "2015-01-01T12:15:00.512679Z", 3.45, "type1"),
  (105, "event_name_678", "2015-01-01T13:51:42.248818Z", 6.78, "type2")
  ).toDF("event_id", "event_name", "event_ts", "event_value", "event_type")
  
df = df.withColumn("event_value", df.col("event_value").cast(DecimalType(4,2)))

val tableType = HoodieTableType.MERGE_ON_READ.name
val tableName = "test8"
val tablePath = "s3://xxx/hudi/tables/" + tableName + "/"

df.write.format("org.apache.hudi")
   .option(DataSourceWriteOptions.TABLE_TYPE_OPT_KEY, tableType)
   .option(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY, "event_id")
   .option(HoodieWriteConfig.TABLE_NAME, tableName)
   .option(DataSourceWriteOptions.OPERATION_OPT_KEY, 
DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL)
   .option(DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY, "event_ts")
   .option(DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY, "event_type")
   .option("hoodie.compact.inline", "false")
   .option(DataSourceWriteOptions.HIVE_SYNC_ENABLED_OPT_KEY, "true")
   .option(DataSourceWriteOptions.HIVE_STYLE_PARTITIONING_OPT_KEY, "true")
   .option(DataSourceWriteOptions.HIVE_TABLE_OPT_KEY, tableName)
   .option(DataSourceWriteOptions.HIVE_DATABASE_OPT_KEY, "default")
   .option(DataSourceWriteOptions.HIVE_PARTITION_FIELDS_OPT_KEY, "event_type")
   .option(DataSourceWriteOptions.HIVE_PARTITION_EXTRACTOR_CLASS_OPT_KEY, 
"org.apache.hudi.hive.MultiPartKeysValueExtractor")
   .option(DataSourceWriteOptions.KEYGENERATOR_CLASS_OPT_KEY, 
"org.apache.hudi.keygen.ComplexKeyGenerator")
   .mode(SaveMode.Overwrite)
   .save(tablePath)
{code}
Update this table w/o inline compaction:
{code:java}
var update_df = Seq(
  (100, "event_name_16", "2015-01-01T13:51:39.340396Z", 9.00, "type1"),
  (101, "event_name_546", "2015-01-01T12:14:58.597216Z", 2.57, "type2"),
  (104, "event_name_123", "2015-01-01T12:15:00.512679Z", 8.00, "type1"),
  (105, "event_name_678", "2015-01-01T13:51:42.248818Z", 6.78, "type2")
  ).toDF("event_id", "event_name", "event_ts", "event_value", "event_type")
  
update_df = update_df.withColumn("event_value", 
update_df.col("event_value").cast(DecimalType(4,2)))

update_df.write.format("org.apache.hudi")
             .option(HoodieWriteConfig.TABLE_NAME, tableName)
             .option(DataSourceWriteOptions.OPERATION_OPT_KEY, 
DataSourceWriteOptions.UPSERT_OPERATION_OPT_VAL)
             .option(DataSourceWriteOptions.STORAGE_TYPE_OPT_KEY, tableType)
             .option(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY, "event_id")
             .option(DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY, 
"event_ts")
             .option(DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY, 
"event_type")
             .option("hoodie.compact.inline", "false")
             .option(DataSourceWriteOptions.HIVE_STYLE_PARTITIONING_OPT_KEY, 
"true")
             .option(DataSourceWriteOptions.HIVE_SYNC_ENABLED_OPT_KEY, "true")
             .option(DataSourceWriteOptions.HIVE_TABLE_OPT_KEY, tableName)
             .option(DataSourceWriteOptions.HIVE_DATABASE_OPT_KEY, "default")
             .option(DataSourceWriteOptions.HIVE_PARTITION_FIELDS_OPT_KEY, 
"event_type")
             
.option(DataSourceWriteOptions.HIVE_PARTITION_EXTRACTOR_CLASS_OPT_KEY, 
"org.apache.hudi.hive.MultiPartKeysValueExtractor")
             .option(DataSourceWriteOptions.KEYGENERATOR_CLASS_OPT_KEY, 
"org.apache.hudi.keygen.ComplexKeyGenerator")
             .mode(SaveMode.Append)
             .save(tablePath)
{code}
Query the _rt table with Hive:
{code:java}
hive>  select * from test8_rt;
{code}
This produces the following error:
{code:java}
Failed with exception 
java.io.IOException:org.apache.hadoop.hive.ql.metadata.HiveException: 
java.lang.ClassCastException: org.apache.hadoop.io.BytesWritable cannot be cast 
to org.apache.hadoop.hive.serde2.io.HiveDecimalWritable
{code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

Reply via email to