pushpavanthar opened a new issue, #7757:
URL: https://github.com/apache/hudi/issues/7757

   **_Tips before filing an issue_**
   
   - Have you gone through our [FAQs](https://hudi.apache.org/learn/faq/)? Yes
   
   - Join the mailing list to engage in conversations and get faster support at 
[email protected].
   
   - If you have triaged this as a bug, then file an 
[issue](https://issues.apache.org/jira/projects/HUDI/issues) directly. Yes
   
   **Describe the problem you faced**
   I've been running HoodieDeltaStreamer in continuous mode for COW tables on EMR for some time. To validate the consistency of the resulting tables, I run a query comparing the distinct primary keys created in each hour against the source table. I was surprised to find that for some hours the Hudi table has fewer unique primary keys than the source. This implies that with HoodieDeltaStreamer in continuous mode for COW tables, the data is not consistent. The issue reappears often across different streams. I have captured driver logs for investigation.
   
   **To Reproduce**
   
   Steps to reproduce the behavior:
   
   1. Start EMR cluster with below configs
   <img width="532" alt="Screenshot 2023-01-26 at 8 04 18 PM" 
src="https://user-images.githubusercontent.com/8515997/214863624-7bea44ea-ec08-49bd-a273-6912ce74fb6c.png";>
   
   ```
   [{"classification":"hive-env", "properties":{}, 
"configurations":[{"classification":"export", 
"properties":{"HADOOP_HEAPSIZE":"5120"}, 
"configurations":[]}]},{"classification":"presto-connector-hive", 
"properties":{"hive.parquet.use-column-names":"true", 
"hive.s3.max-client-retries":"30", 
"hive.s3select-pushdown.max-connections":"6000", "hive.metastore":"glue", 
"hive.s3select-pushdown.enabled":"true", "hive.s3-file-system-type":"PRESTO"}, 
"configurations":[]},{"classification":"yarn-site", 
"properties":{"yarn.resourcemanager.nodemanager-graceful-decommission-timeout-secs":"600",
 "yarn.nodemanager.localizer.cache.target-size-mb":"10120", 
"yarn.nodemanager.localizer.cache.cleanup.interval-ms":"1500000"}, 
"configurations":[]},{"classification":"capacity-scheduler", 
"properties":{"yarn.scheduler.capacity.resource-calculator":"org.apache.hadoop.yarn.util.resource.DominantResourceCalculator"},
 "configurations":[]},{"classification":"spark-defaults", 
"properties":{"spark.driver.memory":"
 4096M", "spark.history.fs.cleaner.maxAge":"1h", 
"spark.blacklist.decommissioning.timeout":"600s", "spark.port.maxRetries":"32", 
"spark.history.fs.cleaner.interval":"1h", 
"spark.history.fs.cleaner.enabled":"true"}, 
"configurations":[]},{"classification":"emrfs-site", 
"properties":{"fs.s3.maxRetries":"50"}, 
"configurations":[]},{"classification":"hive-site", 
"properties":{"hive.metastore.client.factory.class":"com.amazonaws.glue.catalog.metastore.AWSGlueDataCatalogHiveClientFactory"},
 "configurations":[]},{"classification":"spark-hive-site", 
"properties":{"hive.metastore.client.factory.class":"com.amazonaws.glue.catalog.metastore.AWSGlueDataCatalogHiveClientFactory"},
 "configurations":[]}]
   ```
   2.  Submit step to EMR to run deltastreamer as below
   ```
   spark-submit --master yarn \
   --jars /usr/lib/spark/external/lib/spark-avro.jar,s3://lake-bucket/jars/hudi-utilities-bundle_2.12-0.11.1.jar \
   --files s3://artifact_bucket/config/hudi/log4j.properties \
   --class org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer \
   --conf spark.executor.cores=3 \
   --conf spark.driver.memory=4g \
   --conf spark.driver.memoryOverhead=1250m \
   --conf spark.executor.memoryOverhead=1250m \
   --conf spark.executor.memory=27g \
   --conf spark.dynamicAllocation.enabled=true \
   --conf spark.dynamicAllocation.initialExecutors=1 \
   --conf spark.dynamicAllocation.minExecutors=1 \
   --conf spark.dynamicAllocation.maxExecutors=3 \
   --conf spark.scheduler.mode=FAIR \
   --conf spark.task.maxFailures=5 \
   --conf spark.rdd.compress=true \
   --conf spark.serializer=org.apache.spark.serializer.KryoSerializer \
   --conf spark.shuffle.service.enabled=true \
   --conf spark.sql.hive.convertMetastoreParquet=false \
   --conf spark.yarn.max.executor.failures=5 \
   --conf spark.sql.catalogImplementation=hive \
   --conf spark.driver.cores=3 \
   --conf "spark.executor.extraJavaOptions=-Dlog4j.configuration=log4j.properties -XX:+UseG1GC -XX:+PrintFlagsFinal -XX:+PrintReferenceGC -verbose:gc -XX:+PrintGCDetails -XX:+PrintGCTimeStamps -XX:+PrintAdaptiveSizePolicy -XX:+UnlockDiagnosticVMOptions -XX:+G1SummarizeConcMark -XX:InitiatingHeapOccupancyPercent=35 -XX:ConcGCThreads=6 -XX:ParallelGCThreads=12 -XX:G1HeapRegionSize=33554432 -XX:G1HeapWastePercent=15 -XX:OnOutOfMemoryError='kill -9 %p'" \
   --conf "spark.driver.extraJavaOptions=-Dlog4j.configuration=log4j.properties 
-XX:+UseG1GC -XX:+PrintFlagsFinal -XX:+PrintReferenceGC -verbose:gc 
-XX:+PrintGCDetails -XX:+PrintGCTimeStamps -XX:+PrintAdaptiveSizePolicy 
-XX:+UnlockDiagnosticVMOptions -XX:+G1SummarizeConcMark 
-XX:InitiatingHeapOccupancyPercent=35 -XX:ConcGCThreads=6 
-XX:ParallelGCThreads=12 -XX:G1HeapRegionSize=33554432 
-XX:G1HeapWastePercent=15 -XX:OnOutOfMemoryError='kill -9 %p'" \
   --conf spark.app.name=cow_workflow_manager_service_kyc \
   --conf spark.driver.userClassPathFirst=true \
   --deploy-mode cluster s3://lake-bucket/jars/deltastreamer-addons-1.3.jar \
   --enable-sync \
   --hoodie-conf hoodie.deltastreamer.source.kafka.auto.reset.offsets=earliest \
   --hoodie-conf hoodie.parquet.compression.codec=snappy \
   --hoodie-conf partition.assignment.strategy=org.apache.kafka.clients.consumer.RangeAssignor \
   --hoodie-conf hive.metastore.disallow.incompatible.col.type.changes=false \
   --hoodie-conf hoodie.deltastreamer.schemaprovider.spark_avro_post_processor.enable=false \
   --hoodie-conf auto.offset.reset=earliest \
   --hoodie-conf hoodie.clean.automatic=true \
   --hoodie-conf hoodie.clean.async=true \
   --hoodie-conf hoodie.clean.max.commits=30 \
   --hoodie-conf hoodie.table.services.enabled=true \
   --hoodie-conf hoodie.metadata.enable=false \
   --table-type COPY_ON_WRITE \
   --source-class com.domain.sources.ConfluentAvroKafkaSource \
   --schemaprovider-class org.apache.hudi.utilities.schema.NullTargetSchemaRegistryProvider \
   --props s3://artifact_bucket/config/kafka/kafka-source.properties \
   --source-limit 1000000 \
   --hoodie-conf hoodie.deltastreamer.schemaprovider.registry.url=https://schema-registry.in/subjects/workflow_manager_service.public.kyc-value/versions/latest \
   --hoodie-conf hoodie.datasource.hive_sync.database=workflow_manager_service \
   --hoodie-conf hoodie.datasource.hive_sync.table=kyc \
   --hoodie-conf hoodie.datasource.write.recordkey.field=id \
   --hoodie-conf hoodie.datasource.write.precombine.field=__lsn \
   --hoodie-conf hoodie.deltastreamer.source.kafka.topic=workflow_manager_service.public.kyc \
   --hoodie-conf group.id=cds-workflow_manager_service-kyc \
   --hoodie-conf hoodie.cleaner.policy=KEEP_LATEST_COMMITS \
   --hoodie-conf hoodie.keep.max.commits=6000 \
   --hoodie-conf hoodie.keep.min.commits=5000 \
   --hoodie-conf hoodie.cleaner.commits.retained=4500 \
   --hoodie-conf hoodie.cleaner.parallelism=500 \
   --hoodie-conf hoodie.clean.allow.multiple=false \
   --hoodie-conf hoodie.cleaner.incremental.mode=true \
   --hoodie-conf hoodie.archive.async=true \
   --hoodie-conf hoodie.archive.automatic=true \
   --hoodie-conf hoodie.archive.merge.files.batch.size=60 \
   --hoodie-conf hoodie.commits.archival.batch=30 \
   --hoodie-conf hoodie.archive.delete.parallelism=500 \
   --hoodie-conf hoodie.archive.merge.enable=true \
   --hoodie-conf hoodie.clustering.inline=false \
   --hoodie-conf hoodie.index.type=GLOBAL_BLOOM \
   --hoodie-conf hoodie.write.markers.type=DIRECT \
   --source-ordering-field __lsn \
   --target-base-path s3://lake-bucket/raw-data/workflow_manager_service/kyc \
   --target-table kyc \
   --payload-class com.domain.payload.PostgresSoftDeleteDebeziumAvroPayload \
   --hoodie-conf hoodie.bloom.index.update.partition.path=false \
   --hoodie-conf hoodie.metrics.on=true \
   --hoodie-conf hoodie.metrics.reporter.type=PROMETHEUS_PUSHGATEWAY \
   --hoodie-conf hoodie.metrics.pushgateway.host=pushgateway.in \
   --hoodie-conf hoodie.metrics.pushgateway.port=443 \
   --hoodie-conf hoodie.metrics.pushgateway.delete.on.shutdown=false \
   --hoodie-conf hoodie.metrics.pushgateway.job.name=hudi_continuous_workflow_manager_service_kyc_hudi \
   --hoodie-conf hoodie.metrics.pushgateway.random.job.name.suffix=false \
   --hoodie-conf hoodie.metrics.reporter.metricsname.prefix=hudi \
   --hoodie-conf hoodie.datasource.write.partitionpath.field='' \
   --hoodie-conf hoodie.datasource.write.keygenerator.class=org.apache.hudi.keygen.NonpartitionedKeyGenerator \
   --hoodie-conf hoodie.datasource.hive_sync.partition_extractor_class=org.apache.hudi.hive.NonPartitionedExtractor \
   --transformer-class com.domain.transform.DebeziumTransformer \
   --hoodie-conf hoodie.deltastreamer.source.kafka.enable.commit.offset=true \
   --sync-tool-classes org.apache.hudi.aws.sync.AwsGlueCatalogSyncTool \
   --continuous
   ```
   3. Keep it running for some time, maybe a few days, and keep comparing with the query below. This confirms that data is missing in the Hudi table when compared with the raw data (Debezium output) retained in S3.
   <img width="651" alt="Screenshot 2023-01-26 at 8 40 41 PM" src="https://user-images.githubusercontent.com/8515997/214872442-4aa7eaeb-7a8e-4fd5-b23d-7222dc66eca5.png">
   
   ```
   with raw_table as (
     select 'workflow_manager_service_public_kyc' as table_name,
       date_format(from_iso8601_timestamp(created_at), '%Y%m%d %H') as created_at,
       count(distinct id) as raw_count
     FROM raw_cdc_db.raw_workflow_manager_service_public_kyc
     where date_format(from_iso8601_timestamp(created_at), '%Y%m%d') >= date_format((NOW() - INTERVAL '3' DAY), '%Y%m%d')
       and date_format(from_iso8601_timestamp(created_at), '%Y%m%d %H') <= date_format((NOW() - INTERVAL '1' HOUR), '%Y%m%d %H')
       and dt >= date_format((NOW() - INTERVAL '7' DAY), '%Y%m%d')
     group by 1, 2
   ),
   hudi_table as (
     select 'workflow_manager_service_public_kyc' as table_name,
       date_format(from_iso8601_timestamp(created_at), '%Y%m%d %H') as p_date,
       count(1) as hudi_count
     from workflow_manager_service.kyc
     where date_format(from_iso8601_timestamp(created_at), '%Y%m%d') >= date_format((NOW() - INTERVAL '3' DAY), '%Y%m%d')
       and date_format(from_iso8601_timestamp(created_at), '%Y%m%d %H') <= date_format((NOW() - INTERVAL '1' HOUR), '%Y%m%d %H')
     group by 1, 2
   ),
   union_table as (
     select coalesce(l.table_name, r.table_name) as table_name,
       coalesce(l.created_at, r.p_date) as create_at_dt,
       l.raw_count,
       r.hudi_count
     from raw_table l
       FULL OUTER JOIN hudi_table r on (l.created_at = r.p_date and l.table_name = r.table_name)
   )
   select *
   from union_table
   where raw_count is NULL or hudi_count is NULL
     or raw_count != hudi_count
   ```
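
   To narrow down which records are affected, a follow-up query along these lines can be used (a sketch against the same tables; the hour literal is a placeholder for an hour flagged by the comparison above). The missing ids can then be traced back through the Kafka topic and the commit timeline.
   ```
   -- Sketch: ids present in the raw CDC data for a given hour but absent from the Hudi table.
   -- Replace '<yyyymmdd hh>' with an hour flagged by the comparison query.
   select distinct r.id
   from raw_cdc_db.raw_workflow_manager_service_public_kyc r
     left join workflow_manager_service.kyc h on r.id = h.id
   where date_format(from_iso8601_timestamp(r.created_at), '%Y%m%d %H') = '<yyyymmdd hh>'
     and r.dt >= date_format((NOW() - INTERVAL '7' DAY), '%Y%m%d')
     and h.id is NULL
   ```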
   
   **Expected behavior**
   The Hudi table should have exactly the same number of records for each hour as the raw events. Hudi having fewer records than the raw table implies that data is missing from Hudi. Can someone help me figure out where to start debugging this issue?
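
   As a first check on my side, I can look up a few of the keys reported as missing via the Hudi metadata columns, to confirm whether they were ever committed to the table (a sketch; the id values are placeholders):
   ```
   -- Sketch: look up specific record keys via Hudi metadata columns.
   -- _hoodie_commit_time / _hoodie_record_key are the standard Hudi meta columns;
   -- '<missing_id_1>', '<missing_id_2>' are placeholders for ids flagged as missing.
   select _hoodie_commit_time, _hoodie_record_key, id, __lsn
   from workflow_manager_service.kyc
   where id in ('<missing_id_1>', '<missing_id_2>')
   ```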
   
   **Environment Description**
   
   * Hudi version : 0.11.1
   
   * Spark version : 3.1.1
   
   * Hive version : 3.1.2
   
   * Hadoop version : Amazon 3.2.1
   
   * Storage (HDFS/S3/GCS..) : S3
   
   * Running on Docker? (yes/no) : No
   
   
   **Additional context**
   Driver logs have been retained in case they are of help for the analysis.
   
   
   

