[
https://issues.apache.org/jira/browse/HUDI-713?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Wenning Ding updated HUDI-713:
------------------------------
Description:
Similar to [https://issues.apache.org/jira/browse/HUDI-530]. With the migration
of Hudi to Spark 2.4.4 and the switch to Spark's native spark-avro module, this
issue now exists in Hudi master.
Steps to reproduce:
Run the following script:
{code:java}
import org.apache.hudi.DataSourceWriteOptions
import org.apache.hudi.config.HoodieWriteConfig
import org.apache.hudi.hive.MultiPartKeysValueExtractor
import org.apache.spark.sql.SaveMode
import org.apache.spark.sql._
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._
import spark.implicits._
val sample = """
[{
"partition": 0,
"offset": 5,
"timestamp": "1581508884",
"value": {
"prop1": "val1",
"prop2": [{"withinProp1": "val2", "withinProp2": 1}]
}
}, {
"partition": 1,
"offset": 10,
"timestamp": "1581108884",
"value": {
"prop1": "val4",
"prop2": [{"withinProp1": "val5", "withinProp2": 2}]
}
}]
"""
val df = spark.read.option("dropFieldIfAllNull", "true").json(Seq(sample).toDS)
val dfcol1 = df.withColumn("op_ts", from_unixtime(col("timestamp")))
val dfcol2 = dfcol1.withColumn("year_partition",
year(col("op_ts"))).withColumn("id", concat($"partition", lit("-"), $"offset"))
val dfcol3 = dfcol2.drop("timestamp")
val hudiOptions: Map[String, String] =
Map[String, String](
DataSourceWriteOptions.HIVE_DATABASE_OPT_KEY -> "test",
DataSourceWriteOptions.STORAGE_TYPE_OPT_KEY ->
DataSourceWriteOptions.MOR_STORAGE_TYPE_OPT_VAL,
DataSourceWriteOptions.OPERATION_OPT_KEY ->
DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL,
DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY -> "op_ts",
DataSourceWriteOptions.HIVE_SYNC_ENABLED_OPT_KEY -> "true",
DataSourceWriteOptions.HIVE_PARTITION_EXTRACTOR_CLASS_OPT_KEY ->
classOf[MultiPartKeysValueExtractor].getName,
"hoodie.parquet.max.file.size" -> String.valueOf(1024 * 1024 *
1024),
"hoodie.parquet.compression.ratio" -> String.valueOf(0.5),
"hoodie.insert.shuffle.parallelism" -> String.valueOf(2)
)
dfcol3.write.format("org.apache.hudi")
.options(hudiOptions)
.option(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY, "id")
.option(DataSourceWriteOptions.HIVE_PARTITION_FIELDS_OPT_KEY,
"year_partition")
.option(DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY,
"year_partition")
.option(HoodieWriteConfig.TABLE_NAME, "AWS_TEST")
.option(DataSourceWriteOptions.HIVE_TABLE_OPT_KEY, "AWS_TEST")
.mode(SaveMode.Append).save("s3://xxx/AWS_TEST/")
{code}
Running it throws a "Not in union" exception:
{code:java}
Caused by: org.apache.avro.UnresolvedUnionException: Not in union
[{"type":"record","name":"prop2","namespace":"hoodie.AWS_TEST.AWS_TEST_record.value","fields":[{"name":"withinProp1","type":["string","null"]},{"name":"withinProp2","type":["long","null"]}]},"null"]:
{"withinProp1": "val2", "withinProp2": 1}
{code}
was:
Similar to [https://issues.apache.org/jira/browse/HUDI-530]. With migration of
Hudi to spark 2.4.4 and using Spark's native spark-avro module, this issue now
exists in Hudi master.
Reproduce steps:
Run following script
{code:java}
import org.apache.hudi.DataSourceWriteOptions
import org.apache.hudi.config.HoodieWriteConfig
import org.apache.hudi.hive.MultiPartKeysValueExtractor
import org.apache.spark.sql.SaveMode
import org.apache.spark.sql._
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._
import spark.implicits._val sample = """
[{
"partition": 0,
"offset": 5,
"timestamp": "1581508884",
"value": {
"prop1": "val1",
"prop2": [{"withinProp1": "val2", "withinProp2": 1}]
}
}, {
"partition": 1,
"offset": 10,
"timestamp": "1581108884",
"value": {
"prop1": "val4",
"prop2": [{"withinProp1": "val5", "withinProp2": 2}]
}
}]
"""val df = spark.read.option("dropFieldIfAllNull",
"true").json(Seq(sample).toDS)
val dfcol1 = df.withColumn("op_ts", from_unixtime(col("timestamp")))
val dfcol2 = dfcol1.withColumn("year_partition",
year(col("op_ts"))).withColumn("id", concat($"partition", lit("-"), $"offset"))
val dfcol3 = dfcol2.drop("timestamp")val hudiOptions: Map[String, String] =
Map[String, String](
DataSourceWriteOptions.HIVE_DATABASE_OPT_KEY -> "test",
DataSourceWriteOptions.STORAGE_TYPE_OPT_KEY ->
DataSourceWriteOptions.MOR_STORAGE_TYPE_OPT_VAL,
DataSourceWriteOptions.OPERATION_OPT_KEY ->
DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL,
DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY -> "op_ts",
DataSourceWriteOptions.HIVE_SYNC_ENABLED_OPT_KEY -> "true",
DataSourceWriteOptions.HIVE_PARTITION_EXTRACTOR_CLASS_OPT_KEY ->
classOf[MultiPartKeysValueExtractor].getName,
"hoodie.parquet.max.file.size" -> String.valueOf(1024 * 1024 *
1024),
"hoodie.parquet.compression.ratio" -> String.valueOf(0.5),
"hoodie.insert.shuffle.parallelism" -> String.valueOf(2)
)dfcol3.write.format("org.apache.hudi")
.options(hudiOptions)
.option(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY, "id")
.option(DataSourceWriteOptions.HIVE_PARTITION_FIELDS_OPT_KEY,
"year_partition")
.option(DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY,
"year_partition")
.option(HoodieWriteConfig.TABLE_NAME, "AWS_TEST")
.option(DataSourceWriteOptions.HIVE_TABLE_OPT_KEY, "AWS_TEST")
.mode(SaveMode.Append).save("s3://xxx/AWS_TEST/")
{code}
Will throw not in union exception:
{code:java}
Caused by: org.apache.avro.UnresolvedUnionException: Not in union
[{"type":"record","name":"prop2","namespace":"hoodie.AWS_TEST.AWS_TEST_record.value","fields":[{"name":"withinProp1","type":["string","null"]},{"name":"withinProp2","type":["long","null"]}]},"null"]:
{"withinProp1": "val2", "withinProp2": 1}
{code}
> Datasource Writer throws error on resolving array of struct fields
> ------------------------------------------------------------------
>
> Key: HUDI-713
> URL: https://issues.apache.org/jira/browse/HUDI-713
> Project: Apache Hudi (incubating)
> Issue Type: Bug
> Reporter: Wenning Ding
> Priority: Major
>
> Similar to [https://issues.apache.org/jira/browse/HUDI-530]. With the
> migration of Hudi to Spark 2.4.4 and the switch to Spark's native spark-avro
> module, this issue now exists in Hudi master.
> Steps to reproduce:
> Run the following script:
> {code:java}
> import org.apache.hudi.DataSourceWriteOptions
> import org.apache.hudi.config.HoodieWriteConfig
> import org.apache.hudi.hive.MultiPartKeysValueExtractor
> import org.apache.spark.sql.SaveMode
> import org.apache.spark.sql._
> import org.apache.spark.sql.functions._
> import org.apache.spark.sql.types._
> import spark.implicits._
> val sample = """
> [{
> "partition": 0,
> "offset": 5,
> "timestamp": "1581508884",
> "value": {
> "prop1": "val1",
> "prop2": [{"withinProp1": "val2", "withinProp2": 1}]
> }
> }, {
> "partition": 1,
> "offset": 10,
> "timestamp": "1581108884",
> "value": {
> "prop1": "val4",
> "prop2": [{"withinProp1": "val5", "withinProp2": 2}]
> }
> }]
> """
> val df = spark.read.option("dropFieldIfAllNull",
> "true").json(Seq(sample).toDS)
> val dfcol1 = df.withColumn("op_ts", from_unixtime(col("timestamp")))
> val dfcol2 = dfcol1.withColumn("year_partition",
> year(col("op_ts"))).withColumn("id", concat($"partition", lit("-"),
> $"offset"))
> val dfcol3 = dfcol2.drop("timestamp")
> val hudiOptions: Map[String, String] =
> Map[String, String](
> DataSourceWriteOptions.HIVE_DATABASE_OPT_KEY -> "test",
> DataSourceWriteOptions.STORAGE_TYPE_OPT_KEY ->
> DataSourceWriteOptions.MOR_STORAGE_TYPE_OPT_VAL,
> DataSourceWriteOptions.OPERATION_OPT_KEY ->
> DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL,
> DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY -> "op_ts",
> DataSourceWriteOptions.HIVE_SYNC_ENABLED_OPT_KEY -> "true",
> DataSourceWriteOptions.HIVE_PARTITION_EXTRACTOR_CLASS_OPT_KEY ->
> classOf[MultiPartKeysValueExtractor].getName,
> "hoodie.parquet.max.file.size" -> String.valueOf(1024 * 1024 *
> 1024),
> "hoodie.parquet.compression.ratio" -> String.valueOf(0.5),
> "hoodie.insert.shuffle.parallelism" -> String.valueOf(2)
> )
> dfcol3.write.format("org.apache.hudi")
> .options(hudiOptions)
> .option(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY, "id")
> .option(DataSourceWriteOptions.HIVE_PARTITION_FIELDS_OPT_KEY,
> "year_partition")
> .option(DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY,
> "year_partition")
> .option(HoodieWriteConfig.TABLE_NAME, "AWS_TEST")
> .option(DataSourceWriteOptions.HIVE_TABLE_OPT_KEY, "AWS_TEST")
> .mode(SaveMode.Append).save("s3://xxx/AWS_TEST/")
> {code}
> Running it throws a "Not in union" exception:
> {code:java}
> Caused by: org.apache.avro.UnresolvedUnionException: Not in union
> [{"type":"record","name":"prop2","namespace":"hoodie.AWS_TEST.AWS_TEST_record.value","fields":[{"name":"withinProp1","type":["string","null"]},{"name":"withinProp2","type":["long","null"]}]},"null"]:
> {"withinProp1": "val2", "withinProp2": 1}
> {code}
--
This message was sent by Atlassian Jira
(v8.3.4#803005)