ankitchandnani opened a new issue, #8672: URL: https://github.com/apache/hudi/issues/8672
**Describe the problem you faced**

Hi everyone. I am testing Hudi 0.12.2 with DeltaStreamer on EMR (emr-6.9.0), using the INSERT_OVERWRITE_TABLE operation on a set of Parquet files in S3 (source). The goal is to overwrite the entire table in S3 (target) every time a new Parquet file arrives in the source folder from DMS CDC. After the first initial commit from the INSERT operation, the INSERT_OVERWRITE_TABLE run completes in DeltaStreamer and creates the `.replacecommit` file in the `.hoodie` folder at the target. However, when querying through Athena engine version 2 and spark-sql, the record count includes records from both commits instead of only the records from the latest commit, so the overwrite is not taking effect.

**To Reproduce**

Steps to reproduce the behavior:

1. Download any full and CDC Parquet sample files, e.g.:
   https://transer-files.s3.amazonaws.com/full.parquet
   https://transer-files.s3.amazonaws.com/cdc.parquet
3. Run DeltaStreamer for the full load and then for CDC. Make sure to update the source, target, and jar locations.
Below is what I am running.

Full DeltaStreamer config:

```sh
spark-submit \
  --class org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer \
  --conf spark.shuffle.service.enabled=true \
  --conf spark.dynamicAllocation.enabled=true \
  --conf spark.dynamicAllocation.initialExecutors=1 \
  --conf spark.dynamicAllocation.cachedExecutorIdleTimeout=60s \
  --conf spark.dynamicAllocation.executorIdleTimeout=30s \
  --conf spark.dynamicAllocation.schedulerBacklogTimeout=3s \
  --conf spark.serializer=org.apache.spark.serializer.KryoSerializer \
  --conf spark.sql.hive.convertMetastoreParquet=false \
  --conf yarn.nodemanager.vmem-check-enabled=false \
  --conf yarn.nodemanager.pmem-check-enabled=false \
  --conf spark.kryoserializer.buffer.max=512m \
  --conf spark.driver.memory=4g \
  --conf spark.driver.memoryOverhead=1024 \
  --conf spark.driver.maxResultSize=2g \
  --conf spark.executor.memory=8g \
  --conf spark.executor.memoryOverhead=2048 \
  --conf spark.executor.cores=2 \
  --conf spark.app.name=insert_overwrite_test_full \
  --jars /usr/lib/spark/external/lib/spark-avro.jar \
  /usr/lib/hudi/hudi-utilities-bundle.jar \
  --table-type COPY_ON_WRITE \
  --op INSERT \
  --source-ordering-field seq_no \
  --hoodie-conf hoodie.datasource.write.recordkey.field=ID1,ID2 \
  --hoodie-conf hoodie.datasource.write.partitionpath.field= \
  --hoodie-conf hoodie.datasource.write.keygenerator.class=org.apache.hudi.keygen.ComplexKeyGenerator \
  --hoodie-conf hoodie.datasource.hive_sync.table=INSERT_OVERWRITE_TEST \
  --hoodie-conf hoodie.datasource.hive_sync.enable=true \
  --hoodie-conf hoodie.datasource.hive_sync.assume_date_partitioning=false \
  --hoodie-conf hoodie.datasource.hive_sync.partition_extractor_class=org.apache.hudi.hive.NonPartitionedExtractor \
  --hoodie-conf hoodie.cleaner.commits.retained=10 \
  --hoodie-conf "hoodie.deltastreamer.transformer.sql=select 1==2 AS _hoodie_is_deleted, 'I' as Op, * from <SRC>" \
  --hoodie-conf hoodie.datasource.hive_sync.support_timestamp=false \
  --target-base-path s3://<BUCKET_NAME>/POC/LANDING/INSERT_OVERWRITE_TEST \
  --target-table insert_overwrite_test \
  --transformer-class org.apache.hudi.utilities.transform.SqlQueryBasedTransformer \
  --enable-sync \
  --hoodie-conf hoodie.deltastreamer.source.dfs.root=s3://<BUCKET_NAME>/POC/DMS/FULL/RECENT/TEST_FOLDER/TEST_SCHEMA/TEST_TABLE \
  --source-class org.apache.hudi.utilities.sources.ParquetDFSSource
```

CDC DeltaStreamer config:

```sh
spark-submit \
  --class org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer \
  --conf spark.shuffle.service.enabled=true \
  --conf spark.dynamicAllocation.enabled=true \
  --conf spark.dynamicAllocation.initialExecutors=1 \
  --conf spark.dynamicAllocation.cachedExecutorIdleTimeout=60s \
  --conf spark.dynamicAllocation.executorIdleTimeout=30s \
  --conf spark.dynamicAllocation.schedulerBacklogTimeout=3s \
  --conf spark.serializer=org.apache.spark.serializer.KryoSerializer \
  --conf spark.sql.hive.convertMetastoreParquet=false \
  --conf yarn.nodemanager.vmem-check-enabled=false \
  --conf yarn.nodemanager.pmem-check-enabled=false \
  --conf spark.kryoserializer.buffer.max=512m \
  --conf spark.driver.memory=2g \
  --conf spark.driver.memoryOverhead=512 \
  --conf spark.executor.memory=3g \
  --conf spark.executor.memoryOverhead=512 \
  --conf spark.executor.cores=1 \
  --conf spark.task.maxFailures=8 \
  --conf spark.yarn.am.attemptFailuresValidityInterval=1h \
  --conf spark.app.name=insert_overwrite_test_cdc \
  --jars /usr/lib/spark/external/lib/spark-avro.jar \
  /usr/lib/hudi/hudi-utilities-bundle.jar \
  --table-type COPY_ON_WRITE \
  --op INSERT_OVERWRITE_TABLE \
  --source-ordering-field seq_no \
  --hoodie-conf hoodie.datasource.write.recordkey.field=ID1,ID2 \
  --hoodie-conf hoodie.datasource.write.partitionpath.field= \
  --hoodie-conf hoodie.datasource.write.keygenerator.class=org.apache.hudi.keygen.ComplexKeyGenerator \
  --hoodie-conf hoodie.datasource.hive_sync.table=INSERT_OVERWRITE_TEST \
  --hoodie-conf hoodie.datasource.hive_sync.enable=true \
  --hoodie-conf hoodie.datasource.hive_sync.assume_date_partitioning=false \
  --hoodie-conf hoodie.datasource.hive_sync.partition_extractor_class=org.apache.hudi.hive.NonPartitionedExtractor \
  --hoodie-conf hoodie.parquet.small.file.limit=134217728 \
  --hoodie-conf hoodie.parquet.max.file.size=268435456 \
  --hoodie-conf hoodie.cleaner.commits.retained=10 \
  --hoodie-conf "hoodie.deltastreamer.transformer.sql=select CASE WHEN Op='D' THEN TRUE ELSE FALSE END AS _hoodie_is_deleted, * from <SRC>" \
  --hoodie-conf hoodie.datasource.hive_sync.support_timestamp=false \
  --hoodie-conf hoodie.bloom.index.filter.type=DYNAMIC_V0 \
  --hoodie-conf hoodie.upsert.shuffle.parallelism=25 \
  --target-base-path s3://<BUCKET_NAME>/POC/LANDING/INSERT_OVERWRITE_TEST \
  --target-table insert_overwrite_test \
  --transformer-class org.apache.hudi.utilities.transform.SqlQueryBasedTransformer \
  --enable-sync \
  --hoodie-conf hoodie.deltastreamer.source.dfs.root=s3://<BUCKET_NAME>/POC/DMS/CDC/RECENT/TEST_FOLDER/TEST_SCHEMA/TEST_TABLE \
  --source-class org.apache.hudi.utilities.sources.ParquetDFSSource
```

4. Create a table in Athena to read the Parquet files from the respective S3 location.
5. Validate that the output in Athena includes only records from the CDC file.

**Expected behavior**

The entire table should be overwritten with the new records from the CDC file. Querying through Athena/spark-sql should return only the records from the latest commit.

**Environment Description**

* Hudi version : 0.12.2 on emr-6.9.0
* Spark version : 3.3.0
* Hive version : 3.1.3
* Hadoop version : 3.3.3
* Storage (HDFS/S3/GCS..) : S3
* Running on Docker? (yes/no) : No
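**Additional context**

To rule out a writer-side problem, I sanity-check that the latest completed instant in the target's `.hoodie` folder is the `replacecommit`. Below is a minimal sketch in plain Python over a directory listing (e.g. from `aws s3 ls`); the instant times and filenames are hypothetical examples, and it assumes the Hudi 0.12.x convention that completed instants are named `<instantTime>.<action>` while pending ones carry a `.requested`/`.inflight` suffix:

```python
import re

# Completed instants in a Hudi 0.12.x timeline are files named
# "<instantTime>.<action>"; ".requested"/".inflight" files are pending.
# Diagnostic sketch only, not a Hudi API: it just parses file names.
COMPLETED = re.compile(r"^(\d{14,17})\.(commit|replacecommit|deltacommit)$")

def latest_completed_instant(hoodie_listing):
    """Return (instant_time, action) of the newest completed instant, or None."""
    instants = [
        (m.group(1), m.group(2))
        for name in hoodie_listing
        if (m := COMPLETED.match(name))
    ]
    # Hudi instant times are zero-padded timestamps, so they sort lexicographically.
    return max(instants, default=None)

# Hypothetical listing after the full load (INSERT) and the CDC run
# (INSERT_OVERWRITE_TABLE):
listing = [
    "hoodie.properties",
    "20230505120000.commit.requested",
    "20230505120000.inflight",
    "20230505120000.commit",
    "20230505130000.replacecommit.requested",
    "20230505130000.replacecommit.inflight",
    "20230505130000.replacecommit",
]
print(latest_completed_instant(listing))
# -> ('20230505130000', 'replacecommit')
```

In my case the `.replacecommit` does exist as the latest completed instant, so the overwrite appears committed on the writer side; the open question is why Athena engine v2 and spark-sql still count records from the replaced file groups when resolving the snapshot.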
