This is an automated email from the ASF dual-hosted git repository.
sivabalan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new fba5822 [HUDI-3430] Fix Deltastreamer to properly shut down the
services upon failure (#4824)
fba5822 is described below
commit fba5822ee34167b07e65eefc63d7e3d43abd9387
Author: Y Ethan Guo <[email protected]>
AuthorDate: Fri Feb 18 05:44:56 2022 -0800
[HUDI-3430] Fix Deltastreamer to properly shut down the services upon
failure (#4824)
---
.../apache/hudi/async/AsyncClusteringService.java | 8 ++++++-
.../org/apache/hudi/async/AsyncCompactService.java | 8 ++++++-
.../org/apache/hudi/async/HoodieAsyncService.java | 18 ++++++++++-----
.../deltastreamer/HoodieDeltaStreamer.java | 26 ++++++++++++++--------
4 files changed, 44 insertions(+), 16 deletions(-)
diff --git
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/async/AsyncClusteringService.java
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/async/AsyncClusteringService.java
index cce2ff5..1c1cf2b 100644
---
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/async/AsyncClusteringService.java
+++
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/async/AsyncClusteringService.java
@@ -82,10 +82,16 @@ public abstract class AsyncClusteringService extends
HoodieAsyncTableService {
}
LOG.info("Clustering executor shutting down properly");
} catch (InterruptedException ie) {
+ hasError = true;
LOG.warn("Clustering executor got interrupted exception! Stopping",
ie);
} catch (IOException e) {
- LOG.error("Clustering executor failed", e);
+ hasError = true;
+ LOG.error("Clustering executor failed due to IOException", e);
throw new HoodieIOException(e.getMessage(), e);
+ } catch (Exception e) {
+ hasError = true;
+ LOG.error("Clustering executor failed", e);
+ throw e;
}
return true;
}, executor)).toArray(CompletableFuture[]::new)), executor);
diff --git
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/async/AsyncCompactService.java
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/async/AsyncCompactService.java
index 6bfaa2f..f1f7f41 100644
---
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/async/AsyncCompactService.java
+++
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/async/AsyncCompactService.java
@@ -92,10 +92,16 @@ public abstract class AsyncCompactService extends
HoodieAsyncTableService {
}
LOG.info("Compactor shutting down properly!!");
} catch (InterruptedException ie) {
+ hasError = true;
LOG.warn("Compactor executor thread got interrupted exception. Stopping", ie);
} catch (IOException e) {
- LOG.error("Compactor executor failed", e);
+ hasError = true;
+ LOG.error("Compactor executor failed due to IOException", e);
throw new HoodieIOException(e.getMessage(), e);
+ } catch (Exception e) {
+ hasError = true;
+ LOG.error("Compactor executor failed", e);
+ throw e;
}
return true;
}, executor)).toArray(CompletableFuture[]::new)), executor);
diff --git
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/async/HoodieAsyncService.java
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/async/HoodieAsyncService.java
index a1665c7..1ce6dfb 100644
---
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/async/HoodieAsyncService.java
+++
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/async/HoodieAsyncService.java
@@ -41,7 +41,10 @@ import java.util.function.Function;
public abstract class HoodieAsyncService implements Serializable {
private static final Logger LOG =
LogManager.getLogger(HoodieAsyncService.class);
+ private static final long POLLING_SECONDS = 10;
+ // Flag indicating whether an error is incurred in the service
+ protected boolean hasError;
// Flag to track if the service is started.
private boolean started;
// Flag indicating shutdown is externally requested
@@ -82,9 +85,13 @@ public abstract class HoodieAsyncService implements
Serializable {
return shutdown;
}
+ public boolean hasError() {
+ return hasError;
+ }
+
/**
* Wait till the service shutdown. If the service shutdown with exception,
it will be thrown
- *
+ *
* @throws ExecutionException
* @throws InterruptedException
*/
@@ -109,6 +116,7 @@ public abstract class HoodieAsyncService implements
Serializable {
public void shutdown(boolean force) {
if (!shutdownRequested || force) {
shutdownRequested = true;
+ shutdown = true;
if (executor != null) {
if (force) {
executor.shutdownNow();
@@ -178,8 +186,8 @@ public abstract class HoodieAsyncService implements
Serializable {
public void waitTillPendingAsyncServiceInstantsReducesTo(int numPending)
throws InterruptedException {
try {
queueLock.lock();
- while (!isShutdown() && (pendingInstants.size() > numPending)) {
- consumed.await();
+ while (!isShutdown() && !hasError() && (pendingInstants.size() >
numPending)) {
+ consumed.await(POLLING_SECONDS, TimeUnit.SECONDS);
}
} finally {
queueLock.unlock();
@@ -202,8 +210,8 @@ public abstract class HoodieAsyncService implements
Serializable {
* @throws InterruptedException
*/
HoodieInstant fetchNextAsyncServiceInstant() throws InterruptedException {
- LOG.info("Waiting for next instant upto 10 seconds");
- HoodieInstant instant = pendingInstants.poll(10, TimeUnit.SECONDS);
+ LOG.info(String.format("Waiting for next instant up to %d seconds",
POLLING_SECONDS));
+ HoodieInstant instant = pendingInstants.poll(POLLING_SECONDS,
TimeUnit.SECONDS);
if (instant != null) {
try {
queueLock.lock();
diff --git
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamer.java
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamer.java
index 65cf2c3..c0c141d 100644
---
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamer.java
+++
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamer.java
@@ -659,6 +659,10 @@ public class HoodieDeltaStreamer implements Serializable {
asyncCompactService.get().enqueuePendingAsyncServiceInstant(new
HoodieInstant(State.REQUESTED,
HoodieTimeline.COMPACTION_ACTION,
scheduledCompactionInstantAndRDD.get().getLeft().get()));
asyncCompactService.get().waitTillPendingAsyncServiceInstantsReducesTo(cfg.maxPendingCompactions);
+ if (asyncCompactService.get().hasError()) {
+ error = true;
+ throw new HoodieException("Async compaction failed. Shutting down Delta Sync...");
+ }
}
if (clusteringConfig.isAsyncClusteringEnabled()) {
Option<String> clusteringInstant =
deltaSync.getClusteringInstantOpt();
@@ -666,6 +670,10 @@ public class HoodieDeltaStreamer implements Serializable {
LOG.info("Scheduled async clustering for instant: " +
clusteringInstant.get());
asyncClusteringService.get().enqueuePendingAsyncServiceInstant(new
HoodieInstant(State.REQUESTED, HoodieTimeline.REPLACE_COMMIT_ACTION,
clusteringInstant.get()));
asyncClusteringService.get().waitTillPendingAsyncServiceInstantsReducesTo(cfg.maxPendingClustering);
+ if (asyncClusteringService.get().hasError()) {
+ error = true;
+ throw new HoodieException("Async clustering failed. Shutting down Delta Sync...");
+ }
}
}
long toSleepMs = cfg.minSyncIntervalSeconds * 1000 -
(System.currentTimeMillis() - start);
@@ -684,6 +692,7 @@ public class HoodieDeltaStreamer implements Serializable {
}
} finally {
shutdownAsyncServices(error);
+ executor.shutdownNow();
}
return true;
}, executor), executor);
@@ -737,13 +746,12 @@ public class HoodieDeltaStreamer implements Serializable {
HoodieTableMetaClient.builder().setConf(new
Configuration(jssc.hadoopConfiguration())).setBasePath(cfg.targetBasePath).setLoadActiveTimelineOnLoad(true).build();
List<HoodieInstant> pending =
CompactionUtils.getPendingCompactionInstantTimes(meta);
pending.forEach(hoodieInstant ->
asyncCompactService.get().enqueuePendingAsyncServiceInstant(hoodieInstant));
- asyncCompactService.get().start((error) -> {
- // Shutdown DeltaSync
- shutdown(false);
- return true;
- });
+ asyncCompactService.get().start(error -> true);
try {
asyncCompactService.get().waitTillPendingAsyncServiceInstantsReducesTo(cfg.maxPendingCompactions);
+ if (asyncCompactService.get().hasError()) {
+ throw new HoodieException("Async compaction failed during write client initialization.");
+ }
} catch (InterruptedException ie) {
throw new HoodieException(ie);
}
@@ -762,12 +770,12 @@ public class HoodieDeltaStreamer implements Serializable {
List<HoodieInstant> pending =
ClusteringUtils.getPendingClusteringInstantTimes(meta);
LOG.info(String.format("Found %d pending clustering instants ",
pending.size()));
pending.forEach(hoodieInstant ->
asyncClusteringService.get().enqueuePendingAsyncServiceInstant(hoodieInstant));
- asyncClusteringService.get().start((error) -> {
- shutdown(false);
- return true;
- });
+ asyncClusteringService.get().start(error -> true);
try {
asyncClusteringService.get().waitTillPendingAsyncServiceInstantsReducesTo(cfg.maxPendingClustering);
+ if (asyncClusteringService.get().hasError()) {
+ throw new HoodieException("Async clustering failed during write client initialization.");
+ }
} catch (InterruptedException e) {
throw new HoodieException(e);
}