[ 
https://issues.apache.org/jira/browse/BEAM-4778?focusedWorklogId=125840&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-125840
 ]

ASF GitHub Bot logged work on BEAM-4778:
----------------------------------------

                Author: ASF GitHub Bot
            Created on: 23/Jul/18 02:14
            Start Date: 23/Jul/18 02:14
    Worklog Time Spent: 10m 
      Work Description: ryan-williams commented on a change in pull request 
#5958: [BEAM-4778] add option to flink job server to clean staged artifacts 
per-job
URL: https://github.com/apache/beam/pull/5958#discussion_r204254787
 
 

 ##########
 File path: 
runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/artifact/BeamFileSystemArtifactStagingService.java
 ##########
 @@ -138,56 +140,57 @@ public static String generateStagingSessionToken(String 
sessionId, String basePa
     StagingSessionToken stagingSessionToken = new StagingSessionToken();
     stagingSessionToken.setSessionId(sessionId);
     stagingSessionToken.setBasePath(basePath);
-    return encodeStagingSessionToken(stagingSessionToken);
+    return stagingSessionToken.encode();
   }
 
   private String encodedFileName(ArtifactMetadata artifactMetadata) {
     return "artifact_"
         + Hashing.sha256().hashString(artifactMetadata.getName(), 
CHARSET).toString();
   }
 
-  private static StagingSessionToken decodeStagingSessionToken(String 
stagingSessionToken)
-      throws Exception {
-    try {
-      return MAPPER.readValue(stagingSessionToken, StagingSessionToken.class);
-    } catch (JsonProcessingException e) {
-      String message =
-          String.format(
-              "Unable to deserialize staging token %s. Expected format: %s. 
Error: %s",
-              stagingSessionToken,
-              "{\"sessionId\": \"sessionId\", \"basePath\": \"basePath\"}",
-              e.getMessage());
-      throw new 
StatusRuntimeException(Status.INVALID_ARGUMENT.withDescription(message));
-    }
-  }
+  public void removeJobArtifacts(String stagingSessionToken) throws Exception {
+    StagingSessionToken parsedToken = 
StagingSessionToken.decode(stagingSessionToken);
+    ResourceId dir = getJobDirResourceId(parsedToken);
+    ResourceId manifestResourceId = dir.resolve(MANIFEST, 
StandardResolveOptions.RESOLVE_FILE);
 
-  private static String encodeStagingSessionToken(StagingSessionToken 
stagingSessionToken)
-      throws Exception {
-    try {
-      return MAPPER.writeValueAsString(stagingSessionToken);
-    } catch (JsonProcessingException e) {
-      LOG.error("Error {} occurred while serializing {}.", e.getMessage(), 
stagingSessionToken);
-      throw e;
+    LOG.info("Removing dir {}", dir);
+
+    ProxyManifest proxyManifest =
+        
BeamFileSystemArtifactRetrievalService.loadManifest(manifestResourceId);
+    for (Location location : proxyManifest.getLocationList()) {
+      String uri = location.getUri();
+      LOG.info("Removing artifact: {}", uri);
+      FileSystems.delete(
+          Collections.singletonList(FileSystems.matchNewResource(uri, false /* 
is directory */)));
     }
+
+    ResourceId artifactsResourceId =
+        dir.resolve(ARTIFACTS, StandardResolveOptions.RESOLVE_DIRECTORY);
+    LOG.info("Removing artifacts: {}", artifactsResourceId);
+    FileSystems.delete(Collections.singletonList(artifactsResourceId));
+    LOG.info("Removing manifest: {}", manifestResourceId);
+    FileSystems.delete(Collections.singletonList(manifestResourceId));
+    LOG.info("Removing empty dir: {}", dir);
+    FileSystems.delete(Collections.singletonList(dir));
 
 Review comment:
   If it's not, currently we'll get a `DirectoryNotEmptyException`, which seems 
basically desirable?
   
   afaict `FileSystems` offers no way to recursively delete, cf. 
[BEAM-4843](https://issues.apache.org/jira/browse/BEAM-4843)
   
   for now, it seems like throwing an exception if the structure has changed 
from what is assumed here is the correct behavior, to me

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


Issue Time Tracking
-------------------

    Worklog Id:     (was: 125840)
    Time Spent: 1h 50m  (was: 1h 40m)

> Less wasteful ArtifactStagingService
> ------------------------------------
>
>                 Key: BEAM-4778
>                 URL: https://issues.apache.org/jira/browse/BEAM-4778
>             Project: Beam
>          Issue Type: Bug
>          Components: runner-core
>            Reporter: Eugene Kirpichov
>            Assignee: Ryan Williams
>            Priority: Major
>          Time Spent: 1h 50m
>  Remaining Estimate: 0h
>
> [https://github.com/apache/beam/blob/master/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/artifact/BeamFileSystemArtifactStagingService.java]
>  is the main implementation of ArtifactStagingService.
> It stages artifacts into a directory; and in practice the passed staging 
> session token is such that the directory is different for every job. This 
> leads to 2 issues:
>  * It doesn't get cleaned up when the job finishes or even when the 
> JobService shuts down, so we have disk space leaks if running a lot of jobs 
> (e.g. a suite of ValidatesRunner tests)
>  * We repeatedly re-stage the same artifacts. Instead, ideally, we should 
> identify that some artifacts don't need to be staged - based on knowing their 
> md5. The artifact staging protocol has rudimentary support for this but may 
> need to be modified.
> CC: [~angoenka]



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to