kepplertreet opened a new issue, #7628:
URL: https://github.com/apache/hudi/issues/7628
**Describe the problem you faced**
Hello guys,
We are trying to create a Hudi table on Amazon S3 with the following configs:

* Hudi version: 0.12.1-SNAPSHOT (pre-installed on EMR 6.9.0)
* Table type: MOR

We are trying to enable column stats in the metadata table. For this, we enabled two configs:

* `hoodie.metadata.index.column.stats.enable = true`
* `hoodie.bloom.index.use.metadata = true`

(We did not set `hoodie.metadata.index.column.stats.column.list`, as we wanted column stats to be built for every column, as stated in the docs.)

We completed the bulk insert successfully (with the above configs enabled), and saw that the column stats partition had been created in the metadata table, as expected. However, when we tried to run a Spark Structured Streaming application to upsert to this table, it fails immediately with the following error:
*java.lang.ClassCastException: java.lang.Integer cannot be cast to java.lang.String*
**Hudi Configurations**
```
"hoodie.table.version": "5",
"hoodie.datasource.write.operation": "upsert",
"hoodie.datasource.write.hive_style_partitioning": "true",
"hoodie.datasource.write.precombine.field": "_commit_time_ms",
"hoodie.datasource.write.commitmeta.key.prefix": "_",
"hoodie.datasource.write.insert.drop.duplicates": "true",
"hoodie.datasource.hive_sync.enable": "true",
"hoodie.datasource.hive_sync.use_jdbc": "true",
"hoodie.datasource.hive_sync.auto_create_database": "true",
"hoodie.datasource.hive_sync.support_timestamp": "false",
"hoodie.datasource.hive_sync.skip_ro_suffix": "true",
"hoodie.parquet.compression.codec": "snappy",
"hoodie.metrics.on": "true",
"hoodie.metrics.reporter.type": "PROMETHEUS_PUSHGATEWAY",
"hoodie.metrics.pushgateway.host": <host_ip>,
"hoodie.metrics.pushgateway.port": <port_number>,
"hoodie.metrics.pushgateway.random.job.name.suffix": "false",
"hoodie.metrics.pushgateway.report.period.seconds": "30",
"hoodie.metadata.enable": "true",
"hoodie.metadata.metrics.enable": "true",
"hoodie.metadata.clean.async": "true",
"hoodie.metadata.index.column.stats.enable": "true",
"hoodie.metadata.index.bloom.filter.enable": "true",
"hoodie.metadata.index.async": "true",
"hoodie.write.concurrency.mode": "OPTIMISTIC_CONCURRENCY_CONTROL",
"hoodie.write.lock.provider":
"org.apache.hudi.client.transaction.lock.FileSystemBasedLockProvider",
"hoodie.datasource.compaction.async.enable": "true",
"hoodie.compact.schedule.inline": "false",
"hoodie.compact.inline.trigger.strategy": "NUM_COMMITS",
"hoodie.compact.inline.max.delta.commits": 4,
"hoodie.index.type": "BLOOM",
"hoodie.cleaner.policy.failed.writes": "LAZY",
"hoodie.clean.automatic": "true",
"hoodie.clean.async": "true",
"hoodie.cleaner.commits.retained": 4,
"hoodie.write.lock.client.num_retries": 10,
"hoodie.write.lock.wait_time_ms_between_retry": 1000,
"hoodie.write.lock.num_retries": 15,
"hoodie.write.lock.wait_time_ms": 60000,
"hoodie.bloom.index.use.metadata": "true",
"hoodie.archive.async": "true",
"hoodie.parquet.max.file.size": "268435456",
"hoodie.parquet.small.file.limit": "134217728"
```
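For reference, options like these are typically assembled into a key/value map and handed to the Hudi writer. The sketch below collects the metadata-index-related subset from the configuration above into a plain Python dict; the table name and the commented-out write call are illustrative placeholders, not taken from the report:

```python
# Sketch: the metadata-index options from this report, as they might be
# passed to a Hudi writer. "example_table" is a placeholder name.
hudi_options = {
    "hoodie.table.name": "example_table",  # placeholder, not from the report
    "hoodie.datasource.write.operation": "upsert",
    "hoodie.datasource.write.precombine.field": "_commit_time_ms",
    "hoodie.metadata.enable": "true",
    "hoodie.metadata.index.column.stats.enable": "true",
    "hoodie.metadata.index.bloom.filter.enable": "true",
    "hoodie.metadata.index.async": "true",
    "hoodie.bloom.index.use.metadata": "true",
    "hoodie.index.type": "BLOOM",
}

# In a Spark job these would be applied roughly as:
#   df.write.format("hudi").options(**hudi_options).mode("append").save(base_path)
```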
**To Reproduce**
Steps to reproduce the behavior:
1. Carried out a bulk insert for the table with the following metadata configs enabled:

   ```
   "hoodie.metadata.index.column.stats.enable": "true",
   "hoodie.metadata.index.bloom.filter.enable": "true",
   "hoodie.metadata.index.async": "true"
   ```
2. Started a Spark Structured Streaming application to carry out upsert operations on the table with the above-mentioned Hudi configurations.
**Expected behavior**
We expected Hudi to write data to the given table using the column stats and metadata bloom indices and carry out the required upserts accordingly. Instead, the Structured Streaming application fails to write any data; it does not terminate, but keeps failing while trying to explicitly cast the record key column to the String data type.
**Environment Description**
* Hudi version : Hudi 0.12.1-amzn-0-SNAPSHOT (EMR 6.9.0)
* Spark version : Spark 3.3.0
* Hive version : Hive 3.1.3
* Hadoop version : 3.3.3
* Storage (HDFS/S3/GCS..) : AWS S3
* Running on Docker? (yes/no) : No
**Additional context**
We looked at the relevant class that raised this error, and it has a curious comment:
https://github.com/apache/hudi/blob/a5978cd2308f0f2e501e12040f1fafae8afb86e9/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bloom/HoodieBloomIndex.java#L230

`// NOTE: Here we assume that the type of the primary key field is string.`
We also notice an explicit String conversion happening on the consecutive lines 231 and 232:
`(String) unwrapStatisticValueWrapper(entry.getValue().getMinValue())`
`(String) unwrapStatisticValueWrapper(entry.getValue().getMaxValue())`
In our case the primary key field is an Integer and not a String.
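To illustrate the failure mode, here is a minimal Python sketch of what the hard cast in `HoodieBloomIndex` does: the min/max column-stats values are unwrapped and unconditionally treated as strings, which blows up when the record key column is an integer. The wrapper shape and function names below are simplified stand-ins for Hudi's internals, not the actual implementation:

```python
def unwrap_statistic_value(wrapper):
    """Simplified stand-in for unwrapStatisticValueWrapper: returns the raw value."""
    return wrapper["value"]

def load_column_range_assuming_string(min_wrapper, max_wrapper):
    # Mirrors the hard cast `(String) unwrapStatisticValueWrapper(...)`
    # on lines 231-232 of HoodieBloomIndex: fail if the unwrapped value
    # is not actually a string.
    min_val = unwrap_statistic_value(min_wrapper)
    max_val = unwrap_statistic_value(max_wrapper)
    for v in (min_val, max_val):
        if not isinstance(v, str):
            # Analogous to java.lang.ClassCastException in the stacktrace
            raise TypeError(f"{type(v).__name__} cannot be cast to str")
    return min_val, max_val

# A String record key works:
load_column_range_assuming_string({"value": "a"}, {"value": "z"})

# An Integer record key, as in this table, fails:
try:
    load_column_range_assuming_string({"value": 1}, {"value": 99})
except TypeError as e:
    print(e)  # -> int cannot be cast to str
```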
**Stacktrace**
```
java.lang.ClassCastException: java.lang.Integer cannot be cast to java.lang.String
  at org.apache.hudi.index.bloom.HoodieBloomIndex.lambda$loadColumnRangesFromMetaIndex$cc8e7ca2$1(HoodieBloomIndex.java:231)
  at org.apache.hudi.client.common.HoodieSparkEngineContext.lambda$flatMap$7d470b86$1(HoodieSparkEngineContext.java:137)
  at org.apache.spark.api.java.JavaRDDLike.$anonfun$flatMap$1(JavaRDDLike.scala:125)
  at scala.collection.Iterator$$anon$11.nextCur(Iterator.scala:486)
  at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:492)
  at scala.collection.Iterator.foreach(Iterator.scala:943)
  at scala.collection.Iterator.foreach$(Iterator.scala:943)
  at scala.collection.AbstractIterator.foreach(Iterator.scala:1431)
  at scala.collection.generic.Growable.$plus$plus$eq(Growable.scala:62)
  at scala.collection.generic.Growable.$plus$plus$eq$(Growable.scala:53)
  at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:105)
  at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:49)
  at scala.collection.TraversableOnce.to(TraversableOnce.scala:366)
  at scala.collection.TraversableOnce.to$(TraversableOnce.scala:364)
  at scala.collection.AbstractIterator.to(Iterator.scala:1431)
  at scala.collection.TraversableOnce.toBuffer(TraversableOnce.scala:358)
  at scala.collection.TraversableOnce.toBuffer$(TraversableOnce.scala:358)
  at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1431)
  at scala.collection.TraversableOnce.toArray(TraversableOnce.scala:345)
  at scala.collection.TraversableOnce.toArray$(TraversableOnce.scala:339)
  at scala.collection.AbstractIterator.toArray(Iterator.scala:1431)
  at org.apache.spark.rdd.RDD.$anonfun$collect$2(RDD.scala:1021)
  at org.apache.spark.SparkContext.$anonfun$runJob$5(SparkContext.scala:2269)
  at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
  at org.apache.spark.scheduler.Task.run(Task.scala:138)
  at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:548)
  at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1516)
  at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:551)
  at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
  at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
  at java.lang.Thread.run(Thread.java:750)
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]