chandu-1101 commented on issue #9141:
URL: https://github.com/apache/hudi/issues/9141#issuecomment-1632104122

   I think I have been able to narrow down the issue to an extent.
   
   1. I added columns to the select query one by one and re-ran the insert. At one particular column, things broke.
   2. Then I adjusted the `limit X` in the SQL query (code below), and after several iterations I found that with `limit 472` things break again! So something odd is happening with the data once the 471st row is crossed. What is it? The JSON schemas of the 471st and 472nd rows look similar (https://www.diffchecker.com/QRbXbzd5/).
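   For reference, that bisection over `limit` can be automated. Below is a minimal pure-Scala sketch, assuming failures are monotonic in the limit (they may not be); `failsAt` is a hypothetical predicate that would wrap the Hudi write in a `scala.util.Try` and report whether it threw:

   ```scala
   // Binary-search for the smallest `limit N` whose insert fails.
   // `failsAt(n)` is assumed to run the write with `limit n` and return true on failure.
   def firstFailingLimit(lo: Int, hi: Int, failsAt: Int => Boolean): Option[Int] = {
     if (!failsAt(hi)) return None        // nothing in the range fails
     var (l, h) = (lo, hi)
     while (l < h) {                      // invariant: failsAt(h) is true
       val mid = l + (h - l) / 2
       if (failsAt(mid)) h = mid else l = mid + 1
     }
     Some(h)                             // smallest failing limit
   }
   ```

   With the behaviour I am seeing (everything up to 471 succeeds, 472 onwards fails), this converges to 472 in ~10 write attempts instead of hundreds.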
   
   What is strange to me is:
   1. Until now I have been using plain Spark SQL (no Hudi) to merge the snapshot (Parquet file) with the CDC (JSON files), and for the last 1.5 years this has worked fine -- meaning the existing schema is still intact.
   2. Why, then, does it fail with Hudi, with an error?
   3. One more: if I change the code to use bulk insert, the insert works (the code is in my previous posts, please check), but the subsequent merge fails. Why? I don't know!
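   On point 3, the only change between the failing default write (upsert) and the working bulk insert is the write-operation option. Sketching it as a plain option map (the keys are the standard Hudi datasource config names; the values match my table, and the commented-out write line is illustrative only):

   ```scala
   // Options that switch the same DataFrame write from the default upsert to bulk_insert.
   val bulkInsertOpts = Map(
     "hoodie.datasource.write.operation"        -> "bulk_insert", // default is "upsert"
     "hoodie.datasource.write.recordkey.field"  -> "_id.oid",
     "hoodie.datasource.write.precombine.field" -> "cdc_pk",
     "hoodie.table.name"                        -> "GE11"
   )
   // snapshotDf3.write.format("hudi").options(bulkInsertOpts).mode("overwrite").save(path)
   ```

   So bulk_insert (which skips the index lookup / record merging path) accepts the data, while upsert does not -- which is part of why I suspect something in the record content rather than the write config.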
   
   
   - I am wondering how companies are using Hudi in practice.
   - Is CDC merging (i.e. merging a CDC dataframe with a snapshot dataframe in Spark) the wrong use case for Hudi?
   - Or does the data need to follow some internal pattern for things to work? I have ~5000 DB collections against which this merge has to run, and I am wondering what I can do in production when I hit these exceptions.
   
   
   spark-shell command
   ```
   spark-shell \
     --driver-memory 1g --executor-memory 4g --executor-cores 1 --driver-cores 1 \
     --conf spark.dynamicAllocation.maxExecutors=2 \
     --conf "spark.serializer=org.apache.spark.serializer.KryoSerializer" \
     --conf "spark.sql.extensions=org.apache.spark.sql.hudi.HoodieSparkSessionExtension" \
     --conf "spark.sql.catalog.spark_catalog=org.apache.spark.sql.hudi.catalog.HoodieCatalog" \
     --conf "spark.sql.caseSensitive=true" \
     --conf spark.sql.legacy.parquet.int96RebaseModeInRead=CORRECTED \
     --conf spark.sql.legacy.parquet.int96RebaseModeInWrite=CORRECTED \
     --conf spark.sql.legacy.parquet.datetimeRebaseModeInRead=CORRECTED \
     --conf spark.sql.legacy.parquet.datetimeRebaseModeInWrite=CORRECTED \
     --name ravic \
     --packages org.apache.hudi:hudi-spark3.3-bundle_2.12:0.13.1 \
     --jars /home/hadoop/jars2/spark-1.0-SNAPSHOT.jar,/home/hadoop/hudi/hudi-release-0.12.3/packaging/hudi-spark-bundle/target/hudi-spark3.3-bundle_2.12-0.12.3.jar
   ```
   
   
   Code
   ```
       // Only the imports actually used below; the rest were unused.
       import org.apache.hudi.DataSourceWriteOptions
       import org.apache.hudi.QuickstartUtils._
       import org.apache.hudi.config.HoodieWriteConfig
       import org.apache.spark.sql.SaveMode

       val sess = spark
       val snapshotDf = sess.read.parquet("s3://p-crm-messaging-v2/snapshots-test/ge11-drop/")
       snapshotDf.cache()
       snapshotDf.createOrReplaceTempView("snapshot")   // registerTempTable is deprecated

       // 472 is the culprit: `limit 471` works, `limit 472` breaks
       val snapshotDf2 = sess.sql("select * from snapshot order by _id.oid limit 472")
       snapshotDf2.createOrReplaceTempView("snapshot2")
       val snapshotDf3 = sess.sql("select _id, cdc_pk, addressLogs from snapshot2")

       snapshotDf3.write.format("hudi")
         .options(getQuickstartWriteConfigs)
         .option(DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY, "cdc_pk")
         .option(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY, "_id.oid")
         .option(HoodieWriteConfig.TABLE_NAME, "GE11")
         .mode(SaveMode.Overwrite)
         .save("s3://p-crm-messaging-v2/snapshots-hudi/ge11-drop/snapshot")
   ```
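   Since the diffchecker comparison only shows that rows 471 and 472 *look* alike, a subtle nested-type difference (e.g. a field that is null in one record but a struct in the other -- enough to break Avro schema reconciliation) can still hide there. One way to catch it is to flatten both records into dotted paths with their value types and diff those. A minimal pure-Scala sketch over already-parsed JSON-as-`Map` records (the JSON parsing itself is assumed to happen elsewhere):

   ```scala
   // Flatten a nested Map-based record into "dotted.path" -> value-type-name pairs.
   def flatten(prefix: String, v: Any): Map[String, String] = v match {
     case m: Map[_, _] =>
       m.asInstanceOf[Map[String, Any]].flatMap { case (k, vv) =>
         flatten(if (prefix.isEmpty) k else s"$prefix.$k", vv)
       }
     case other =>
       Map(prefix -> (if (other == null) "null" else other.getClass.getSimpleName))
   }

   // Return only the paths whose presence or value type differs between two records.
   def schemaDiff(a: Map[String, Any], b: Map[String, Any]): Map[String, (Option[String], Option[String])] = {
     val (fa, fb) = (flatten("", a), flatten("", b))
     (fa.keySet ++ fb.keySet).iterator
       .map(k => k -> (fa.get(k), fb.get(k)))
       .filter { case (_, (x, y)) => x != y }
       .toMap
   }
   ```

   Running `schemaDiff` on the parsed 471st and 472nd records would print exactly the paths where the two disagree, which is more precise than an eyeball diff of the raw JSON.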
   
   
   
   
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]
