This is an automated email from the ASF dual-hosted git repository.
abstractdog pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/tez.git
The following commit(s) were added to refs/heads/master by this push:
new 26759e0 TEZ-4273: Clear off staging files when TezYarnClient is
unable to submit applications (Rajesh Balamohan via László Bodor)
26759e0 is described below
commit 26759e06bd3a9317fe439d7ad81b60e1dabc932e
Author: Rajesh Balamohan <[email protected]>
AuthorDate: Mon Feb 8 09:44:22 2021 +0100
TEZ-4273: Clear off staging files when TezYarnClient is unable to submit
applications (Rajesh Balamohan via László Bodor)
Signed-off-by: Laszlo Bodor <[email protected]>
---
.../main/java/org/apache/tez/client/TezClient.java | 24 ++++++++++++++++++++++
1 file changed, 24 insertions(+)
diff --git a/tez-api/src/main/java/org/apache/tez/client/TezClient.java
b/tez-api/src/main/java/org/apache/tez/client/TezClient.java
index da213b8..378017b 100644
--- a/tez-api/src/main/java/org/apache/tez/client/TezClient.java
+++ b/tez-api/src/main/java/org/apache/tez/client/TezClient.java
@@ -33,6 +33,7 @@ import java.util.Objects;
import javax.annotation.Nullable;
+import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CommonConfigurationKeys;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
@@ -403,6 +404,10 @@ public class TezClient {
LOG.info("The url to track the Tez Session: " +
appReport.getTrackingUrl());
sessionStarted.set(true);
} catch (YarnException e) {
+ cleanStagingDir();
+ throw new TezException(e);
+ } catch (IOException e) {
+ cleanStagingDir();
throw new TezException(e);
}
@@ -411,6 +416,21 @@ public class TezClient {
}
}
+ private void cleanStagingDir() {
+ Configuration conf = amConfig.getTezConfiguration();
+ String appId = sessionAppId.toString();
+ Path stagingDir = TezCommonUtils.getTezSystemStagingPath(conf, appId);
+ boolean isStgDeleted = false;
+ try {
+ FileSystem fs = stagingDir.getFileSystem(conf);
+ isStgDeleted = fs.delete(stagingDir, true);
+ } catch (IOException ioe) {
+ LOG.error("Error deleting staging dir " + stagingDir, ioe);
+ } finally {
+ LOG.info("Staging dir {}, deleted:{} ", stagingDir, isStgDeleted);
+ }
+ }
+
public synchronized TezClient getClient(String appIdStr) throws IOException,
TezException {
return getClient(appIdfromString(appIdStr));
}
@@ -450,6 +470,10 @@ public class TezClient {
LOG.info("The url to track the Tez Session: " +
appReport.getTrackingUrl());
sessionStarted.set(true);
} catch (YarnException e) {
+ cleanStagingDir();
+ throw new TezException(e);
+ } catch (IOException e) {
+ cleanStagingDir();
throw new TezException(e);
}