[
https://issues.apache.org/jira/browse/BEAM-4778?focusedWorklogId=130331&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-130331
]
ASF GitHub Bot logged work on BEAM-4778:
----------------------------------------
Author: ASF GitHub Bot
Created on: 02/Aug/18 16:49
Start Date: 02/Aug/18 16:49
Worklog Time Spent: 10m
Work Description: tweise closed pull request #5958: [BEAM-4778] add
option to flink job server to clean staged artifacts per-job
URL: https://github.com/apache/beam/pull/5958
This is a PR merged from a forked repository.
Because GitHub hides the original diff of a foreign pull request (one from
a fork) once it is merged, the diff is reproduced below for the sake of
provenance:
diff --git a/runners/flink/job-server/build.gradle
b/runners/flink/job-server/build.gradle
index b74525254d9..aa56a475935 100644
--- a/runners/flink/job-server/build.gradle
+++ b/runners/flink/job-server/build.gradle
@@ -59,7 +59,11 @@ dependencies {
runShadow {
def jobHost = project.hasProperty("jobHost") ? project.property("jobHost") :
"localhost:8099"
def artifactsDir = project.hasProperty("artifactsDir") ?
project.property("artifactsDir") : "/tmp/flink-artifacts"
+ def cleanArtifactsPerJob = project.hasProperty("cleanArtifactsPerJob")
args = ["--job-host=${jobHost}", "--artifacts-dir=${artifactsDir}"]
+ if (cleanArtifactsPerJob)
+ args += ["--clean-artifacts-per-job"]
+
// Enable remote debugging.
jvmArgs = ["-Xdebug",
"-Xrunjdwp:transport=dt_socket,server=y,suspend=n,address=5005"]
}
diff --git
a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkJobServerDriver.java
b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkJobServerDriver.java
index 6d6685c5c37..7e9b14a2acd 100644
---
a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkJobServerDriver.java
+++
b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkJobServerDriver.java
@@ -57,6 +57,12 @@
@Option(name = "--artifacts-dir", usage = "The location to store staged
artifact files")
private String artifactStagingPath = "/tmp/beam-artifact-staging";
+ @Option(
+ name = "--clean-artifacts-per-job",
+ usage = "When true, remove each job's staged artifacts when it completes"
+ )
+ private Boolean cleanArtifactsPerJob = false;
+
@Option(name = "--flink-master-url", usage = "Flink master url to submit
job.")
private String flinkMasterUrl = "[auto]";
}
@@ -183,6 +189,11 @@ private InMemoryJobService createJobService() throws
IOException {
throw new RuntimeException(exn);
}
},
+ (String stagingSessionToken) -> {
+ if (configuration.cleanArtifactsPerJob) {
+
artifactStagingServer.getService().removeArtifacts(stagingSessionToken);
+ }
+ },
invoker);
}
diff --git
a/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/artifact/BeamFileSystemArtifactRetrievalService.java
b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/artifact/BeamFileSystemArtifactRetrievalService.java
index 9319a70650b..613ec4d4dd7 100644
---
a/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/artifact/BeamFileSystemArtifactRetrievalService.java
+++
b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/artifact/BeamFileSystemArtifactRetrievalService.java
@@ -182,6 +182,10 @@ static ProxyManifest loadManifest(String retrievalToken)
throws IOException {
LOG.info("Loading manifest for retrieval token {}", retrievalToken);
// look for manifest file at $retrieval_token
ResourceId manifestResourceId =
getManifestLocationFromToken(retrievalToken);
+ return loadManifest(manifestResourceId);
+ }
+
+ static ProxyManifest loadManifest(ResourceId manifestResourceId) throws
IOException {
ProxyManifest.Builder manifestBuilder = ProxyManifest.newBuilder();
try (InputStream stream =
Channels.newInputStream(FileSystems.open(manifestResourceId))) {
String contents = new String(ByteStreams.toByteArray(stream),
StandardCharsets.UTF_8);
diff --git
a/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/artifact/BeamFileSystemArtifactStagingService.java
b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/artifact/BeamFileSystemArtifactStagingService.java
index f059971b6fc..5769045d0de 100644
---
a/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/artifact/BeamFileSystemArtifactStagingService.java
+++
b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/artifact/BeamFileSystemArtifactStagingService.java
@@ -88,8 +88,10 @@
public void commitManifest(
CommitManifestRequest request, StreamObserver<CommitManifestResponse>
responseObserver) {
try {
- ResourceId manifestResourceId =
getManifestFileResourceId(request.getStagingSessionToken());
- ResourceId artifactDirResourceId =
getArtifactDirResourceId(request.getStagingSessionToken());
+ StagingSessionToken stagingSessionToken =
+ StagingSessionToken.decode(request.getStagingSessionToken());
+ ResourceId manifestResourceId =
getManifestFileResourceId(stagingSessionToken);
+ ResourceId artifactDirResourceId =
getArtifactDirResourceId(stagingSessionToken);
ProxyManifest.Builder proxyManifestBuilder =
ProxyManifest.newBuilder().setManifest(request.getManifest());
for (ArtifactMetadata artifactMetadata :
request.getManifest().getArtifactList()) {
@@ -138,7 +140,7 @@ public static String generateStagingSessionToken(String
sessionId, String basePa
StagingSessionToken stagingSessionToken = new StagingSessionToken();
stagingSessionToken.setSessionId(sessionId);
stagingSessionToken.setBasePath(basePath);
- return encodeStagingSessionToken(stagingSessionToken);
+ return stagingSessionToken.encode();
}
private String encodedFileName(ArtifactMetadata artifactMetadata) {
@@ -146,48 +148,49 @@ private String encodedFileName(ArtifactMetadata
artifactMetadata) {
+ Hashing.sha256().hashString(artifactMetadata.getName(),
CHARSET).toString();
}
- private static StagingSessionToken decodeStagingSessionToken(String
stagingSessionToken)
- throws Exception {
- try {
- return MAPPER.readValue(stagingSessionToken, StagingSessionToken.class);
- } catch (JsonProcessingException e) {
- String message =
- String.format(
- "Unable to deserialize staging token %s. Expected format: %s.
Error: %s",
- stagingSessionToken,
- "{\"sessionId\": \"sessionId\", \"basePath\": \"basePath\"}",
- e.getMessage());
- throw new
StatusRuntimeException(Status.INVALID_ARGUMENT.withDescription(message));
- }
- }
+ public void removeArtifacts(String stagingSessionToken) throws Exception {
+ StagingSessionToken parsedToken =
StagingSessionToken.decode(stagingSessionToken);
+ ResourceId dir = getJobDirResourceId(parsedToken);
+ ResourceId manifestResourceId = dir.resolve(MANIFEST,
StandardResolveOptions.RESOLVE_FILE);
- private static String encodeStagingSessionToken(StagingSessionToken
stagingSessionToken)
- throws Exception {
- try {
- return MAPPER.writeValueAsString(stagingSessionToken);
- } catch (JsonProcessingException e) {
- LOG.error("Error {} occurred while serializing {}.", e.getMessage(),
stagingSessionToken);
- throw e;
+ LOG.debug("Removing dir {}", dir);
+
+ ProxyManifest proxyManifest =
+
BeamFileSystemArtifactRetrievalService.loadManifest(manifestResourceId);
+ for (Location location : proxyManifest.getLocationList()) {
+ String uri = location.getUri();
+ LOG.debug("Removing artifact: {}", uri);
+ FileSystems.delete(
+ Collections.singletonList(FileSystems.matchNewResource(uri, false /*
is directory */)));
}
+
+ ResourceId artifactsResourceId =
+ dir.resolve(ARTIFACTS, StandardResolveOptions.RESOLVE_DIRECTORY);
+ LOG.debug("Removing artifacts: {}", artifactsResourceId);
+ FileSystems.delete(Collections.singletonList(artifactsResourceId));
+ LOG.debug("Removing manifest: {}", manifestResourceId);
+ FileSystems.delete(Collections.singletonList(manifestResourceId));
+ LOG.debug("Removing empty dir: {}", dir);
+ FileSystems.delete(Collections.singletonList(dir));
+ LOG.info("Removed dir {}", dir);
}
- private ResourceId getJobDirResourceId(String stagingSessionToken) throws
Exception {
+ private ResourceId getJobDirResourceId(StagingSessionToken
stagingSessionToken) {
ResourceId baseResourceId;
- StagingSessionToken parsedToken =
decodeStagingSessionToken(stagingSessionToken);
// Get or Create the base path
baseResourceId =
- FileSystems.matchNewResource(parsedToken.getBasePath(), true /*
isDirectory */);
+ FileSystems.matchNewResource(stagingSessionToken.getBasePath(), true
/* isDirectory */);
// Using sessionId as the subDir to store artifacts and manifest.
return baseResourceId.resolve(
- parsedToken.getSessionId(), StandardResolveOptions.RESOLVE_DIRECTORY);
+ stagingSessionToken.getSessionId(),
StandardResolveOptions.RESOLVE_DIRECTORY);
}
- private ResourceId getManifestFileResourceId(String stagingSessionToken)
throws Exception {
+ private ResourceId getManifestFileResourceId(StagingSessionToken
stagingSessionToken) {
return getJobDirResourceId(stagingSessionToken)
.resolve(MANIFEST, StandardResolveOptions.RESOLVE_FILE);
}
- private ResourceId getArtifactDirResourceId(String stagingSessionToken)
throws Exception {
+ private ResourceId getArtifactDirResourceId(StagingSessionToken
stagingSessionToken) {
return getJobDirResourceId(stagingSessionToken)
.resolve(ARTIFACTS, StandardResolveOptions.RESOLVE_DIRECTORY);
}
@@ -211,10 +214,13 @@ public void onNext(PutArtifactRequest putArtifactRequest)
{
checkNotNull(putArtifactRequest);
checkNotNull(putArtifactRequest.getMetadata());
metadata = putArtifactRequest.getMetadata();
+ LOG.info("stored metadata: {}", metadata);
// Check the base path exists or create the base path
try {
ResourceId artifactsDirId =
-
getArtifactDirResourceId(putArtifactRequest.getMetadata().getStagingSessionToken());
+ getArtifactDirResourceId(
+ StagingSessionToken.decode(
+
putArtifactRequest.getMetadata().getStagingSessionToken()));
artifactId =
artifactsDirId.resolve(
encodedFileName(metadata.getMetadata()),
StandardResolveOptions.RESOLVE_FILE);
@@ -329,6 +335,30 @@ private void setBasePath(String basePath) {
this.basePath = basePath;
}
+ public String encode() {
+ try {
+ return MAPPER.writeValueAsString(this);
+ } catch (JsonProcessingException e) {
+ String message =
+ String.format("Error %s occurred while serializing %s",
e.getMessage(), this);
+ throw new
StatusRuntimeException(Status.INVALID_ARGUMENT.withDescription(message));
+ }
+ }
+
+ public static StagingSessionToken decode(String stagingSessionToken)
throws Exception {
+ try {
+ return MAPPER.readValue(stagingSessionToken,
StagingSessionToken.class);
+ } catch (JsonProcessingException e) {
+ String message =
+ String.format(
+ "Unable to deserialize staging token %s. Expected format: %s.
Error: %s",
+ stagingSessionToken,
+ "{\"sessionId\": \"sessionId\", \"basePath\": \"basePath\"}",
+ e.getMessage());
+ throw new
StatusRuntimeException(Status.INVALID_ARGUMENT.withDescription(message));
+ }
+ }
+
@Override
public String toString() {
return "StagingSessionToken{"
diff --git
a/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/jobsubmission/InMemoryJobService.java
b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/jobsubmission/InMemoryJobService.java
index 1fc2505a8f2..97758e1b46d 100644
---
a/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/jobsubmission/InMemoryJobService.java
+++
b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/jobsubmission/InMemoryJobService.java
@@ -38,6 +38,7 @@
import org.apache.beam.model.pipeline.v1.Endpoints;
import org.apache.beam.runners.core.construction.graph.PipelineValidator;
import org.apache.beam.runners.fnexecution.FnService;
+import org.apache.beam.sdk.fn.function.ThrowingConsumer;
import org.apache.beam.sdk.fn.stream.SynchronizedStreamObserver;
import org.apache.beam.vendor.grpc.v1.io.grpc.Status;
import org.apache.beam.vendor.grpc.v1.io.grpc.StatusException;
@@ -70,26 +71,35 @@
public static InMemoryJobService create(
Endpoints.ApiServiceDescriptor stagingServiceDescriptor,
Function<String, String> stagingServiceTokenProvider,
+ ThrowingConsumer<String> cleanupJobFn,
JobInvoker invoker) {
- return new InMemoryJobService(stagingServiceDescriptor,
stagingServiceTokenProvider, invoker);
+ return new InMemoryJobService(
+ stagingServiceDescriptor, stagingServiceTokenProvider, cleanupJobFn,
invoker);
}
private final ConcurrentMap<String, JobPreparation> preparations;
private final ConcurrentMap<String, JobInvocation> invocations;
+ private final ConcurrentMap<String, String> stagingSessionTokens;
private final Endpoints.ApiServiceDescriptor stagingServiceDescriptor;
private final Function<String, String> stagingServiceTokenProvider;
+ private final ThrowingConsumer<String> cleanupJobFn;
private final JobInvoker invoker;
private InMemoryJobService(
Endpoints.ApiServiceDescriptor stagingServiceDescriptor,
Function<String, String> stagingServiceTokenProvider,
+ ThrowingConsumer<String> cleanupJobFn,
JobInvoker invoker) {
this.stagingServiceDescriptor = stagingServiceDescriptor;
this.stagingServiceTokenProvider = stagingServiceTokenProvider;
+ this.cleanupJobFn = cleanupJobFn;
this.invoker = invoker;
this.preparations = new ConcurrentHashMap<>();
this.invocations = new ConcurrentHashMap<>();
+
+ // Map "preparation ID" to staging token
+ this.stagingSessionTokens = new ConcurrentHashMap<>();
}
@Override
@@ -121,12 +131,15 @@ public void prepare(
return;
}
+ String stagingSessionToken =
stagingServiceTokenProvider.apply(preparationId);
+ stagingSessionTokens.putIfAbsent(preparationId, stagingSessionToken);
+
// send response
PrepareJobResponse response =
PrepareJobResponse.newBuilder()
.setPreparationId(preparationId)
.setArtifactStagingEndpoint(stagingServiceDescriptor)
-
.setStagingSessionToken(stagingServiceTokenProvider.apply(preparationId))
+ .setStagingSessionToken(stagingSessionToken)
.build();
responseObserver.onNext(response);
responseObserver.onCompleted();
@@ -161,6 +174,26 @@ public void run(RunJobRequest request,
StreamObserver<RunJobResponse> responseOb
invoker.invoke(
preparation.pipeline(), preparation.options(),
request.getRetrievalToken());
String invocationId = invocation.getId();
+
+ invocation.addStateListener(
+ (state) -> {
+ if (!JobInvocation.isTerminated(state)) {
+ return;
+ }
+ String stagingSessionToken =
stagingSessionTokens.get(preparationId);
+ stagingSessionTokens.remove(preparationId);
+ if (cleanupJobFn != null) {
+ try {
+ cleanupJobFn.accept(stagingSessionToken);
+ } catch (Exception e) {
+ LOG.error(
+ "Failed to remove job staging directory for token {}: {}",
+ stagingSessionToken,
+ e);
+ }
+ }
+ });
+
invocation.start();
invocations.put(invocationId, invocation);
RunJobResponse response =
RunJobResponse.newBuilder().setJobId(invocationId).build();
diff --git
a/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/jobsubmission/JobInvocation.java
b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/jobsubmission/JobInvocation.java
index f99a31a4b5c..831edc94280 100644
---
a/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/jobsubmission/JobInvocation.java
+++
b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/jobsubmission/JobInvocation.java
@@ -42,4 +42,16 @@
/** Listen for job messages with a {@link Consumer}. */
void addMessageListener(Consumer<JobMessage> messageStreamObserver);
+
+ static Boolean isTerminated(Enum state) {
+ switch (state) {
+ case DONE:
+ case FAILED:
+ case CANCELLED:
+ case DRAINED:
+ return true;
+ default:
+ return false;
+ }
+ }
}
diff --git
a/runners/java-fn-execution/src/test/java/org/apache/beam/runners/fnexecution/artifact/BeamFileSystemArtifactServicesTest.java
b/runners/java-fn-execution/src/test/java/org/apache/beam/runners/fnexecution/artifact/BeamFileSystemArtifactServicesTest.java
index 19ffea9351c..d7a0c57c028 100644
---
a/runners/java-fn-execution/src/test/java/org/apache/beam/runners/fnexecution/artifact/BeamFileSystemArtifactServicesTest.java
+++
b/runners/java-fn-execution/src/test/java/org/apache/beam/runners/fnexecution/artifact/BeamFileSystemArtifactServicesTest.java
@@ -198,6 +198,14 @@ public void generateStagingSessionTokenTest() throws
Exception {
"{\"sessionId\":\"abc123\",\"basePath\":\"" + basePath + "\"}",
stagingToken);
}
+ void checkCleanup(String stagingSessionToken, String stagingSession) throws
Exception {
+ Assert.assertTrue(
+ Files.exists(Paths.get(stagingDir.toAbsolutePath().toString(),
stagingSession)));
+ stagingService.removeArtifacts(stagingSessionToken);
+ Assert.assertFalse(
+ Files.exists(Paths.get(stagingDir.toAbsolutePath().toString(),
stagingSession)));
+ }
+
@Test
public void putArtifactsSingleSmallFileTest() throws Exception {
String fileName = "file1";
@@ -219,6 +227,7 @@ public void putArtifactsSingleSmallFileTest() throws
Exception {
BeamFileSystemArtifactStagingService.MANIFEST),
Paths.get(stagingToken));
assertFiles(Collections.singleton(fileName), stagingToken);
+ checkCleanup(stagingSessionToken, stagingSession);
}
@Test
@@ -272,6 +281,8 @@ public void putArtifactsMultipleFilesTest() throws
Exception {
Paths.get(stagingDir.toAbsolutePath().toString(), stagingSession,
"MANIFEST").toString(),
retrievalToken);
assertFiles(files.keySet(), retrievalToken);
+
+ checkCleanup(stagingSessionToken, stagingSession);
}
@Test
@@ -332,6 +343,8 @@ public void putArtifactsMultipleFilesConcurrentlyTest()
throws Exception {
Paths.get(stagingDir.toAbsolutePath().toString(), stagingSession,
"MANIFEST").toString(),
retrievalToken);
assertFiles(files.keySet(), retrievalToken);
+
+ checkCleanup(stagingSessionToken, stagingSession);
}
@Test
@@ -418,6 +431,9 @@ public void
putArtifactsMultipleFilesConcurrentSessionsTest() throws Exception {
retrievalToken2);
assertFiles(files1.keySet(), retrievalToken1);
assertFiles(files2.keySet(), retrievalToken2);
+
+ checkCleanup(stagingSessionToken1, stagingSession1);
+ checkCleanup(stagingSessionToken2, stagingSession2);
}
private void assertFiles(Set<String> files, String retrievalToken) throws
Exception {
diff --git
a/runners/java-fn-execution/src/test/java/org/apache/beam/runners/fnexecution/jobsubmission/InMemoryJobServiceTest.java
b/runners/java-fn-execution/src/test/java/org/apache/beam/runners/fnexecution/jobsubmission/InMemoryJobServiceTest.java
index 768e6289fb3..af6934071a2 100644
---
a/runners/java-fn-execution/src/test/java/org/apache/beam/runners/fnexecution/jobsubmission/InMemoryJobServiceTest.java
+++
b/runners/java-fn-execution/src/test/java/org/apache/beam/runners/fnexecution/jobsubmission/InMemoryJobServiceTest.java
@@ -22,6 +22,7 @@
import static org.hamcrest.core.Is.is;
import static org.hamcrest.core.IsNull.notNullValue;
import static org.junit.Assert.assertThat;
+import static org.mockito.Matchers.any;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
@@ -58,7 +59,8 @@
public void setUp() throws Exception {
MockitoAnnotations.initMocks(this);
stagingServiceDescriptor =
Endpoints.ApiServiceDescriptor.getDefaultInstance();
- service = InMemoryJobService.create(stagingServiceDescriptor, session ->
"token", invoker);
+ service =
+ InMemoryJobService.create(stagingServiceDescriptor, session ->
"token", null, invoker);
when(invoker.invoke(TEST_PIPELINE, TEST_OPTIONS,
TEST_RETRIEVAL_TOKEN)).thenReturn(invocation);
when(invocation.getId()).thenReturn(TEST_JOB_ID);
}
@@ -105,6 +107,9 @@ public void testJobSubmissionUsesJobInvokerAndIsSuccess()
throws Exception {
assertThat(runRecorder.values, hasSize(1));
JobApi.RunJobResponse runResponse = runRecorder.values.get(0);
assertThat(runResponse.getJobId(), is(TEST_JOB_ID));
+
+ verify(invocation, times(1)).addStateListener(any());
+ verify(invocation, times(1)).start();
}
private static class RecordingObserver<T> implements StreamObserver<T> {
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
Issue Time Tracking
-------------------
Worklog Id: (was: 130331)
Time Spent: 6.5h (was: 6h 20m)
> Less wasteful ArtifactStagingService
> ------------------------------------
>
> Key: BEAM-4778
> URL: https://issues.apache.org/jira/browse/BEAM-4778
> Project: Beam
> Issue Type: Bug
> Components: runner-core
> Reporter: Eugene Kirpichov
> Assignee: Ryan Williams
> Priority: Major
> Time Spent: 6.5h
> Remaining Estimate: 0h
>
> [https://github.com/apache/beam/blob/master/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/artifact/BeamFileSystemArtifactStagingService.java]
> is the main implementation of ArtifactStagingService.
> It stages artifacts into a directory; and in practice the passed staging
> session token is such that the directory is different for every job. This
> leads to 2 issues:
> * It doesn't get cleaned up when the job finishes or even when the
> JobService shuts down, so we have disk space leaks if running a lot of jobs
> (e.g. a suite of ValidatesRunner tests)
> * We repeatedly re-stage the same artifacts. Instead, ideally, we should
> identify that some artifacts don't need to be staged - based on knowing their
> md5. The artifact staging protocol has rudimentary support for this but may
> need to be modified.
> CC: [~angoenka]
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)