chandu-1101 commented on issue #9141: URL: https://github.com/apache/hudi/issues/9141#issuecomment-1632104122
I think I am able to narrow down the issue to an extent:

1. I tried adding columns to the select query one by one and re-ran the insert. At one particular column, things broke.
2. I then adjusted the `limit X` in the SQL query (code below); after several iterations I found that with `limit 472` things break again. So something odd happens once the 471st row is crossed. What is it? The JSON schemas of the 471st and 472nd rows look similar (https://www.diffchecker.com/QRbXbzd5/).

What is strange to me:

1. Until now I have been using plain Spark SQL (no Hudi) to merge the snapshot (parquet file) with the CDC (JSON files), and for ~1.5 years this has worked fine, meaning the existing schema is still intact.
2. Why, then, does it fail with Hudi, with an error?
3. One more: if I change the code to use bulk insert, this insert works (for that code, please check my previous posts), but the subsequent merge fails. Why? I don't know.

I am wondering how companies are using Hudi:

- Is CDC merging (i.e. merging a CDC dataframe with a snapshot dataframe in Spark) the wrong use case for Hudi?
- Or should the data internally follow some pattern for things to work?

I have ~5000 DB collections against which the merge has to run, so I am wondering what can be done if I hit these exceptions in production.
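For what it's worth, the offending row can also be isolated directly instead of by bisecting `limit`. A minimal sketch (assumes the same `snapshot` temp view registered in the code below; needs a running Spark session, so this is untested here):

```scala
// Hedged sketch: compute the single row that is added when going from
// limit 471 to limit 472, then inspect it for nulls, empty structs, or
// oddly typed nested values before retrying the Hudi write on it alone.
val upTo471 = spark.sql("select _id, cdc_pk, addressLogs from snapshot order by _id.oid limit 471")
val upTo472 = spark.sql("select _id, cdc_pk, addressLogs from snapshot order by _id.oid limit 472")
val suspect = upTo472.exceptAll(upTo471) // the 472nd row only
suspect.show(truncate = false)
```

`Dataset.exceptAll` (Spark 2.4+) keeps duplicates, so even if the 472nd row duplicates an earlier one it still surfaces here.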
spark-shell command

```
spark-shell --driver-memory 1g --executor-memory 4g --executor-cores 1 --driver-cores 1 \
  --conf spark.dynamicAllocation.maxExecutors=2 \
  --conf "spark.serializer=org.apache.spark.serializer.KryoSerializer" \
  --conf "spark.sql.extensions=org.apache.spark.sql.hudi.HoodieSparkSessionExtension" \
  --conf "spark.sql.catalog.spark_catalog=org.apache.spark.sql.hudi.catalog.HoodieCatalog" \
  --conf "spark.sql.caseSensitive=true" \
  --conf spark.sql.legacy.parquet.int96RebaseModeInRead=CORRECTED \
  --conf spark.sql.legacy.parquet.int96RebaseModeInWrite=CORRECTED \
  --conf spark.sql.legacy.parquet.datetimeRebaseModeInRead=CORRECTED \
  --conf spark.sql.legacy.parquet.datetimeRebaseModeInWrite=CORRECTED \
  --name ravic \
  --packages org.apache.hudi:hudi-spark3.3-bundle_2.12:0.13.1 \
  --jars /home/hadoop/jars2/spark-1.0-SNAPSHOT.jar,/home/hadoop/hudi/hudi-release-0.12.3/packaging/hudi-spark-bundle/target/hudi-spark3.3-bundle_2.12-0.12.3.jar
```

Code

```scala
import org.apache.commons.lang3.ClassUtils.getCanonicalName
import org.apache.hudi.bootstrap.SparkParquetBootstrapDataProvider
import org.apache.hudi.{DataSourceWriteOptions, QuickstartUtils}
import org.apache.hudi.common.model.{HoodieAvroPayload, HoodieFileFormat, WriteOperationType}
import org.apache.hudi.common.table.HoodieTableConfig
import org.apache.hudi.config.{HoodieBootstrapConfig, HoodieWriteConfig}
import org.apache.hudi.keygen.constant.KeyGeneratorOptions
import java.util
import org.apache.hudi.config.HoodieWriteConfig.TBL_NAME
import org.apache.hudi.keygen.{NonpartitionedKeyGenerator, SimpleKeyGenerator}
import org.apache.spark.sql.SaveMode
import org.apache.spark.sql.functions.{col, hash, lit}
import org.apache.hudi.QuickstartUtils._

val sess = spark
val snapshotDf = sess.read.parquet("s3://p-crm-messaging-v2/snapshots-test/ge11-drop/")
snapshotDf.cache()
snapshotDf.registerTempTable("snapshot")

// 472 is the culprit
val snapshotDf2 = sess.sql("select * from snapshot order by _id.oid limit 472")
snapshotDf2.registerTempTable("snapshot2")
val snapshotDf3 = sess.sql("select _id, cdc_pk, addressLogs from snapshot2")

snapshotDf3.write.format("hudi")
  .options(getQuickstartWriteConfigs)
  .option(DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY, "cdc_pk")
  .option(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY, "_id.oid")
  .option(HoodieWriteConfig.TABLE_NAME, "GE11")
  .mode(SaveMode.Overwrite)
  .save("s3://p-crm-messaging-v2/snapshots-hudi/ge11-drop/snapshot")
```

--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at: [email protected]
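The bulk-insert variant mentioned above (which succeeds for this dataset where plain insert fails) differs only in the operation option. A hedged sketch, using the option constants as they appear in the Hudi 0.13 `DataSourceWriteOptions` (untested here, since it needs a Spark session and the S3 data):

```scala
// Hedged sketch: the same write as bulk_insert. bulk_insert takes a different
// write path (no small-file handling / index lookup on existing records),
// which may explain why it behaves differently from a plain insert here.
snapshotDf3.write.format("hudi")
  .options(getQuickstartWriteConfigs)
  .option(DataSourceWriteOptions.OPERATION_OPT_KEY, DataSourceWriteOptions.BULK_INSERT_OPERATION_OPT_VAL)
  .option(DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY, "cdc_pk")
  .option(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY, "_id.oid")
  .option(HoodieWriteConfig.TABLE_NAME, "GE11")
  .mode(SaveMode.Overwrite)
  .save("s3://p-crm-messaging-v2/snapshots-hudi/ge11-drop/snapshot")
```

If the bulk insert succeeds but the subsequent merge fails, capturing the full stack trace of that merge failure would help distinguish a schema/Avro-conversion problem from an indexing one.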
