kepplertreet opened a new issue, #7628:
URL: https://github.com/apache/hudi/issues/7628

   **Describe the problem you faced**
   Hello guys,
   We are trying to create a Hudi table on Amazon S3 with the following configs:
   * Hudi version: 0.12.1-SNAPSHOT (pre-installed on EMR 6.9.0)
   * Table type: MOR

   We are trying to enable column stats in the metadata table. For this, we enabled 2 configs:
   * hoodie.metadata.index.column.stats.enable = true
   * hoodie.bloom.index.use.metadata = true

   (We did not set hoodie.metadata.index.column.stats.column.list, as we wanted column stats to be built for every column, as stated in the docs.)

   We completed the bulk insert successfully (with the above configs enabled) and saw that the column stats partition had been created in the metadata table, as expected. However, when we tried to run a Spark Structured Streaming application to upsert to this table, it failed immediately with the following error:
   *java.lang.ClassCastException: java.lang.Integer cannot be cast to java.lang.String*
   
   **Hudi Configurations**
   ```
     "hoodie.table.version": "5",
     "hoodie.datasource.write.operation": "upsert",
     "hoodie.datasource.write.hive_style_partitioning": "true",
     "hoodie.datasource.write.precombine.field": "_commit_time_ms",
     "hoodie.datasource.write.commitmeta.key.prefix": "_",
     "hoodie.datasource.write.insert.drop.duplicates": "true",
     "hoodie.datasource.hive_sync.enable": "true",
     "hoodie.datasource.hive_sync.use_jdbc": "true",
     "hoodie.datasource.hive_sync.auto_create_database": "true",
     "hoodie.datasource.hive_sync.support_timestamp": "false",
     "hoodie.datasource.hive_sync.skip_ro_suffix": "true",
     "hoodie.parquet.compression.codec": "snappy",
     "hoodie.metrics.on": "true",
     "hoodie.metrics.reporter.type": "PROMETHEUS_PUSHGATEWAY",
     "hoodie.metrics.pushgateway.host": <host_ip>,
     "hoodie.metrics.pushgateway.port": <port_number>,
     "hoodie.metrics.pushgateway.random.job.name.suffix": "false",
     "hoodie.metrics.pushgateway.report.period.seconds": "30",
     "hoodie.metadata.enable": "true",
     "hoodie.metadata.metrics.enable": "true",
     "hoodie.metadata.clean.async": "true",
     "hoodie.metadata.index.column.stats.enable": "true",
     "hoodie.metadata.index.bloom.filter.enable": "true",
     "hoodie.metadata.index.async": "true",
     "hoodie.write.concurrency.mode": "OPTIMISTIC_CONCURRENCY_CONTROL",
     "hoodie.write.lock.provider": "org.apache.hudi.client.transaction.lock.FileSystemBasedLockProvider",
     "hoodie.datasource.compaction.async.enable": "true",
     "hoodie.compact.schedule.inline": "false",
     "hoodie.compact.inline.trigger.strategy": "NUM_COMMITS",
     "hoodie.compact.inline.max.delta.commits": 4,
     "hoodie.index.type": "BLOOM",
     "hoodie.cleaner.policy.failed.writes": "LAZY",
     "hoodie.clean.automatic": "true",
     "hoodie.clean.async": "true",
     "hoodie.cleaner.commits.retained": 4,
     "hoodie.write.lock.client.num_retries": 10,
     "hoodie.write.lock.wait_time_ms_between_retry": 1000,
     "hoodie.write.lock.num_retries": 15,
     "hoodie.write.lock.wait_time_ms": 60000,
     "hoodie.bloom.index.use.metadata": "true",
     "hoodie.archive.async": "true",
     "hoodie.parquet.max.file.size": "268435456",
     "hoodie.parquet.small.file.limit": "134217728"
   ```
   
   **To Reproduce**
   
   Steps to reproduce the behavior:
   
   1. Carry out a bulk insert into the table with the following metadata configs enabled:
      `"hoodie.metadata.index.column.stats.enable": "true"`,
      `"hoodie.metadata.index.bloom.filter.enable": "true"`,
      `"hoodie.metadata.index.async": "true"`
   2. Start a Spark Structured Streaming application that upserts into the table with the Hudi configurations listed above.
   
   **Expected behavior**
   We expected Hudi to write data to the table using the column stats and metadata bloom indices and to carry out the upserts accordingly. Instead, the Structured Streaming application fails to write any data. It does not terminate either, but keeps failing while trying to explicitly cast the record key column to the String data type.
   
   **Environment Description**
   
   * Hudi version : Hudi 0.12.1-amzn-0-SNAPSHOT (EMR 6.9.0)
   * Spark version : Spark 3.3.0
   * Hive version : Hive 3.1.3 
   * Hadoop version : 3.3.3
   * Storage (HDFS/S3/GCS..) : AWS S3 
   * Running on Docker? (yes/no) : No 
   
   **Additional context**
   We looked at the relevant class that raised this error, and it has a curious comment:
   https://github.com/apache/hudi/blob/a5978cd2308f0f2e501e12040f1fafae8afb86e9/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bloom/HoodieBloomIndex.java#L230
   `// NOTE: Here we assume that the type of the primary key field is string`
   We also noticed an explicit String cast on the following lines, 231 and 232:
   `(String) unwrapStatisticValueWrapper(entry.getValue().getMinValue())`
   `(String) unwrapStatisticValueWrapper(entry.getValue().getMaxValue())`
   
   In our case the primary key field is an Integer and not a String. 
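
   To illustrate what we believe is happening, here is a minimal standalone sketch. It is not Hudi code: `ColumnRangeCastDemo` and `unwrap` are hypothetical names, and `unwrap` only stands in for `unwrapStatisticValueWrapper` under the assumption that it returns the stored min/max value as a plain `Object`.

   ```java
   public class ColumnRangeCastDemo {

       // Hypothetical stand-in for unwrapStatisticValueWrapper: we only assume
       // that it hands back the stored min/max value as a plain Object.
       public static Object unwrap(Object storedValue) {
           return storedValue;
       }

       // Mirrors the unconditional (String) cast we saw on lines 231 and 232.
       public static String castToString(Object storedValue) {
           return (String) unwrap(storedValue);
       }

       // Reports whether the cast throws a ClassCastException for this value.
       public static boolean castFails(Object storedValue) {
           try {
               castToString(storedValue);
               return false;
           } catch (ClassCastException e) {
               return true;
           }
       }

       public static void main(String[] args) {
           // A String record key passes through the cast unchanged.
           System.out.println("String key fails:  " + castFails("key-001"));
           // An Integer record key, as in our table, makes the cast throw.
           System.out.println("Integer key fails: " + castFails(Integer.valueOf(42)));
       }
   }
   ```

   Simply swapping the cast for `String.valueOf(...)` would avoid the exception in this sketch, but the stringified keys would then compare lexicographically ("10" sorts before "9"), so we are not sure that alone would be a correct fix for range pruning.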
   
   **Stacktrace**
   
   ```
   java.lang.ClassCastException: java.lang.Integer cannot be cast to java.lang.String
       at org.apache.hudi.index.bloom.HoodieBloomIndex.lambda$loadColumnRangesFromMetaIndex$cc8e7ca2$1(HoodieBloomIndex.java:231)
       at org.apache.hudi.client.common.HoodieSparkEngineContext.lambda$flatMap$7d470b86$1(HoodieSparkEngineContext.java:137)
       at org.apache.spark.api.java.JavaRDDLike.$anonfun$flatMap$1(JavaRDDLike.scala:125)
       at scala.collection.Iterator$$anon$11.nextCur(Iterator.scala:486)
       at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:492)
       at scala.collection.Iterator.foreach(Iterator.scala:943)
       at scala.collection.Iterator.foreach$(Iterator.scala:943)
       at scala.collection.AbstractIterator.foreach(Iterator.scala:1431)
       at scala.collection.generic.Growable.$plus$plus$eq(Growable.scala:62)
       at scala.collection.generic.Growable.$plus$plus$eq$(Growable.scala:53)
       at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:105)
       at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:49)
       at scala.collection.TraversableOnce.to(TraversableOnce.scala:366)
       at scala.collection.TraversableOnce.to$(TraversableOnce.scala:364)
       at scala.collection.AbstractIterator.to(Iterator.scala:1431)
       at scala.collection.TraversableOnce.toBuffer(TraversableOnce.scala:358)
       at scala.collection.TraversableOnce.toBuffer$(TraversableOnce.scala:358)
       at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1431)
       at scala.collection.TraversableOnce.toArray(TraversableOnce.scala:345)
       at scala.collection.TraversableOnce.toArray$(TraversableOnce.scala:339)
       at scala.collection.AbstractIterator.toArray(Iterator.scala:1431)
       at org.apache.spark.rdd.RDD.$anonfun$collect$2(RDD.scala:1021)
       at org.apache.spark.SparkContext.$anonfun$runJob$5(SparkContext.scala:2269)
       at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
       at org.apache.spark.scheduler.Task.run(Task.scala:138)
       at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:548)
       at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1516)
       at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:551)
       at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
       at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
       at java.lang.Thread.run(Thread.java:750)
   ```
   
   

