Repository: incubator-apex-core Updated Branches: refs/heads/master 5e0d4b758 -> 52c16418e
APEXCORE-359 #resolve added clean-app-directories command to clean up terminated apps Project: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/commit/ee857e5c Tree: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/tree/ee857e5c Diff: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/diff/ee857e5c Branch: refs/heads/master Commit: ee857e5c6843e1cfb290e7d0dea95c32edd3cfff Parents: 5e0d4b7 Author: David Yan <[email protected]> Authored: Thu Feb 25 13:53:07 2016 -0800 Committer: David Yan <[email protected]> Committed: Thu Feb 25 13:58:48 2016 -0800 ---------------------------------------------------------------------- .../java/com/datatorrent/stram/cli/DTCli.java | 21 +++++++++++++++ .../stram/client/StramClientUtils.java | 28 ++++++++++++++++++++ 2 files changed, 49 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/ee857e5c/engine/src/main/java/com/datatorrent/stram/cli/DTCli.java ---------------------------------------------------------------------- diff --git a/engine/src/main/java/com/datatorrent/stram/cli/DTCli.java b/engine/src/main/java/com/datatorrent/stram/cli/DTCli.java index 16461ae..252587b 100644 --- a/engine/src/main/java/com/datatorrent/stram/cli/DTCli.java +++ b/engine/src/main/java/com/datatorrent/stram/cli/DTCli.java @@ -647,6 +647,10 @@ public class DTCli null, null, "Lists the default operator attributes")); globalCommands.put("list-default-port-attributes", new CommandSpec(new ListDefaultAttributesCommand(AttributesType.PORT), null, null, "Lists the default port attributes")); + globalCommands.put("clean-app-directories", new CommandSpec(new CleanAppDirectoriesCommand(), + new Arg[]{new Arg("duration-in-millis")}, + null, + "Clean up data directories of applications that terminated the given milliseconds ago")); // // Connected command specification starts here @@ -3834,6 +3838,23 @@ public class DTCli } } + private class CleanAppDirectoriesCommand implements Command + { + @Override + public void execute(String[] args, ConsoleReader reader) throws Exception + { + JSONObject result = new JSONObject(); + JSONArray appArray = new JSONArray(); + List<ApplicationReport> apps = StramClientUtils.cleanAppDirectories(yarnClient, conf, fs, + System.currentTimeMillis() - Long.valueOf(args[1])); + for (ApplicationReport app : apps) { + appArray.put(app.getApplicationId().toString()); + } + result.put("applications", appArray); + printJson(result); + } + } + @SuppressWarnings("static-access") public static class GetPhysicalPropertiesCommandLineOptions { http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/ee857e5c/engine/src/main/java/com/datatorrent/stram/client/StramClientUtils.java ---------------------------------------------------------------------- diff --git a/engine/src/main/java/com/datatorrent/stram/client/StramClientUtils.java b/engine/src/main/java/com/datatorrent/stram/client/StramClientUtils.java index 4c1227a..af64a45 100644 --- a/engine/src/main/java/com/datatorrent/stram/client/StramClientUtils.java +++ b/engine/src/main/java/com/datatorrent/stram/client/StramClientUtils.java @@ -814,4 +814,32 @@ public class StramClientUtils return rmAddresses; } + public static List<ApplicationReport> cleanAppDirectories(YarnClient clientRMService, Configuration conf, FileSystem fs, long finishedBefore) + throws IOException, YarnException + { + List<ApplicationReport> result = new ArrayList<>(); + List<ApplicationReport> applications = clientRMService.getApplications(Sets.newHashSet(StramClient.YARN_APPLICATION_TYPE), + EnumSet.of(YarnApplicationState.FAILED, + YarnApplicationState.FINISHED, + YarnApplicationState.KILLED)); + Path appsBasePath = new Path(StramClientUtils.getDTDFSRootDir(fs, conf), StramClientUtils.SUBDIR_APPS); + for (ApplicationReport ar : applications) { + long finishTime = ar.getFinishTime(); + if (finishTime < finishedBefore) { + try { + Path appPath = new Path(appsBasePath, ar.getApplicationId().toString()); + if (fs.isDirectory(appPath)) { + LOG.debug("Deleting finished application data for {}", ar.getApplicationId()); + fs.delete(appPath, true); + result.add(ar); + } + } catch (Exception ex) { + LOG.warn("Cannot delete application data for {}", ar.getApplicationId(), ex); + continue; + } + } + } + return result; + } + }
