sydneyhoran opened a new issue, #8146:
URL: https://github.com/apache/hudi/issues/8146

   **_Tips before filing an issue_**
   
   - Have you gone through our [FAQs](https://hudi.apache.org/learn/faq/)?
   
   - Join the mailing list to engage in conversations and get faster support at [email protected].
   
   - If you have triaged this as a bug, then file an [issue](https://issues.apache.org/jira/projects/HUDI/issues) directly.
   
   **Describe the problem you faced**
   
   Originally my problem was very similar to this [Github Issue](https://github.com/apache/hudi/issues/6316), where the MultiTableDeltaStreamer would not move on to the next table unless there was an error. In my fork and current jar, I have [enabled the post-write-termination-strategy](https://github.com/sydneyhoran/hudi/blob/20f182d82e020ecd30fc1546ea0a4a6116276195/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieMultiTableDeltaStreamer.java#L405) config for MultiTableDeltaStreamer, so that it can move on to the next table when running in `--continuous` mode.
   
   After N rounds with no new data (`max.rounds.without.new.data.to.shutdown`=N), it moves on to the next table and eventually wraps up the Spark job, instead of being stuck on the first table until an error occurs. In this branch I also removed `error = true;` in that block, as I didn't want NoNewData to be treated as an error.
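   The counting behavior described above can be sketched as follows. This is a minimal, self-contained illustration of the idea only; the class and method names here are mine, not Hudi's actual `PostWriteTerminationStrategy` API:

   ```java
   import java.util.Optional;

   // Sketch of a "no new data" termination check: after N consecutive sync
   // rounds that wrote no records, signal that the streamer should shut down.
   public class NoNewDataShutdownSketch {
     private final int maxRoundsWithoutNewData; // e.g. max.rounds.without.new.data.to.shutdown
     private int roundsWithoutNewData = 0;

     public NoNewDataShutdownSketch(int maxRounds) {
       this.maxRoundsWithoutNewData = maxRounds;
     }

     // recordsWritten is empty (or zero) when the last round ingested nothing
     public boolean shouldShutdown(Optional<Long> recordsWritten) {
       if (recordsWritten.isPresent() && recordsWritten.get() > 0) {
         roundsWithoutNewData = 0; // new data arrived: reset the counter
       } else {
         roundsWithoutNewData++;
       }
       return roundsWithoutNewData >= maxRoundsWithoutNewData;
     }

     public static void main(String[] args) {
       NoNewDataShutdownSketch s = new NoNewDataShutdownSketch(3);
       if (s.shouldShutdown(Optional.of(100L))) throw new AssertionError();
       if (s.shouldShutdown(Optional.empty())) throw new AssertionError(); // round 1
       if (s.shouldShutdown(Optional.empty())) throw new AssertionError(); // round 2
       if (!s.shouldShutdown(Optional.empty())) throw new AssertionError(); // round 3: shut down
       System.out.println("ok");
     }
   }
   ```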
   
   However, it seems that the TerminationStrategy triggers the ingestion thread to shut down the Delta Sync thread, but the async threads keep running, and this leaves the Spark session alive due to background tasks (e.g. the async cleaner).
   
   I believe there should be an additional shutdown of async services in the `PostWriteTerminationStrategy.shouldShutdown()` block, such as in my fork at [this line](https://github.com/sydneyhoran/hudi/blob/bde3719226bade5bce204cdc0d16fb3874123e0d/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamer.java#L744). Otherwise the async services are either not shut down at all, or not shut down gracefully (i.e. a sleep-interrupted exception).
   
   ```java
   // check if deltastreamer needs to be shut down
   if (postWriteTerminationStrategy.isPresent()) {
     if (postWriteTerminationStrategy.get().shouldShutdown(scheduledCompactionInstantAndRDD.isPresent()
         ? Option.of(scheduledCompactionInstantAndRDD.get().getRight()) : Option.empty())) {
       LOG.info("Shutting down Delta Sync due to postWriteTerminationStrategy"); // OP: I added this log line for visibility
       shutdownAsyncServices(error); // OP: This is the shutdown I added
       shutdown(false);
     }
   }
   ```
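   For context, the graceful-shutdown pattern this fix relies on is the standard one: ask the async executor to stop, wait out a grace period, and only interrupt as a last resort. This is a hedged sketch using a plain `ExecutorService`, not Hudi's actual async-service classes:

   ```java
   import java.util.concurrent.ExecutorService;
   import java.util.concurrent.Executors;
   import java.util.concurrent.TimeUnit;

   public class GracefulAsyncShutdownSketch {
     // Returns true if the executor drained cleanly within the grace period.
     public static boolean shutdownGracefully(ExecutorService executor, long timeoutSecs)
         throws InterruptedException {
       executor.shutdown(); // stop accepting new rounds, let in-flight work finish
       if (!executor.awaitTermination(timeoutSecs, TimeUnit.SECONDS)) {
         executor.shutdownNow(); // interrupt only after the grace period expires
         return false;
       }
       return true;
     }

     public static void main(String[] args) throws InterruptedException {
       ExecutorService cleaner = Executors.newSingleThreadExecutor();
       cleaner.submit(() -> { /* pretend async clean round */ });
       System.out.println("graceful=" + shutdownGracefully(cleaner, 5));
     }
   }
   ```

   Interrupting a sleeping service thread directly (rather than draining it like this) is exactly what produces the sleep-interrupted exceptions mentioned above.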
   
   With the jar built from my fork, the jobs now run as expected: they consume all the tables and the Spark job ends. If there is another recommended solution that would be better, I'm definitely able to test it out!
   
   These two code changes (a termination strategy for MultiTableDeltaStreamer, and an async-services shutdown after the strategy is triggered) could be a solution for the following issues that haven't been merged in:
   
   - https://github.com/apache/hudi/issues/6316
   - https://github.com/apache/hudi/pull/5071
   
   Open to suggestions and conversation on this topic. Again, thanks for all you do!
   
   **To Reproduce**
   
   Steps to reproduce the behavior:
   
   1. Modify [HoodieMultiTableDeltaStreamer.java](https://github.com/sydneyhoran/hudi/blob/20f182d82e020ecd30fc1546ea0a4a6116276195/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieMultiTableDeltaStreamer.java#L405) to accept `--post-write-termination-strategy-class` as a config
   2. Run HoodieMultiTableDeltaStreamer in `--continuous` mode with `--post-write-termination-strategy-class org.apache.hudi.utilities.deltastreamer.NoNewDataTerminationStrategy` and `hoodie.clean.async=true`
   3. Consume from a topic/source and reach the end of the new data
   4. Logs show no new data and say `Delta Sync shutting down`, but the Spark job remains in running mode with no further log messages, requiring a manual application shutdown
   
   **Expected behavior**
   
   The Spark job should end all threads and fully shut down after the PostWriteTerminationStrategy is triggered. We need the Spark job to be marked as ended so that cluster resources are released, and so that orchestration jobs know it is no longer running and may need to be kicked off again based on business logic for data-SLA requirements.
   
   **Environment Description**
   
   * Hudi version : 0.13.0
   
   * Spark version : 3.1
   
   * Hive version : N/A
   
   * Hadoop version : N/A
   
   * Storage (HDFS/S3/GCS..) : S3
   
   * Running on Docker? (yes/no) : No
   
   
   **Additional context**
   
   Running DeltaStreamer on Google Dataproc on GKE.
   
   **Stacktrace**
   
   Below is the logging at the end of the Spark job, after all the tables have consumed their topics. However, the Spark job remained alive ("Running") and cluster resources were still consumed for minutes/hours after this log. I had to manually kill the application to get it to end.
   
   ```sh
   23/03/08 21:10:55 INFO org.apache.hudi.utilities.deltastreamer.NoNewDataTerminationStrategy: Shutting down on continuous mode as there is no new data for 5
   23/03/08 21:10:55 INFO org.apache.hudi.async.HoodieAsyncService: Shutting down executor
   23/03/08 21:11:05 INFO org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer: Last sync ran less than min sync interval: 20 s, sleep: 9192 ms.
   23/03/08 21:11:14 INFO org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer: Delta Sync shutdown. Error ?true
   23/03/08 21:11:14 INFO org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer: DeltaSync shutdown. Closing write client. Error?false
   23/03/08 21:11:14 INFO org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer: Delta Sync shutting down
   23/03/08 21:11:14 INFO org.apache.hudi.utilities.deltastreamer.DeltaSync: Shutting down embedded timeline server
   23/03/08 21:11:14 INFO org.apache.hudi.utilities.deltastreamer.HoodieMultiTableDeltaStreamer: Ingestion was successful for topics: [{Redacted list of table names}]
   23/03/08 21:11:14 INFO org.sparkproject.jetty.server.AbstractConnector: Stopped Spark@10dc7d6{HTTP/1.1, (http/1.1)}{0.0.0.0:8090}
   23/03/08 21:11:14 WARN org.apache.spark.scheduler.cluster.k8s.ExecutorPodsWatchSnapshotSource: Kubernetes client has been closed (this is expected if the application is shutting down.)
   ```
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]
