This is an automated email from the ASF dual-hosted git repository.

sivabalan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new fba5822  [HUDI-3430] Fix Deltastreamer to properly shut down the services upon failure (#4824)
fba5822 is described below

commit fba5822ee34167b07e65eefc63d7e3d43abd9387
Author: Y Ethan Guo <[email protected]>
AuthorDate: Fri Feb 18 05:44:56 2022 -0800

    [HUDI-3430] Fix Deltastreamer to properly shut down the services upon failure (#4824)
---
 .../apache/hudi/async/AsyncClusteringService.java  |  8 ++++++-
 .../org/apache/hudi/async/AsyncCompactService.java |  8 ++++++-
 .../org/apache/hudi/async/HoodieAsyncService.java  | 18 ++++++++++-----
 .../deltastreamer/HoodieDeltaStreamer.java         | 26 ++++++++++++++--------
 4 files changed, 44 insertions(+), 16 deletions(-)

diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/async/AsyncClusteringService.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/async/AsyncClusteringService.java
index cce2ff5..1c1cf2b 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/async/AsyncClusteringService.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/async/AsyncClusteringService.java
@@ -82,10 +82,16 @@ public abstract class AsyncClusteringService extends HoodieAsyncTableService {
         }
         LOG.info("Clustering executor shutting down properly");
       } catch (InterruptedException ie) {
+        hasError = true;
         LOG.warn("Clustering executor got interrupted exception! Stopping", 
ie);
       } catch (IOException e) {
-        LOG.error("Clustering executor failed", e);
+        hasError = true;
+        LOG.error("Clustering executor failed due to IOException", e);
         throw new HoodieIOException(e.getMessage(), e);
+      } catch (Exception e) {
+        hasError = true;
+        LOG.error("Clustering executor failed", e);
+        throw e;
       }
       return true;
     }, executor)).toArray(CompletableFuture[]::new)), executor);
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/async/AsyncCompactService.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/async/AsyncCompactService.java
index 6bfaa2f..f1f7f41 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/async/AsyncCompactService.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/async/AsyncCompactService.java
@@ -92,10 +92,16 @@ public abstract class AsyncCompactService extends HoodieAsyncTableService {
         }
         LOG.info("Compactor shutting down properly!!");
       } catch (InterruptedException ie) {
+        hasError = true;
         LOG.warn("Compactor executor thread got interrupted exception. 
Stopping", ie);
       } catch (IOException e) {
-        LOG.error("Compactor executor failed", e);
+        hasError = true;
+        LOG.error("Compactor executor failed due to IOException", e);
         throw new HoodieIOException(e.getMessage(), e);
+      } catch (Exception e) {
+        hasError = true;
+        LOG.error("Compactor executor failed", e);
+        throw e;
       }
       return true;
     }, executor)).toArray(CompletableFuture[]::new)), executor);
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/async/HoodieAsyncService.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/async/HoodieAsyncService.java
index a1665c7..1ce6dfb 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/async/HoodieAsyncService.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/async/HoodieAsyncService.java
@@ -41,7 +41,10 @@ import java.util.function.Function;
 public abstract class HoodieAsyncService implements Serializable {
 
   private static final Logger LOG = 
LogManager.getLogger(HoodieAsyncService.class);
+  private static final long POLLING_SECONDS = 10;
 
+  // Flag indicating whether an error is incurred in the service
+  protected boolean hasError;
   // Flag to track if the service is started.
   private boolean started;
   // Flag indicating shutdown is externally requested
@@ -82,9 +85,13 @@ public abstract class HoodieAsyncService implements Serializable {
     return shutdown;
   }
 
+  public boolean hasError() {
+    return hasError;
+  }
+
   /**
    * Wait till the service shutdown. If the service shutdown with exception, 
it will be thrown
-   * 
+   *
    * @throws ExecutionException
    * @throws InterruptedException
    */
@@ -109,6 +116,7 @@ public abstract class HoodieAsyncService implements Serializable {
   public void shutdown(boolean force) {
     if (!shutdownRequested || force) {
       shutdownRequested = true;
+      shutdown = true;
       if (executor != null) {
         if (force) {
           executor.shutdownNow();
@@ -178,8 +186,8 @@ public abstract class HoodieAsyncService implements Serializable {
   public void waitTillPendingAsyncServiceInstantsReducesTo(int numPending) 
throws InterruptedException {
     try {
       queueLock.lock();
-      while (!isShutdown() && (pendingInstants.size() > numPending)) {
-        consumed.await();
+      while (!isShutdown() && !hasError() && (pendingInstants.size() > 
numPending)) {
+        consumed.await(POLLING_SECONDS, TimeUnit.SECONDS);
       }
     } finally {
       queueLock.unlock();
@@ -202,8 +210,8 @@ public abstract class HoodieAsyncService implements Serializable {
    * @throws InterruptedException
    */
   HoodieInstant fetchNextAsyncServiceInstant() throws InterruptedException {
-    LOG.info("Waiting for next instant upto 10 seconds");
-    HoodieInstant instant = pendingInstants.poll(10, TimeUnit.SECONDS);
+    LOG.info(String.format("Waiting for next instant up to %d seconds", 
POLLING_SECONDS));
+    HoodieInstant instant = pendingInstants.poll(POLLING_SECONDS, 
TimeUnit.SECONDS);
     if (instant != null) {
       try {
         queueLock.lock();
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamer.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamer.java
index 65cf2c3..c0c141d 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamer.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamer.java
@@ -659,6 +659,10 @@ public class HoodieDeltaStreamer implements Serializable {
                 
asyncCompactService.get().enqueuePendingAsyncServiceInstant(new 
HoodieInstant(State.REQUESTED,
                     HoodieTimeline.COMPACTION_ACTION, 
scheduledCompactionInstantAndRDD.get().getLeft().get()));
                 
asyncCompactService.get().waitTillPendingAsyncServiceInstantsReducesTo(cfg.maxPendingCompactions);
+                if (asyncCompactService.get().hasError()) {
+                  error = true;
+                  throw new HoodieException("Async compaction failed.  
Shutting down Delta Sync...");
+                }
               }
               if (clusteringConfig.isAsyncClusteringEnabled()) {
                 Option<String> clusteringInstant = 
deltaSync.getClusteringInstantOpt();
@@ -666,6 +670,10 @@ public class HoodieDeltaStreamer implements Serializable {
                   LOG.info("Scheduled async clustering for instant: " + 
clusteringInstant.get());
                   
asyncClusteringService.get().enqueuePendingAsyncServiceInstant(new 
HoodieInstant(State.REQUESTED, HoodieTimeline.REPLACE_COMMIT_ACTION, 
clusteringInstant.get()));
                   
asyncClusteringService.get().waitTillPendingAsyncServiceInstantsReducesTo(cfg.maxPendingClustering);
+                  if (asyncClusteringService.get().hasError()) {
+                    error = true;
+                    throw new HoodieException("Async clustering failed.  
Shutting down Delta Sync...");
+                  }
                 }
               }
               long toSleepMs = cfg.minSyncIntervalSeconds * 1000 - 
(System.currentTimeMillis() - start);
@@ -684,6 +692,7 @@ public class HoodieDeltaStreamer implements Serializable {
           }
         } finally {
           shutdownAsyncServices(error);
+          executor.shutdownNow();
         }
         return true;
       }, executor), executor);
@@ -737,13 +746,12 @@ public class HoodieDeltaStreamer implements Serializable {
               HoodieTableMetaClient.builder().setConf(new 
Configuration(jssc.hadoopConfiguration())).setBasePath(cfg.targetBasePath).setLoadActiveTimelineOnLoad(true).build();
           List<HoodieInstant> pending = 
CompactionUtils.getPendingCompactionInstantTimes(meta);
           pending.forEach(hoodieInstant -> 
asyncCompactService.get().enqueuePendingAsyncServiceInstant(hoodieInstant));
-          asyncCompactService.get().start((error) -> {
-            // Shutdown DeltaSync
-            shutdown(false);
-            return true;
-          });
+          asyncCompactService.get().start(error -> true);
           try {
             
asyncCompactService.get().waitTillPendingAsyncServiceInstantsReducesTo(cfg.maxPendingCompactions);
+            if (asyncCompactService.get().hasError()) {
+              throw new HoodieException("Async compaction failed during write 
client initialization.");
+            }
           } catch (InterruptedException ie) {
             throw new HoodieException(ie);
           }
@@ -762,12 +770,12 @@ public class HoodieDeltaStreamer implements Serializable {
           List<HoodieInstant> pending = 
ClusteringUtils.getPendingClusteringInstantTimes(meta);
           LOG.info(String.format("Found %d pending clustering instants ", 
pending.size()));
           pending.forEach(hoodieInstant -> 
asyncClusteringService.get().enqueuePendingAsyncServiceInstant(hoodieInstant));
-          asyncClusteringService.get().start((error) -> {
-            shutdown(false);
-            return true;
-          });
+          asyncClusteringService.get().start(error -> true);
           try {
             
asyncClusteringService.get().waitTillPendingAsyncServiceInstantsReducesTo(cfg.maxPendingClustering);
+            if (asyncClusteringService.get().hasError()) {
+              throw new HoodieException("Async clustering failed during write 
client initialization.");
+            }
           } catch (InterruptedException e) {
             throw new HoodieException(e);
           }

Reply via email to