fenil25 opened a new issue, #9808:
URL: https://github.com/apache/hudi/issues/9808

   **_Tips before filing an issue_**
   
   - Have you gone through our [FAQs](https://hudi.apache.org/learn/faq/)? Yes
   
   - Join the mailing list to engage in conversations and get faster support at 
[email protected].
   
   - If you have triaged this as a bug, then file an 
[issue](https://issues.apache.org/jira/projects/HUDI/issues) directly.
   
   **Describe the problem you faced**
   
   Our pipeline ingests the changelog from Kafka into Flink and finally writes through the Hudi sink. We are observing a lot of issues with a few tables. The affected table emits around 1-5K events per second.
   We are using EMR 6.11.0 with Hudi 0.13.0 and Flink 1.16.0.
   Initially we tried Copy On Write (COW), but ran into a lot of checkpointing issues in Flink. The main culprit was the error:
   `Checkpoint expired before completing`.
   Increasing resources, decreasing the checkpoint interval, and increasing the checkpoint timeout did not help.
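   For reference, the checkpoint knobs we experimented with are the standard Flink ones; a `flink-conf.yaml` sketch with illustrative values (our actual values varied across runs, and the bucket path is a placeholder):
   
   ```yaml
   # Checkpoint tuning attempted for the COW table (illustrative values only)
   execution.checkpointing.interval: 5min
   execution.checkpointing.timeout: 15min
   execution.checkpointing.min-pause: 1min
   execution.checkpointing.mode: EXACTLY_ONCE
   state.checkpoints.dir: s3://<bucket>/flink/checkpoints
   ```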
   We then moved to a Merge On Read (MOR) table. We were still seeing errors like:
   `java.lang.IllegalStateException: Receive an unexpected event for instant 20230912181658265 from task 7`
   Open Hudi issues like [this one](https://github.com/apache/hudi/issues/5690) suggest it is a multiple-writer issue. Setting the number of writers to 1 and then increasing the checkpoint timeout from 15 minutes to 60 minutes resolved it.
   The checkpoint was taking around 30 minutes.
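   For clarity, by "set the writers to 1" we mean pinning the parallelism of the Hudi write task; in the sink table's `WITH` clause that is roughly (option name as documented for the Hudi Flink connector):
   
   ```sql
   -- In the Hudi sink table's WITH clause:
   'write.tasks' = '1'   -- a single write task, i.e. a single writer
   ```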
   However, the main problem we face is that whenever we change any config of the Flink pipeline, we have to re-bootstrap the table. This was not the case with the COW table. Bootstrapping is expensive for us and takes quite some time. If we do not bootstrap, we see the `IllegalStateException` again, or a `FileAlreadyExistsException` (for log files).
   Our main questions are:
   - Is there any way to avoid re-bootstrapping for Flink config changes such as the checkpoint interval?
   - For a pipeline at this scale, how do we parallelize with multiple writers without running into the aforementioned `IllegalStateException`/`FileAlreadyExistsException` problems?
   - Which configs should we set to increase the efficiency of the ingestion?
   
   **To Reproduce**
   
   Steps to reproduce the behavior:
   
   1. On EMR, set up an ingestion pipeline: MySQL Debezium changelogs are emitted to Kafka and consumed by Flink to write a Hudi table
   2. The changelog volume should be around 5K events/sec
   3. Hudi/Flink configs:
   - MOR table
   - Single writer
   - Checkpoint configuration:
   <img width="1480" alt="image" src="https://github.com/apache/hudi/assets/32615205/c2594d8e-3a5d-47b2-a9f5-3777a5e725b3">
   - hudi.clean.policy=KEEP_LATEST_BY_HOURS
   - hudi.clean.retain_hours=168
   - All other configs are the defaults
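   Put together, the sink definition looks roughly like the following Flink SQL sketch (the schema and key columns here are placeholders, not our actual table; option names follow the Hudi Flink connector docs):
   
   ```sql
   -- Placeholder sink DDL illustrating the configs above
   CREATE TABLE hudi_sink (
     id BIGINT,
     updated_at TIMESTAMP(3),
     PRIMARY KEY (id) NOT ENFORCED
   ) WITH (
     'connector' = 'hudi',
     'path' = 's3://<bucket>/hudi/<table>',
     'table.type' = 'MERGE_ON_READ',
     'write.tasks' = '1',                      -- single writer
     'clean.policy' = 'KEEP_LATEST_BY_HOURS',
     'clean.retain_hours' = '168'
   );
   ```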
   
   **Environment Description**
   
   * Hudi version : 0.13.0 
   
   * Flink version : 1.16.0
   
   * Hive version :
   
   * Hadoop version :
   
   * Storage (HDFS/S3/GCS..) : S3 
   
   * Running on Docker? (yes/no) : no 
   
   
   **Stacktrace**
   
   Stack traces of the aforementioned errors:
   - FileAlreadyExistsException 
   ```
   org.apache.hudi.exception.HoodieRollbackException: Failed to rollback 
s3://slack-dw/hudi/channels_members commits 20230912181658265 
   ...
   Caused by: org.apache.hudi.exception.HoodieException: Error occurs when 
executing flatMap
   ...
   Caused by: org.apache.hudi.exception.HoodieRollbackException: Failed to 
rollback for instant [==>20230912181658265__deltacommit__INFLIGHT]
   ...
   Caused by: org.apache.hadoop.fs.FileAlreadyExistsException: File already 
exists:s3://slack-dw/hudi/channels_members/.108ba95b-d395-4b98-826d-9e8e0b26d8e1-1_20230912004700323.log.1247_1-0-1
   ...
   2023-09-13 19:50:06,654 INFO  org.apache.flink.runtime.jobmaster.JobMaster   
              [] - Trying to recover from a global failure.
   org.apache.flink.util.FlinkException: Global failure triggered by 
OperatorCoordinator for 'stream_write: channels_members' (operator 
cd7a24f114e59daa8b573b2d08871155).
        at 
org.apache.flink.runtime.operators.coordination.OperatorCoordinatorHolder$LazyInitializedCoordinatorContext.failJob(OperatorCoordinatorHolder.java:617)
 ~[flink-dist-1.16.0.jar:1.16.0] 
   ```
   
   - IllegalStateException
   ```
   org.apache.flink.util.FlinkException: Global failure triggered by 
OperatorCoordinator for 'stream_write: channels_members' (operator 
cd7a24f114e59daa8b573b2d08871155).
        at 
org.apache.flink.runtime.operators.coordination.OperatorCoordinatorHolder$LazyInitializedCoordinatorContext.failJob(OperatorCoordinatorHolder.java:617)
        at 
org.apache.hudi.sink.StreamWriteOperatorCoordinator.lambda$start$0(StreamWriteOperatorCoordinator.java:190)
        at 
org.apache.hudi.sink.utils.NonThrownExecutor.handleException(NonThrownExecutor.java:142)
        at 
org.apache.hudi.sink.utils.NonThrownExecutor.lambda$wrapAction$0(NonThrownExecutor.java:133)
        at 
java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
        at 
java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
        at java.base/java.lang.Thread.run(Thread.java:829)
   Caused by: org.apache.hudi.exception.HoodieException: Executor executes 
action [handle write metadata event for instant ] error
        ... 6 more
   Caused by: java.lang.IllegalStateException: Receive an unexpected event for 
instant 20230912181658265 from task 7
        at 
org.apache.hudi.common.util.ValidationUtils.checkState(ValidationUtils.java:67)
        at 
org.apache.hudi.sink.StreamWriteOperatorCoordinator.handleWriteMetaEvent(StreamWriteOperatorCoordinator.java:447)
        at 
org.apache.hudi.sink.StreamWriteOperatorCoordinator.lambda$handleEventFromOperator$4(StreamWriteOperatorCoordinator.java:288)
        at 
org.apache.hudi.sink.utils.NonThrownExecutor.lambda$wrapAction$0(NonThrownExecutor.java:130)
        ... 3 more
   ```
   
   
   

