ankitchandnani opened a new issue, #8672:
URL: https://github.com/apache/hudi/issues/8672

   **Describe the problem you faced**
   
   Hi Everyone,
   
   I am testing out Hudi 0.12.2 with DeltaStreamer on EMR (emr-6.9.0), using the 
INSERT_OVERWRITE_TABLE operation on a set of Parquet files in S3 (source). The 
goal is to overwrite the entire table in S3 (target) every time a new Parquet 
file arrives in the source folder from DMS CDC. The initial INSERT commit works, 
and each subsequent INSERT_OVERWRITE_TABLE run completes in DeltaStreamer and 
creates a .replacecommit file in the target's .hoodie folder. However, when 
querying through Athena (engine version 2) or spark-sql, the record count 
includes records from both commits instead of only the records from the latest 
commit, so the overwrite is not taking effect.
   
   **To Reproduce**
   
   Steps to reproduce the behavior:
   
   1. Download any full-load and CDC Parquet sample files, for example: 
   
   https://transer-files.s3.amazonaws.com/full.parquet
   https://transer-files.s3.amazonaws.com/cdc.parquet
   
   2. Run the DeltaStreamer full-load and CDC jobs, making sure to update the 
source, target, and jar locations. Below is what I am running:
   
   Full Deltastreamer config: 
   
   spark-submit --class 
org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer 
   --conf spark.shuffle.service.enabled=true 
   --conf spark.dynamicAllocation.enabled=true 
   --conf spark.dynamicAllocation.initialExecutors=1 
   --conf spark.dynamicAllocation.cachedExecutorIdleTimeout=60s 
   --conf spark.dynamicAllocation.executorIdleTimeout=30s 
   --conf spark.dynamicAllocation.schedulerBacklogTimeout=3s 
   --conf spark.serializer=org.apache.spark.serializer.KryoSerializer 
   --conf spark.sql.hive.convertMetastoreParquet=false 
   --conf yarn.nodemanager.vmem-check-enabled=false 
   --conf yarn.nodemanager.pmem-check-enabled=false 
   --conf spark.kryoserializer.buffer.max=512m 
   --conf spark.driver.memory=4g 
   --conf spark.driver.memoryOverhead=1024 
   --conf spark.driver.maxResultSize=2g 
   --conf spark.executor.memory=8g 
   --conf spark.executor.memoryOverhead=2048 
   --conf spark.executor.cores=2 
   --conf spark.app.name=insert_overwrite_test_full 
   --jars /usr/lib/spark/external/lib/spark-avro.jar 
/usr/lib/hudi/hudi-utilities-bundle.jar 
   --table-type COPY_ON_WRITE 
   --op INSERT 
   --source-ordering-field seq_no 
   --hoodie-conf hoodie.datasource.write.recordkey.field=ID1,ID2
   --hoodie-conf hoodie.datasource.write.partitionpath.field=
   --hoodie-conf 
hoodie.datasource.write.keygenerator.class=org.apache.hudi.keygen.ComplexKeyGenerator
   --hoodie-conf hoodie.datasource.hive_sync.table=INSERT_OVERWRITE_TEST
   --hoodie-conf hoodie.datasource.hive_sync.enable=true
   --hoodie-conf hoodie.datasource.hive_sync.assume_date_partitioning=false
   --hoodie-conf 
hoodie.datasource.hive_sync.partition_extractor_class=org.apache.hudi.hive.NonPartitionedExtractor
   --hoodie-conf hoodie.cleaner.commits.retained=10
   --hoodie-conf hoodie.deltastreamer.transformer.sql=select 1==2 AS 
_hoodie_is_deleted, 'I' as Op,* from <SRC>
   --hoodie-conf hoodie.datasource.hive_sync.support_timestamp=false 
   --target-base-path s3://<BUCKET_NAME>/POC/LANDING/INSERT_OVERWRITE_TEST 
   --target-table insert_overwrite_test 
   --transformer-class 
org.apache.hudi.utilities.transform.SqlQueryBasedTransformer 
   --enable-sync 
   --hoodie-conf 
hoodie.deltastreamer.source.dfs.root=s3://<BUCKET_NAME>/POC/DMS/FULL/RECENT/TEST_FOLDER/TEST_SCHEMA/TEST_TABLE
 
   --source-class org.apache.hudi.utilities.sources.ParquetDFSSource
   
   CDC Deltastreamer config: 
   
   spark-submit --class 
org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer 
   --conf spark.shuffle.service.enabled=true 
   --conf spark.dynamicAllocation.enabled=true 
   --conf spark.dynamicAllocation.initialExecutors=1 
   --conf spark.dynamicAllocation.cachedExecutorIdleTimeout=60s 
   --conf spark.dynamicAllocation.executorIdleTimeout=30s 
   --conf spark.dynamicAllocation.schedulerBacklogTimeout=3s 
   --conf spark.serializer=org.apache.spark.serializer.KryoSerializer 
   --conf spark.sql.hive.convertMetastoreParquet=false 
   --conf yarn.nodemanager.vmem-check-enabled=false 
   --conf yarn.nodemanager.pmem-check-enabled=false 
   --conf spark.kryoserializer.buffer.max=512m 
   --conf spark.driver.memory=2g 
   --conf spark.driver.memoryOverhead=512 
   --conf spark.executor.memory=3g 
   --conf spark.executor.memoryOverhead=512 
   --conf spark.executor.cores=1 
   --conf spark.task.maxFailures=8 
   --conf spark.yarn.am.attemptFailuresValidityInterval=1h 
   --conf spark.app.name=insert_overwrite_test_cdc 
   --jars /usr/lib/spark/external/lib/spark-avro.jar 
/usr/lib/hudi/hudi-utilities-bundle.jar 
   --table-type COPY_ON_WRITE 
   --op INSERT_OVERWRITE_TABLE 
   --source-ordering-field seq_no 
   --hoodie-conf hoodie.datasource.write.recordkey.field=ID1,ID2
   --hoodie-conf hoodie.datasource.write.partitionpath.field=
   --hoodie-conf 
hoodie.datasource.write.keygenerator.class=org.apache.hudi.keygen.ComplexKeyGenerator
   --hoodie-conf hoodie.datasource.hive_sync.table=INSERT_OVERWRITE_TEST
   --hoodie-conf hoodie.datasource.hive_sync.enable=true
   --hoodie-conf hoodie.datasource.hive_sync.assume_date_partitioning=false
   --hoodie-conf 
hoodie.datasource.hive_sync.partition_extractor_class=org.apache.hudi.hive.NonPartitionedExtractor
   --hoodie-conf hoodie.parquet.small.file.limit=134217728
   --hoodie-conf hoodie.parquet.max.file.size=268435456
   --hoodie-conf hoodie.cleaner.commits.retained=10
   --hoodie-conf hoodie.deltastreamer.transformer.sql=select CASE WHEN Op='D' 
THEN TRUE ELSE FALSE END AS _hoodie_is_deleted,* from <SRC>
   --hoodie-conf hoodie.datasource.hive_sync.support_timestamp=false
   --hoodie-conf hoodie.bloom.index.filter.type=DYNAMIC_V0
   --hoodie-conf hoodie.upsert.shuffle.parallelism=25 
   --target-base-path s3://<BUCKET_NAME>/POC/LANDING/INSERT_OVERWRITE_TEST
   --target-table insert_overwrite_test 
   --transformer-class 
org.apache.hudi.utilities.transform.SqlQueryBasedTransformer 
   --enable-sync 
   --hoodie-conf 
hoodie.deltastreamer.source.dfs.root=s3://<BUCKET_NAME>/POC/DMS/CDC/RECENT/TEST_FOLDER/TEST_SCHEMA/TEST_TABLE
 
   --source-class org.apache.hudi.utilities.sources.ParquetDFSSource
   
   3. Create a table in Athena to read the Parquet files from the target S3 location.
   4. Query the table in Athena: the output should contain only the records from 
the CDC file, but it also contains the records from the full-load commit. 
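   For reference, the CDC transformer query above (`CASE WHEN Op='D' THEN TRUE ELSE FALSE END AS _hoodie_is_deleted`) is what lets Hudi drop DMS delete records on write. A plain-Python sketch of that mapping (illustrative only; the real transformer runs as Spark SQL, and `Op`/`seq_no` are DMS-provided columns assumed from the configs above):

```python
# Illustrative equivalent of the SqlQueryBasedTransformer query used in the CDC
# job: select CASE WHEN Op='D' THEN TRUE ELSE FALSE END AS _hoodie_is_deleted, *
# "Op" and "seq_no" are assumed DMS output columns, as in the configs above.

def transform_cdc(rows):
    """Mark DMS delete records so Hudi removes them during the write."""
    return [{**row, "_hoodie_is_deleted": row.get("Op") == "D"} for row in rows]

sample = [
    {"ID1": 1, "ID2": "a", "Op": "I", "seq_no": 1},
    {"ID1": 1, "ID2": "a", "Op": "D", "seq_no": 2},
]
for row in transform_cdc(sample):
    print(row["seq_no"], row["_hoodie_is_deleted"])
# prints:
# 1 False
# 2 True
```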
   
   **Expected behavior**
   
   The entire table should be overwritten with the new records in the CDC file. 
Querying through Athena/spark-sql should return only the records from the 
latest commit. 
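   A simple way to express the check I am doing (the `_hoodie_commit_time` metadata column is one Hudi adds to every record; the helper below is just my own validation sketch, not part of any Hudi API):

```python
# After a successful INSERT_OVERWRITE_TABLE, every row returned by a snapshot
# query should carry the same (latest) _hoodie_commit_time value.

def overwrite_succeeded(commit_times):
    """commit_times: the _hoodie_commit_time of each row returned by the query."""
    return len(set(commit_times)) <= 1

print(overwrite_succeeded(["002", "002"]))  # prints: True  (only latest commit)
print(overwrite_succeeded(["001", "002"]))  # prints: False (the behavior I see)
```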
   
   **Environment Description**
   
   * Hudi version : 0.12.2 on emr-6.9.0
   
   * Spark version : 3.3.0
   
   * Hive version : 3.1.3
   
   * Hadoop version : 3.3.3
   
   * Storage (HDFS/S3/GCS..) : S3
   
   * Running on Docker? (yes/no) : No
   
   

