[ 
https://issues.apache.org/jira/browse/BEAM-4519?focusedWorklogId=113334&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-113334
 ]

ASF GitHub Bot logged work on BEAM-4519:
----------------------------------------

                Author: ASF GitHub Bot
            Created on: 19/Jun/18 19:39
            Start Date: 19/Jun/18 19:39
    Worklog Time Spent: 10m 
      Work Description: jkff closed pull request #5678: [BEAM-4519] Java SDK 
passes the staging session token artifact staging service.
URL: https://github.com/apache/beam/pull/5678
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git 
a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ArtifactServiceStager.java
 
b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ArtifactServiceStager.java
index 375e291f42f..4ed2776b81c 100644
--- 
a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ArtifactServiceStager.java
+++ 
b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ArtifactServiceStager.java
@@ -94,18 +94,22 @@ private ArtifactServiceStager(Channel channel, int 
bufferSize) {
    *
    * @return The artifact staging token returned by the service
    */
-  public String stage(Iterable<StagedFile> files) throws IOException, 
InterruptedException {
+  public String stage(String stagingSessionToken, Iterable<StagedFile> files)
+      throws IOException, InterruptedException {
     final Map<StagedFile, CompletionStage<ArtifactMetadata>> futures = new 
HashMap<>();
     for (StagedFile file : files) {
-      futures.put(file, MoreFutures.supplyAsync(new StagingCallable(file), 
executorService));
+      futures.put(
+          file,
+          MoreFutures.supplyAsync(new StagingCallable(stagingSessionToken, 
file), executorService));
     }
     CompletionStage<StagingResult> stagingResult =
         MoreFutures.allAsList(futures.values())
             .thenApply(ignored -> new 
ExtractStagingResultsCallable(futures).call());
-    return stageManifest(stagingResult);
+    return stageManifest(stagingSessionToken, stagingResult);
   }
 
-  private String stageManifest(CompletionStage<StagingResult> stagingFuture)
+  private String stageManifest(
+      String stagingSessionToken, CompletionStage<StagingResult> stagingFuture)
       throws InterruptedException {
     try {
       StagingResult stagingResult = MoreFutures.get(stagingFuture);
@@ -114,7 +118,10 @@ private String 
stageManifest(CompletionStage<StagingResult> stagingFuture)
             
Manifest.newBuilder().addAllArtifact(stagingResult.getMetadata()).build();
         CommitManifestResponse response =
             blockingStub.commitManifest(
-                
CommitManifestRequest.newBuilder().setManifest(manifest).build());
+                CommitManifestRequest.newBuilder()
+                    .setStagingSessionToken(stagingSessionToken)
+                    .setManifest(manifest)
+                    .build());
         return response.getRetrievalToken();
       } else {
         RuntimeException failure =
@@ -133,9 +140,11 @@ private String 
stageManifest(CompletionStage<StagingResult> stagingFuture)
   }
 
   private class StagingCallable implements ThrowingSupplier<ArtifactMetadata> {
+    private final String stagingSessionToken;
     private final StagedFile file;
 
-    private StagingCallable(StagedFile file) {
+    private StagingCallable(String stagingSessionToken, StagedFile file) {
+      this.stagingSessionToken = stagingSessionToken;
       this.file = file;
     }
 
@@ -146,9 +155,8 @@ public ArtifactMetadata get() throws Exception {
       StreamObserver<PutArtifactRequest> requestObserver = 
stub.putArtifact(responseObserver);
       ArtifactMetadata metadata =
           ArtifactMetadata.newBuilder().setName(file.getStagingName()).build();
-      // TODO: Pass a valid StagingSessionToken. The token can be obtained in 
PrepareJob request.
       PutArtifactMetadata putMetadata = 
PutArtifactMetadata.newBuilder().setMetadata(metadata)
-          .setStagingSessionToken("token").build();
+          .setStagingSessionToken(stagingSessionToken).build();
       
requestObserver.onNext(PutArtifactRequest.newBuilder().setMetadata(putMetadata).build());
 
       MessageDigest md5Digest = MessageDigest.getInstance("MD5");
diff --git 
a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/ArtifactServiceStagerTest.java
 
b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/ArtifactServiceStagerTest.java
index 06d5fa939c7..bcbff948cc8 100644
--- 
a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/ArtifactServiceStagerTest.java
+++ 
b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/ArtifactServiceStagerTest.java
@@ -82,6 +82,7 @@ public void teardown() {
 
   @Test
   public void testStage() throws Exception {
+    String stagingSessionToken = "token";
     File file = temp.newFile();
     byte[] content = "foo-bar-baz".getBytes(StandardCharsets.UTF_8);
     byte[] contentMd5 = MessageDigest.getInstance("MD5").digest(content);
@@ -89,7 +90,7 @@ public void testStage() throws Exception {
       contentChannel.write(ByteBuffer.wrap(content));
     }
 
-    stager.stage(Collections.singleton(StagedFile.of(file, file.getName())));
+    stager.stage(stagingSessionToken, 
Collections.singleton(StagedFile.of(file, file.getName())));
 
     assertThat(service.getStagedArtifacts().entrySet(), hasSize(1));
     byte[] stagedContent = 
Iterables.getOnlyElement(service.getStagedArtifacts().values());
@@ -106,6 +107,8 @@ public void testStage() throws Exception {
 
   @Test
   public void testStagingMultipleFiles() throws Exception {
+    String stagingSessionToken = "token";
+
     File file = temp.newFile();
     byte[] content = "foo-bar-baz".getBytes(StandardCharsets.UTF_8);
     try (FileChannel contentChannel = new FileOutputStream(file).getChannel()) 
{
@@ -125,6 +128,7 @@ public void testStagingMultipleFiles() throws Exception {
     }
 
     stager.stage(
+        stagingSessionToken,
         ImmutableList.of(
             StagedFile.of(file, file.getName()),
             StagedFile.of(otherFile, otherFile.getName()),
diff --git 
a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/portable/artifact/LocalFileSystemArtifactRetrievalServiceTest.java
 
b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/portable/artifact/LocalFileSystemArtifactRetrievalServiceTest.java
index ef285469902..f6a991fb376 100644
--- 
a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/portable/artifact/LocalFileSystemArtifactRetrievalServiceTest.java
+++ 
b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/portable/artifact/LocalFileSystemArtifactRetrievalServiceTest.java
@@ -194,12 +194,13 @@ private void stageAndCreateRetrievalService(Map<String, 
byte[]> artifacts) throw
       Files.write(artifactFile.toPath(), artifact.getValue());
       artifactFiles.add(StagedFile.of(artifactFile, artifactFile.getName()));
     }
+    String stagingSessionToken = "token";
 
     ArtifactServiceStager stager =
         ArtifactServiceStager.overChannel(
             
InProcessChannelBuilder.forName(stagerServer.getApiServiceDescriptor().getUrl())
                 .build());
-    stager.stage(artifactFiles);
+    stager.stage(stagingSessionToken, artifactFiles);
 
     retrievalServer =
         GrpcFnServer.allocatePortAndCreateFor(
diff --git 
a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/portable/job/ReferenceRunnerJobServiceTest.java
 
b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/portable/job/ReferenceRunnerJobServiceTest.java
index 4de74f1792d..388792768b4 100644
--- 
a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/portable/job/ReferenceRunnerJobServiceTest.java
+++ 
b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/portable/job/ReferenceRunnerJobServiceTest.java
@@ -97,9 +97,11 @@ public void testPrepareJob() throws Exception {
     ArtifactServiceStager stager =
         ArtifactServiceStager.overChannel(
             InProcessChannelBuilder.forName(stagingEndpoint.getUrl()).build());
+    String stagingSessionToken = "token";
     File foo = writeTempFile("foo", "foo, bar, baz".getBytes(UTF_8));
     File bar = writeTempFile("spam", "spam, ham, eggs".getBytes(UTF_8));
     stager.stage(
+        stagingSessionToken,
         ImmutableList.of(StagedFile.of(foo, foo.getName()), StagedFile.of(bar, 
bar.getName())));
     List<byte[]> tempDirFiles = readFlattenedFiles(runnerTemp.getRoot());
     assertThat(
diff --git 
a/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/artifact/BeamFileSystemArtifactStagingService.java
 
b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/artifact/BeamFileSystemArtifactStagingService.java
index 6be26f4a6f5..5ae1830a010 100644
--- 
a/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/artifact/BeamFileSystemArtifactStagingService.java
+++ 
b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/artifact/BeamFileSystemArtifactStagingService.java
@@ -21,17 +21,19 @@
 
 import com.fasterxml.jackson.core.JsonProcessingException;
 import com.fasterxml.jackson.databind.ObjectMapper;
-import com.google.api.client.util.Base64;
 import com.google.common.hash.Hasher;
 import com.google.common.hash.Hashing;
 import com.google.protobuf.ByteString;
 import com.google.protobuf.util.JsonFormat;
+import io.grpc.Status;
+import io.grpc.StatusRuntimeException;
 import io.grpc.stub.StreamObserver;
 import java.io.IOException;
 import java.io.Serializable;
 import java.nio.channels.WritableByteChannel;
 import java.nio.charset.Charset;
 import java.nio.charset.StandardCharsets;
+import java.util.Base64;
 import java.util.Collections;
 import org.apache.beam.model.jobmanagement.v1.ArtifactApi.ArtifactMetadata;
 import 
org.apache.beam.model.jobmanagement.v1.ArtifactApi.CommitManifestRequest;
@@ -144,11 +146,13 @@ private static StagingSessionToken 
decodeStagingSessionToken(String stagingSessi
     try {
       return MAPPER.readValue(stagingSessionToken, StagingSessionToken.class);
     } catch (JsonProcessingException e) {
-      LOG.error(
-          "Unable to deserialize staging token {}. Expected format {}. Error 
{}",
-          stagingSessionToken, "{\"sessionId\": \"sessionId\", \"basePath\": 
\"basePath\"}",
-          e.getMessage());
-      throw e;
+      String message =
+          String.format(
+              "Unable to deserialize staging token %s. Expected format: %s. 
Error: %s",
+              stagingSessionToken,
+              "{\"sessionId\": \"sessionId\", \"basePath\": \"basePath\"}",
+              e.getMessage());
+      throw new 
StatusRuntimeException(Status.INVALID_ARGUMENT.withDescription(message));
     }
   }
 
@@ -214,9 +218,11 @@ public void onNext(PutArtifactRequest putArtifactRequest) {
           artifactWritableByteChannel = FileSystems.create(artifactId, 
MimeTypes.BINARY);
           hasher = Hashing.md5().newHasher();
         } catch (Exception e) {
-          LOG.error("Staging failed for artifact {} for staging token {}",
-              encodedFileName(metadata.getMetadata()), 
metadata.getStagingSessionToken());
-          outboundObserver.onError(e);
+          String message =
+              String.format(
+                  "Failed to begin staging artifact %s", 
metadata.getMetadata().getName());
+          outboundObserver.onError(
+              new 
StatusRuntimeException(Status.DATA_LOSS.withDescription(message).withCause(e)));
         }
       } else {
         try {
@@ -224,9 +230,12 @@ public void onNext(PutArtifactRequest putArtifactRequest) {
           artifactWritableByteChannel.write(data.asReadOnlyByteBuffer());
           hasher.putBytes(data.toByteArray());
         } catch (IOException e) {
-          LOG.error("Staging failed for artifact {} to file {}.", 
metadata.getMetadata().getName(),
-              artifactId);
-          outboundObserver.onError(e);
+          String message =
+              String.format(
+                  "Failed to write chunk of artifact %s to %s",
+                  metadata.getMetadata().getName(), artifactId);
+          outboundObserver.onError(
+              new 
StatusRuntimeException(Status.DATA_LOSS.withDescription(message).withCause(e)));
         }
       }
     }
@@ -245,11 +254,17 @@ public void onError(Throwable throwable) {
         }
 
       } catch (IOException e) {
-        LOG.error("Unable to save artifact {}", artifactId);
-        outboundObserver.onError(e);
+        outboundObserver.onError(
+            new StatusRuntimeException(
+                Status.DATA_LOSS.withDescription(
+                    String.format("Failed to clean up artifact file %s", 
artifactId))));
         return;
       }
-      outboundObserver.onCompleted();
+      outboundObserver.onError(
+          new StatusRuntimeException(
+              Status.DATA_LOSS
+                  .withDescription(String.format("Failed to stage artifact 
%s", artifactId))
+                  .withCause(throwable)));
     }
 
     @Override
@@ -266,13 +281,14 @@ public void onCompleted() {
       }
       String expectedMd5 = metadata.getMetadata().getMd5();
       if (expectedMd5 != null && !expectedMd5.isEmpty()) {
-        String actualMd5 = Base64.encodeBase64String(hasher.hash().asBytes());
+        String actualMd5 = 
Base64.getEncoder().encodeToString(hasher.hash().asBytes());
         if (!actualMd5.equals(expectedMd5)) {
           outboundObserver.onError(
-              new IllegalArgumentException(
-                  String.format(
-                      "Artifact %s is corrupt: expected md5 %s, but has md5 
%s",
-                      metadata.getMetadata().getName(), expectedMd5, 
actualMd5)));
+              new StatusRuntimeException(
+                  Status.INVALID_ARGUMENT.withDescription(
+                      String.format(
+                          "Artifact %s is corrupt: expected md5 %s, but has 
md5 %s",
+                          metadata.getMetadata().getName(), expectedMd5, 
actualMd5))));
           return;
         }
       }
diff --git 
a/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/jobsubmission/InMemoryJobService.java
 
b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/jobsubmission/InMemoryJobService.java
index e089a90eefa..383edddd758 100644
--- 
a/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/jobsubmission/InMemoryJobService.java
+++ 
b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/jobsubmission/InMemoryJobService.java
@@ -58,11 +58,6 @@
 public class InMemoryJobService extends JobServiceGrpc.JobServiceImplBase 
implements FnService {
   private static final Logger LOG = 
LoggerFactory.getLogger(InMemoryJobService.class);
 
-  public static InMemoryJobService create(
-      Endpoints.ApiServiceDescriptor stagingServiceDescriptor, JobInvoker 
invoker) {
-    return new InMemoryJobService(stagingServiceDescriptor, (String session) 
-> "token", invoker);
-  }
-
   /**
    * Creates an InMemoryJobService.
    *
diff --git 
a/runners/java-fn-execution/src/test/java/org/apache/beam/runners/fnexecution/jobsubmission/InMemoryJobServiceTest.java
 
b/runners/java-fn-execution/src/test/java/org/apache/beam/runners/fnexecution/jobsubmission/InMemoryJobServiceTest.java
index 87f6ea3ccfa..fa16e549151 100644
--- 
a/runners/java-fn-execution/src/test/java/org/apache/beam/runners/fnexecution/jobsubmission/InMemoryJobServiceTest.java
+++ 
b/runners/java-fn-execution/src/test/java/org/apache/beam/runners/fnexecution/jobsubmission/InMemoryJobServiceTest.java
@@ -61,7 +61,7 @@
   public void setUp() throws Exception {
     MockitoAnnotations.initMocks(this);
     stagingServiceDescriptor = 
Endpoints.ApiServiceDescriptor.getDefaultInstance();
-    service = InMemoryJobService.create(stagingServiceDescriptor, invoker);
+    service = InMemoryJobService.create(stagingServiceDescriptor, session -> 
"token", invoker);
     when(invoker.invoke(TEST_PIPELINE, TEST_OPTIONS, 
TEST_RETRIEVAL_TOKEN)).thenReturn(invocation);
     when(invocation.getId()).thenReturn(TEST_JOB_ID);
   }
diff --git 
a/runners/reference/java/src/main/java/org/apache/beam/runners/reference/PortableRunner.java
 
b/runners/reference/java/src/main/java/org/apache/beam/runners/reference/PortableRunner.java
index 7eaee6f50ae..3a8894f15e2 100644
--- 
a/runners/reference/java/src/main/java/org/apache/beam/runners/reference/PortableRunner.java
+++ 
b/runners/reference/java/src/main/java/org/apache/beam/runners/reference/PortableRunner.java
@@ -159,6 +159,7 @@ public PipelineResult run(Pipeline pipeline) {
 
       ApiServiceDescriptor artifactStagingEndpoint =
           prepareJobResponse.getArtifactStagingEndpoint();
+      String stagingSessionToken = prepareJobResponse.getStagingSessionToken();
 
       String retrievalToken = null;
       try (CloseableResource<ManagedChannel> artifactChannel =
@@ -166,7 +167,7 @@ public PipelineResult run(Pipeline pipeline) {
               channelFactory.forDescriptor(artifactStagingEndpoint), 
ManagedChannel::shutdown)) {
         ArtifactServiceStager stager = 
ArtifactServiceStager.overChannel(artifactChannel.get());
         LOG.debug("Actual files staged: {}", filesToStage);
-        retrievalToken = stager.stage(filesToStage);
+        retrievalToken = stager.stage(stagingSessionToken, filesToStage);
       } catch (CloseableResource.CloseException e) {
         LOG.warn("Error closing artifact staging channel", e);
         // CloseExceptions should only be thrown while closing the channel.


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


Issue Time Tracking
-------------------

    Worklog Id:     (was: 113334)
    Time Spent: 3h 20m  (was: 3h 10m)

> Artifact Retrieval Service Protocol should be able to serve multiple 
> Manifests.
> -------------------------------------------------------------------------------
>
>                 Key: BEAM-4519
>                 URL: https://issues.apache.org/jira/browse/BEAM-4519
>             Project: Beam
>          Issue Type: Bug
>          Components: runner-core
>            Reporter: Axel Magnuson
>            Assignee: Axel Magnuson
>            Priority: Minor
>          Time Spent: 3h 20m
>  Remaining Estimate: 0h
>
> The artifact staging service currently returns a staging_token that can be 
> used as a key to access a manifest.  However, the current protocol does not 
> have a field that accepts this token.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to