This is an automated email from the ASF dual-hosted git repository.
jeagles pushed a commit to branch branch-0.9
in repository https://gitbox.apache.org/repos/asf/tez.git
The following commit(s) were added to refs/heads/branch-0.9 by this push:
new 1defedf TEZ-4147. Reduce NN calls in RecoveryService::handleRecoveryEvent
1defedf is described below
commit 1defedfc4550d7bea47bd1d205a98c4860d21c15
Author: László Bodor <[email protected]>
AuthorDate: Fri Apr 17 14:41:29 2020 -0500
TEZ-4147. Reduce NN calls in RecoveryService::handleRecoveryEvent
Signed-off-by: Jonathan Eagles <[email protected]>
(cherry picked from commit 049511708a6735201e19e3af4575dba3a51743eb)
---
.../tez/dag/history/recovery/RecoveryService.java | 23 +++++++++++-----------
1 file changed, 11 insertions(+), 12 deletions(-)
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/history/recovery/RecoveryService.java b/tez-dag/src/main/java/org/apache/tez/dag/history/recovery/RecoveryService.java
index d874e0a..a0a152c 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/history/recovery/RecoveryService.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/history/recovery/RecoveryService.java
@@ -415,10 +415,10 @@ public class RecoveryService extends AbstractService {
if (LOG.isDebugEnabled()) {
LOG.debug("AppId :" + appContext.getApplicationID() + " summaryPath "
+ summaryPath);
}
- if (!recoveryDirFS.exists(summaryPath)) {
- summaryStream = recoveryDirFS.create(summaryPath, false,
- bufferSize);
- } else {
+ try {
+ summaryStream = recoveryDirFS.create(summaryPath, false, bufferSize);
+ } catch (IOException e) {
+ LOG.error("Error handling summary event, eventType=" + eventType, e);
createFatalErrorFlagDir();
return;
}
@@ -456,16 +456,15 @@ public class RecoveryService extends AbstractService {
RecoveryStream recoveryStream = outputStreamMap.get(dagID);
if (recoveryStream == null) {
Path dagFilePath = TezCommonUtils.getDAGRecoveryPath(recoveryPath,
dagID.toString());
- if (recoveryDirFS.exists(dagFilePath)) {
- createFatalErrorFlagDir();
- return;
- } else {
- if (LOG.isDebugEnabled()) {
- LOG.debug("Opening DAG recovery file in create mode"
- + ", filePath=" + dagFilePath);
- }
+
+ try {
FSDataOutputStream outputStream = recoveryDirFS.create(dagFilePath,
false, bufferSize);
+ LOG.debug("Opened DAG recovery file in create mode, filePath={}", dagFilePath);
recoveryStream = new RecoveryStream(outputStream);
+ } catch (IOException ioe) {
+ LOG.error("Error handling history event, eventType=" + eventType, ioe);
+ createFatalErrorFlagDir();
+ return;
}
outputStreamMap.put(dagID, recoveryStream);
}