[ 
https://issues.apache.org/jira/browse/BEAM-4291?focusedWorklogId=112133&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-112133
 ]

ASF GitHub Bot logged work on BEAM-4291:
----------------------------------------

                Author: ASF GitHub Bot
            Created on: 15/Jun/18 02:03
            Start Date: 15/Jun/18 02:03
    Worklog Time Spent: 10m 
      Work Description: jkff closed pull request #5651: [BEAM-4291] Implements 
distributed artifact retrieval
URL: https://github.com/apache/beam/pull/5651
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below, because GitHub does not otherwise display it after the merge:

diff --git 
a/runners/flink/src/main/java/org/apache/beam/runners/flink/ArtifactSourcePool.java
 
b/runners/flink/src/main/java/org/apache/beam/runners/flink/ArtifactSourcePool.java
deleted file mode 100644
index 74eae977191..00000000000
--- 
a/runners/flink/src/main/java/org/apache/beam/runners/flink/ArtifactSourcePool.java
+++ /dev/null
@@ -1,67 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.flink;
-
-import io.grpc.stub.StreamObserver;
-import java.io.IOException;
-import javax.annotation.concurrent.ThreadSafe;
-import org.apache.beam.model.jobmanagement.v1.ArtifactApi.ArtifactChunk;
-import org.apache.beam.model.jobmanagement.v1.ArtifactApi.Manifest;
-import org.apache.beam.runners.fnexecution.artifact.ArtifactSource;
-
-// TODO: Pull in https://github.com/apache/beam/pull/5359 once we use Flink 
1.6.
-/**
- * A pool of {@link ArtifactSource ArtifactSources} that can be used as a 
single source. At least
- * one source must be registered before artifacts can be requested from it.
- *
- * <p>Artifact pooling is required for Flink operators that use the 
DistributedCache for artifact
- * distribution. This is because distributed caches (along with other runtime 
context) are scoped to
- * operator lifetimes but the artifact retrieval service must outlive the any 
remote environments it
- * serves. Remote environments cannot be shared between jobs and are thus 
job-scoped.
- *
- * <p>Because of the peculiarities of artifact pooling and Flink, this class 
is packaged with the
- * Flink runner rather than as a core fn-execution utility.
- */
-@ThreadSafe
-public class ArtifactSourcePool implements ArtifactSource {
-
-  /** Creates a new {@link ArtifactSourcePool}. */
-  public static ArtifactSourcePool create() {
-    return new ArtifactSourcePool();
-  }
-
-  private ArtifactSourcePool() {}
-
-  /**
-   * Adds a new cache to the pool. When the returned {@link AutoCloseable} is 
closed, the given
-   * cache will be removed from the pool.
-   */
-  public AutoCloseable addToPool(ArtifactSource artifactSource) {
-    throw new UnsupportedOperationException();
-  }
-
-  @Override
-  public Manifest getManifest() throws IOException {
-    throw new UnsupportedOperationException();
-  }
-
-  @Override
-  public void getArtifact(String name, StreamObserver<ArtifactChunk> 
responseObserver) {
-    throw new UnsupportedOperationException();
-  }
-}
diff --git 
a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/BatchFlinkExecutableStageContext.java
 
b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/BatchFlinkExecutableStageContext.java
index bd8b04687d8..7a040573b42 100644
--- 
a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/BatchFlinkExecutableStageContext.java
+++ 
b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/BatchFlinkExecutableStageContext.java
@@ -22,7 +22,6 @@
 import com.google.common.cache.LoadingCache;
 import com.google.common.cache.RemovalNotification;
 import org.apache.beam.runners.core.construction.graph.ExecutableStage;
-import org.apache.beam.runners.flink.ArtifactSourcePool;
 import org.apache.beam.runners.fnexecution.control.DockerJobBundleFactory;
 import org.apache.beam.runners.fnexecution.control.JobBundleFactory;
 import org.apache.beam.runners.fnexecution.control.StageBundleFactory;
@@ -37,18 +36,14 @@
   private static final Logger LOG = 
LoggerFactory.getLogger(BatchFlinkExecutableStageContext.class);
 
   private final JobBundleFactory jobBundleFactory;
-  private final ArtifactSourcePool artifactSourcePool;
 
   private static BatchFlinkExecutableStageContext create(JobInfo jobInfo) 
throws Exception {
-    ArtifactSourcePool artifactSourcePool = ArtifactSourcePool.create();
-    JobBundleFactory jobBundleFactory = DockerJobBundleFactory.create(jobInfo, 
artifactSourcePool);
-    return new BatchFlinkExecutableStageContext(jobBundleFactory, 
artifactSourcePool);
+    JobBundleFactory jobBundleFactory = DockerJobBundleFactory.create(jobInfo);
+    return new BatchFlinkExecutableStageContext(jobBundleFactory);
   }
 
-  private BatchFlinkExecutableStageContext(
-      JobBundleFactory jobBundleFactory, ArtifactSourcePool 
artifactSourcePool) {
+  private BatchFlinkExecutableStageContext(JobBundleFactory jobBundleFactory) {
     this.jobBundleFactory = jobBundleFactory;
-    this.artifactSourcePool = artifactSourcePool;
   }
 
   @Override
@@ -62,11 +57,6 @@ public StateRequestHandler getStateRequestHandler(
     return FlinkBatchStateRequestHandler.forStage(executableStage, 
runtimeContext);
   }
 
-  @Override
-  public ArtifactSourcePool getArtifactSourcePool() {
-    return artifactSourcePool;
-  }
-
   private void cleanUp() throws Exception {
     jobBundleFactory.close();
   }
diff --git 
a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkExecutableStageContext.java
 
b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkExecutableStageContext.java
index d688b8ab80d..1fb5fa9361e 100644
--- 
a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkExecutableStageContext.java
+++ 
b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkExecutableStageContext.java
@@ -19,7 +19,6 @@
 
 import java.io.Serializable;
 import org.apache.beam.runners.core.construction.graph.ExecutableStage;
-import org.apache.beam.runners.flink.ArtifactSourcePool;
 import org.apache.beam.runners.fnexecution.control.StageBundleFactory;
 import org.apache.beam.runners.fnexecution.provisioning.JobInfo;
 import org.apache.beam.runners.fnexecution.state.StateRequestHandler;
@@ -44,6 +43,4 @@ static Factory batchFactory() {
 
   StateRequestHandler getStateRequestHandler(
       ExecutableStage executableStage, RuntimeContext runtimeContext);
-
-  ArtifactSourcePool getArtifactSourcePool();
 }
diff --git 
a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkExecutableStageFunction.java
 
b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkExecutableStageFunction.java
index 9030a4105ee..0612afa8996 100644
--- 
a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkExecutableStageFunction.java
+++ 
b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkExecutableStageFunction.java
@@ -24,8 +24,6 @@
 import javax.annotation.concurrent.GuardedBy;
 import org.apache.beam.model.pipeline.v1.RunnerApi;
 import org.apache.beam.runners.core.construction.graph.ExecutableStage;
-import org.apache.beam.runners.flink.ArtifactSourcePool;
-import org.apache.beam.runners.fnexecution.artifact.ArtifactSource;
 import org.apache.beam.runners.fnexecution.control.BundleProgressHandler;
 import org.apache.beam.runners.fnexecution.control.OutputReceiverFactory;
 import org.apache.beam.runners.fnexecution.control.RemoteBundle;
@@ -66,7 +64,6 @@
   private transient RuntimeContext runtimeContext;
   private transient StateRequestHandler stateRequestHandler;
   private transient StageBundleFactory<InputT> stageBundleFactory;
-  private transient AutoCloseable distributedCacheCloser;
   private transient BundleProgressHandler progressHandler;
 
   public FlinkExecutableStageFunction(
@@ -85,10 +82,7 @@ public void open(Configuration parameters) throws Exception {
     ExecutableStage executableStage = 
ExecutableStage.fromPayload(stagePayload);
     runtimeContext = getRuntimeContext();
     // TODO: Wire this into the distributed cache and make it pluggable.
-    ArtifactSource artifactSource = null;
     FlinkExecutableStageContext stageContext = contextFactory.get(jobInfo);
-    ArtifactSourcePool cachePool = stageContext.getArtifactSourcePool();
-    distributedCacheCloser = cachePool.addToPool(artifactSource);
     // NOTE: It's safe to reuse the state handler between partitions because 
each partition uses the
     // same backing runtime context and broadcast variables. We use checkState 
below to catch errors
     // in backward-incompatible Flink changes.
@@ -123,8 +117,7 @@ public void mapPartition(
 
   @Override
   public void close() throws Exception {
-    try (AutoCloseable cacheCloser = distributedCacheCloser;
-        AutoCloseable bundleFactoryCloser = stageBundleFactory) {}
+    try (AutoCloseable bundleFactoryCloser = stageBundleFactory) {}
   }
 
   /**
diff --git 
a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/ExecutableStageDoFnOperator.java
 
b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/ExecutableStageDoFnOperator.java
index 607ce05c386..c716c912b4b 100644
--- 
a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/ExecutableStageDoFnOperator.java
+++ 
b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/ExecutableStageDoFnOperator.java
@@ -28,9 +28,7 @@
 import org.apache.beam.model.pipeline.v1.RunnerApi;
 import org.apache.beam.runners.core.DoFnRunner;
 import org.apache.beam.runners.core.construction.graph.ExecutableStage;
-import org.apache.beam.runners.flink.ArtifactSourcePool;
 import 
org.apache.beam.runners.flink.translation.functions.FlinkExecutableStageContext;
-import org.apache.beam.runners.fnexecution.artifact.ArtifactSource;
 import org.apache.beam.runners.fnexecution.control.BundleProgressHandler;
 import org.apache.beam.runners.fnexecution.control.OutputReceiverFactory;
 import org.apache.beam.runners.fnexecution.control.RemoteBundle;
@@ -76,7 +74,6 @@
   private transient StateRequestHandler stateRequestHandler;
   private transient BundleProgressHandler progressHandler;
   private transient StageBundleFactory stageBundleFactory;
-  private transient AutoCloseable distributedCacheCloser;
 
   public ExecutableStageDoFnOperator(String stepName,
                                      Coder<WindowedValue<InputT>> inputCoder,
@@ -118,14 +115,11 @@ public void open() throws Exception {
 
     ExecutableStage executableStage = ExecutableStage.fromPayload(payload);
     // TODO: Wire this into the distributed cache and make it pluggable.
-    ArtifactSource artifactSource = null;
     // TODO: Do we really want this layer of indirection when accessing the 
stage bundle factory?
     // It's a little strange because this operator is responsible for the 
lifetime of the stage
     // bundle "factory" (manager?) but not the job or Flink bundle factories. 
How do we make
     // ownership of the higher level "factories" explicit? Do we care?
     FlinkExecutableStageContext stageContext = contextFactory.get(jobInfo);
-    ArtifactSourcePool cachePool = stageContext.getArtifactSourcePool();
-    distributedCacheCloser = cachePool.addToPool(artifactSource);
     // NOTE: It's safe to reuse the state handler between partitions because 
each partition uses the
     // same backing runtime context and broadcast variables. We use checkState 
below to catch errors
     // in backward-incompatible Flink changes.
@@ -152,8 +146,7 @@ private void 
processElementWithSdkHarness(WindowedValue<InputT> element) throws
 
   @Override
   public void close() throws Exception {
-    try (AutoCloseable cacheCloser = distributedCacheCloser;
-         AutoCloseable bundleFactoryCloser = stageBundleFactory) {}
+    try (AutoCloseable bundleFactoryCloser = stageBundleFactory) {}
     super.close();
   }
 
diff --git 
a/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/ExecutableStageDoFnOperatorTest.java
 
b/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/ExecutableStageDoFnOperatorTest.java
index 8b719d41a38..ea0ee371a13 100644
--- 
a/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/ExecutableStageDoFnOperatorTest.java
+++ 
b/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/ExecutableStageDoFnOperatorTest.java
@@ -36,7 +36,6 @@
 import org.apache.beam.model.pipeline.v1.RunnerApi.Components;
 import org.apache.beam.model.pipeline.v1.RunnerApi.ExecutableStagePayload;
 import org.apache.beam.model.pipeline.v1.RunnerApi.PCollection;
-import org.apache.beam.runners.flink.ArtifactSourcePool;
 import org.apache.beam.runners.flink.FlinkPipelineOptions;
 import 
org.apache.beam.runners.flink.translation.functions.FlinkExecutableStageContext;
 import 
org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator;
@@ -80,7 +79,6 @@
   @Mock private DistributedCache distributedCache;
   @Mock private FlinkExecutableStageContext stageContext;
   @Mock private StageBundleFactory stageBundleFactory;
-  @Mock private ArtifactSourcePool artifactSourcePool;
   @Mock private StateRequestHandler stateRequestHandler;
 
   // NOTE: ExecutableStage.fromPayload expects exactly one input, so we 
provide one here. These unit
@@ -99,7 +97,6 @@
   public void setUpMocks() {
     MockitoAnnotations.initMocks(this);
     when(runtimeContext.getDistributedCache()).thenReturn(distributedCache);
-    when(stageContext.getArtifactSourcePool()).thenReturn(artifactSourcePool);
     when(stageContext.getStateRequestHandler(any(), 
any())).thenReturn(stateRequestHandler);
     
when(stageContext.getStageBundleFactory(any())).thenReturn(stageBundleFactory);
   }
diff --git 
a/runners/flink/src/test/java/org/apache/beam/runners/flink/translation/functions/FlinkExecutableStageFunctionTest.java
 
b/runners/flink/src/test/java/org/apache/beam/runners/flink/translation/functions/FlinkExecutableStageFunctionTest.java
index 680620df459..21d6e4377ee 100644
--- 
a/runners/flink/src/test/java/org/apache/beam/runners/flink/translation/functions/FlinkExecutableStageFunctionTest.java
+++ 
b/runners/flink/src/test/java/org/apache/beam/runners/flink/translation/functions/FlinkExecutableStageFunctionTest.java
@@ -32,7 +32,6 @@
 import org.apache.beam.model.pipeline.v1.RunnerApi.Components;
 import org.apache.beam.model.pipeline.v1.RunnerApi.ExecutableStagePayload;
 import org.apache.beam.model.pipeline.v1.RunnerApi.PCollection;
-import org.apache.beam.runners.flink.ArtifactSourcePool;
 import org.apache.beam.runners.fnexecution.control.BundleProgressHandler;
 import org.apache.beam.runners.fnexecution.control.OutputReceiverFactory;
 import org.apache.beam.runners.fnexecution.control.RemoteBundle;
@@ -67,7 +66,6 @@
   @Mock private Collector<RawUnionValue> collector;
   @Mock private FlinkExecutableStageContext stageContext;
   @Mock private StageBundleFactory<Integer> stageBundleFactory;
-  @Mock private ArtifactSourcePool artifactSourcePool;
   @Mock private StateRequestHandler stateRequestHandler;
 
   // NOTE: ExecutableStage.fromPayload expects exactly one input, so we 
provide one here. These unit
@@ -86,7 +84,6 @@
   public void setUpMocks() {
     MockitoAnnotations.initMocks(this);
     when(runtimeContext.getDistributedCache()).thenReturn(distributedCache);
-    when(stageContext.getArtifactSourcePool()).thenReturn(artifactSourcePool);
     when(stageContext.getStateRequestHandler(any(), 
any())).thenReturn(stateRequestHandler);
     
when(stageContext.<Integer>getStageBundleFactory(any())).thenReturn(stageBundleFactory);
   }
diff --git 
a/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/artifact/ArtifactSource.java
 
b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/artifact/ArtifactSource.java
deleted file mode 100644
index aed3cbc5b2f..00000000000
--- 
a/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/artifact/ArtifactSource.java
+++ /dev/null
@@ -1,41 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.beam.runners.fnexecution.artifact;
-
-import io.grpc.stub.StreamObserver;
-import java.io.IOException;
-import org.apache.beam.model.jobmanagement.v1.ArtifactApi.ArtifactChunk;
-import org.apache.beam.model.jobmanagement.v1.ArtifactApi.Manifest;
-
-/**
- * Makes artifacts available to an ArtifactRetrievalService by
- * encapsulating runner-specific resources.
- */
-public interface ArtifactSource {
-
-  /**
-   * Get the artifact manifest available from this source.
-   */
-  Manifest getManifest() throws IOException;
-
-  /**
-   * Get an artifact by its name.
-   */
-  void getArtifact(String name, StreamObserver<ArtifactChunk> 
responseObserver) throws IOException;
-}
diff --git 
a/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/artifact/BeamFileSystemArtifactRetrievalService.java
 
b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/artifact/BeamFileSystemArtifactRetrievalService.java
new file mode 100644
index 00000000000..2e0dc88aaf2
--- /dev/null
+++ 
b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/artifact/BeamFileSystemArtifactRetrievalService.java
@@ -0,0 +1,201 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.runners.fnexecution.artifact;
+
+import static com.google.common.base.Preconditions.checkArgument;
+
+import com.google.api.client.util.Base64;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.cache.CacheBuilder;
+import com.google.common.cache.CacheLoader;
+import com.google.common.cache.LoadingCache;
+import com.google.common.hash.Hasher;
+import com.google.common.hash.Hashing;
+import com.google.common.io.ByteStreams;
+import com.google.protobuf.ByteString;
+import com.google.protobuf.util.JsonFormat;
+import io.grpc.stub.StreamObserver;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.channels.Channels;
+import java.nio.charset.StandardCharsets;
+import java.util.List;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import org.apache.beam.model.jobmanagement.v1.ArtifactApi;
+import org.apache.beam.model.jobmanagement.v1.ArtifactApi.ArtifactMetadata;
+import org.apache.beam.model.jobmanagement.v1.ArtifactApi.ProxyManifest;
+import org.apache.beam.model.jobmanagement.v1.ArtifactRetrievalServiceGrpc;
+import org.apache.beam.sdk.io.FileSystems;
+import org.apache.beam.sdk.io.fs.ResourceId;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * An {@link ArtifactRetrievalService} that uses {@link FileSystems} as its 
backing storage and uses
+ * the artifact layout and retrieval token format produced by {@link
+ * BeamFileSystemArtifactStagingService}.
+ */
+public class BeamFileSystemArtifactRetrievalService
+    extends ArtifactRetrievalServiceGrpc.ArtifactRetrievalServiceImplBase
+    implements ArtifactRetrievalService {
+  private static final Logger LOG =
+      LoggerFactory.getLogger(BeamFileSystemArtifactRetrievalService.class);
+
+  private static final int ARTIFACT_CHUNK_SIZE_BYTES = 2 << 20; // 2MB
+
+  public static BeamFileSystemArtifactRetrievalService create() {
+    return new BeamFileSystemArtifactRetrievalService();
+  }
+
+  @Override
+  public void getManifest(
+      ArtifactApi.GetManifestRequest request,
+      StreamObserver<ArtifactApi.GetManifestResponse> responseObserver) {
+    String token = request.getRetrievalToken();
+    LOG.info("GetManifest for {}", token);
+    try {
+      ArtifactApi.ProxyManifest proxyManifest = MANIFEST_CACHE.get(token);
+      ArtifactApi.GetManifestResponse response =
+          ArtifactApi.GetManifestResponse.newBuilder()
+              .setManifest(proxyManifest.getManifest())
+              .build();
+      LOG.info(
+          "GetManifest for {} -> {} artifacts",
+          token,
+          proxyManifest.getManifest().getArtifactCount());
+      responseObserver.onNext(response);
+      responseObserver.onCompleted();
+    } catch (Exception e) {
+      LOG.info("GetManifest for {} failed", token, e);
+      responseObserver.onError(e);
+    }
+  }
+
+  @Override
+  public void getArtifact(
+      ArtifactApi.GetArtifactRequest request,
+      StreamObserver<ArtifactApi.ArtifactChunk> responseObserver) {
+    LOG.info("GetArtifact {}", request);
+    String name = request.getName();
+    try {
+      ArtifactApi.ProxyManifest proxyManifest = 
MANIFEST_CACHE.get(request.getRetrievalToken());
+      // look for file at URI specified by proxy manifest location
+      ArtifactApi.ProxyManifest.Location location =
+          proxyManifest
+              .getLocationList()
+              .stream()
+              .filter(loc -> loc.getName().equals(name))
+              .findFirst()
+              .orElseThrow(
+                  () ->
+                      new FileNotFoundException(
+                          String.format("Artifact location not found in 
manifest: %s", name)));
+
+      List<ArtifactMetadata> existingArtifacts = 
proxyManifest.getManifest().getArtifactList();
+      ArtifactMetadata metadata =
+          existingArtifacts
+              .stream()
+              .filter(meta -> meta.getName().equals(name))
+              .findFirst()
+              .orElseThrow(
+                  () ->
+                      new FileNotFoundException(
+                          String.format("Artifact metadata not found in 
manifest: %s", name)));
+
+      ResourceId artifactResourceId =
+          FileSystems.matchNewResource(location.getUri(), false /* is 
directory */);
+      LOG.info("Artifact {} located in {}", name, artifactResourceId);
+      Hasher hasher = Hashing.md5().newHasher();
+      byte[] data = new byte[ARTIFACT_CHUNK_SIZE_BYTES];
+      try (InputStream stream = 
Channels.newInputStream(FileSystems.open(artifactResourceId))) {
+        int len;
+        while ((len = stream.read(data)) != -1) {
+          hasher.putBytes(data, 0, len);
+          responseObserver.onNext(
+              ArtifactApi.ArtifactChunk.newBuilder()
+                  .setData(ByteString.copyFrom(data, 0, len))
+                  .build());
+        }
+      }
+      if (metadata.getMd5() != null && !metadata.getMd5().isEmpty()) {
+        ByteString expected = 
ByteString.copyFrom(Base64.decodeBase64(metadata.getMd5()));
+        ByteString actual = ByteString.copyFrom(hasher.hash().asBytes());
+        if (!actual.equals(expected)) {
+          throw new IllegalStateException(
+              String.format(
+                  "Artifact %s is corrupt: expected md5 %s, actual %s",
+                  name, expected.toString(), actual.toString()));
+        }
+      }
+      responseObserver.onCompleted();
+    } catch (IOException | ExecutionException e) {
+      LOG.info("GetArtifact {} failed", request, e);
+      responseObserver.onError(e);
+    }
+  }
+
+  @Override
+  public void close() throws Exception {}
+
+  private static final LoadingCache<String, ArtifactApi.ProxyManifest> 
MANIFEST_CACHE =
+      CacheBuilder.newBuilder()
+          .expireAfterAccess(1, TimeUnit.HOURS /* arbitrary */)
+          .maximumSize(100 /* arbitrary */)
+          .build(
+              new CacheLoader<String, ProxyManifest>() {
+                @Override
+                public ProxyManifest load(String retrievalToken) throws 
Exception {
+                  return loadManifest(retrievalToken);
+                }
+              });
+
+  @VisibleForTesting
+  static ProxyManifest loadManifest(String retrievalToken) throws IOException {
+    LOG.info("Loading manifest for retrieval token {}", retrievalToken);
+    // look for manifest file at $retrieval_token
+    ResourceId manifestResourceId = 
getManifestLocationFromToken(retrievalToken);
+    ProxyManifest.Builder manifestBuilder = ProxyManifest.newBuilder();
+    try (InputStream stream = 
Channels.newInputStream(FileSystems.open(manifestResourceId))) {
+      String contents = new String(ByteStreams.toByteArray(stream), 
StandardCharsets.UTF_8);
+      JsonFormat.parser().merge(contents, manifestBuilder);
+    }
+    ProxyManifest proxyManifest = manifestBuilder.build();
+    checkArgument(
+        proxyManifest.hasManifest(),
+        String.format("Invalid ProxyManifest at %s: doesn't have a Manifest", 
manifestResourceId));
+    checkArgument(
+        proxyManifest.getLocationCount() == 
proxyManifest.getManifest().getArtifactCount(),
+        String.format(
+            "Invalid ProxyManifestat %s: %d locations but %d artifacts",
+            manifestResourceId,
+            proxyManifest.getLocationCount(),
+            proxyManifest.getManifest().getArtifactCount()));
+    LOG.info(
+        "Manifest at {} has {} artifact locations",
+        manifestResourceId,
+        proxyManifest.getManifest().getArtifactCount());
+    return proxyManifest;
+  }
+
+  private static ResourceId getManifestLocationFromToken(String 
retrievalToken) {
+    return FileSystems.matchNewResource(retrievalToken, false /* is directory 
*/);
+  }
+}
diff --git 
a/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/artifact/BeamFileSystemArtifactSource.java
 
b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/artifact/BeamFileSystemArtifactSource.java
deleted file mode 100644
index e2e359fbf89..00000000000
--- 
a/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/artifact/BeamFileSystemArtifactSource.java
+++ /dev/null
@@ -1,88 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.fnexecution.artifact;
-
-import com.google.protobuf.ByteString;
-import com.google.protobuf.util.JsonFormat;
-import io.grpc.stub.StreamObserver;
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.nio.channels.Channels;
-import java.nio.channels.ReadableByteChannel;
-import java.nio.charset.StandardCharsets;
-import org.apache.beam.model.jobmanagement.v1.ArtifactApi;
-import org.apache.beam.sdk.io.FileSystems;
-
-/**
- * An ArtifactSource suitable for retrieving artifacts uploaded via
- * {@link BeamFileSystemArtifactStagingService}.
- */
-public class BeamFileSystemArtifactSource implements ArtifactSource {
-
-  private static final int CHUNK_SIZE = 2 * 1024 * 1024;
-
-  private final String retrievalToken;
-  private ArtifactApi.ProxyManifest proxyManifest;
-
-  public BeamFileSystemArtifactSource(String retrievalToken) {
-    this.retrievalToken = retrievalToken;
-  }
-
-  public static BeamFileSystemArtifactSource create(String artifactToken) {
-    return new BeamFileSystemArtifactSource(artifactToken);
-  }
-
-  @Override
-  public ArtifactApi.Manifest getManifest() throws IOException {
-    return getProxyManifest().getManifest();
-  }
-
-  @Override
-  public void getArtifact(String name,
-      StreamObserver<ArtifactApi.ArtifactChunk> responseObserver) throws 
IOException {
-    ReadableByteChannel artifact = FileSystems
-        .open(FileSystems.matchNewResource(lookupUri(name), false));
-    ByteBuffer buffer = ByteBuffer.allocate(CHUNK_SIZE);
-    while (artifact.read(buffer) > -1) {
-      buffer.flip();
-      responseObserver.onNext(
-          
ArtifactApi.ArtifactChunk.newBuilder().setData(ByteString.copyFrom(buffer)).build());
-      buffer.clear();
-    }
-  }
-
-  private String lookupUri(String name) throws IOException {
-    for (ArtifactApi.ProxyManifest.Location location : 
getProxyManifest().getLocationList()) {
-      if (location.getName().equals(name)) {
-        return location.getUri();
-      }
-    }
-    throw new IllegalArgumentException("No such artifact: " + name);
-  }
-
-  private ArtifactApi.ProxyManifest getProxyManifest() throws IOException {
-    if (proxyManifest == null) {
-      ArtifactApi.ProxyManifest.Builder builder = 
ArtifactApi.ProxyManifest.newBuilder();
-      JsonFormat.parser().merge(Channels.newReader(
-          FileSystems.open(FileSystems.matchNewResource(retrievalToken, false 
/* is directory */)),
-          StandardCharsets.UTF_8.name()), builder);
-      proxyManifest = builder.build();
-    }
-    return proxyManifest;
-  }
-}
diff --git 
a/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/artifact/BeamFileSystemArtifactStagingService.java
 
b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/artifact/BeamFileSystemArtifactStagingService.java
index e94f8f2d24c..6be26f4a6f5 100644
--- 
a/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/artifact/BeamFileSystemArtifactStagingService.java
+++ 
b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/artifact/BeamFileSystemArtifactStagingService.java
@@ -21,7 +21,10 @@
 
 import com.fasterxml.jackson.core.JsonProcessingException;
 import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.api.client.util.Base64;
+import com.google.common.hash.Hasher;
 import com.google.common.hash.Hashing;
+import com.google.protobuf.ByteString;
 import com.google.protobuf.util.JsonFormat;
 import io.grpc.stub.StreamObserver;
 import java.io.IOException;
@@ -187,6 +190,7 @@ private ResourceId getArtifactDirResourceId(String 
stagingSessionToken) throws E
     private PutArtifactMetadata metadata;
     private ResourceId artifactId;
     private WritableByteChannel artifactWritableByteChannel;
+    private Hasher hasher;
 
     PutArtifactStreamObserver(StreamObserver<PutArtifactResponse> 
outboundObserver) {
       this.outboundObserver = outboundObserver;
@@ -208,6 +212,7 @@ public void onNext(PutArtifactRequest putArtifactRequest) {
           LOG.info("Going to stage artifact {} to {}.", 
metadata.getMetadata().getName(),
               artifactId);
           artifactWritableByteChannel = FileSystems.create(artifactId, 
MimeTypes.BINARY);
+          hasher = Hashing.md5().newHasher();
         } catch (Exception e) {
           LOG.error("Staging failed for artifact {} for staging token {}",
               encodedFileName(metadata.getMetadata()), 
metadata.getStagingSessionToken());
@@ -215,8 +220,9 @@ public void onNext(PutArtifactRequest putArtifactRequest) {
         }
       } else {
         try {
-          artifactWritableByteChannel
-              
.write(putArtifactRequest.getData().getData().asReadOnlyByteBuffer());
+          ByteString data = putArtifactRequest.getData().getData();
+          artifactWritableByteChannel.write(data.asReadOnlyByteBuffer());
+          hasher.putBytes(data.toByteArray());
         } catch (IOException e) {
           LOG.error("Staging failed for artifact {} to file {}.", 
metadata.getMetadata().getName(),
               artifactId);
@@ -258,6 +264,18 @@ public void onCompleted() {
           return;
         }
       }
+      String expectedMd5 = metadata.getMetadata().getMd5();
+      if (expectedMd5 != null && !expectedMd5.isEmpty()) {
+        String actualMd5 = Base64.encodeBase64String(hasher.hash().asBytes());
+        if (!actualMd5.equals(expectedMd5)) {
+          outboundObserver.onError(
+              new IllegalArgumentException(
+                  String.format(
+                      "Artifact %s is corrupt: expected md5 %s, but has md5 
%s",
+                      metadata.getMetadata().getName(), expectedMd5, 
actualMd5)));
+          return;
+        }
+      }
       outboundObserver.onCompleted();
     }
   }
diff --git 
a/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/control/DockerJobBundleFactory.java
 
b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/control/DockerJobBundleFactory.java
index e3cb9c1bbc6..a61b4cec9db 100644
--- 
a/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/control/DockerJobBundleFactory.java
+++ 
b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/control/DockerJobBundleFactory.java
@@ -38,7 +38,7 @@
 import org.apache.beam.runners.fnexecution.GrpcFnServer;
 import org.apache.beam.runners.fnexecution.ServerFactory;
 import org.apache.beam.runners.fnexecution.artifact.ArtifactRetrievalService;
-import org.apache.beam.runners.fnexecution.artifact.ArtifactSource;
+import 
org.apache.beam.runners.fnexecution.artifact.BeamFileSystemArtifactRetrievalService;
 import 
org.apache.beam.runners.fnexecution.control.ProcessBundleDescriptors.ExecutableProcessBundleDescriptor;
 import 
org.apache.beam.runners.fnexecution.control.SdkHarnessClient.BundleProcessor;
 import org.apache.beam.runners.fnexecution.data.GrpcDataService;
@@ -81,7 +81,7 @@
 
   private final LoadingCache<Environment, WrappedSdkHarnessClient> 
environmentCache;
 
-  public static DockerJobBundleFactory create(JobInfo jobInfo, ArtifactSource 
artifactSource)
+  public static DockerJobBundleFactory create(JobInfo jobInfo)
       throws Exception {
     ServerFactory serverFactory = getServerFactory();
     IdGenerator stageIdGenerator = IdGenerators.incrementingLongs();
@@ -95,10 +95,9 @@ public static DockerJobBundleFactory create(JobInfo jobInfo, 
ArtifactSource arti
     GrpcFnServer<GrpcLoggingService> loggingServer =
         GrpcFnServer.allocatePortAndCreateFor(
             GrpcLoggingService.forWriter(Slf4jLogWriter.getDefault()), 
serverFactory);
-    // TODO: Wire in artifact retrieval service once implemented.
     GrpcFnServer<ArtifactRetrievalService> retrievalServer =
         GrpcFnServer.allocatePortAndCreateFor(
-            new UnimplementedArtifactRetrievalService(), serverFactory);
+            BeamFileSystemArtifactRetrievalService.create(), serverFactory);
     GrpcFnServer<StaticGrpcProvisionService> provisioningServer =
         GrpcFnServer.allocatePortAndCreateFor(
             StaticGrpcProvisionService.create(jobInfo.toProvisionInfo()), 
serverFactory);
diff --git 
a/runners/java-fn-execution/src/test/java/org/apache/beam/runners/fnexecution/artifact/BeamFileSystemArtifactServicesTest.java
 
b/runners/java-fn-execution/src/test/java/org/apache/beam/runners/fnexecution/artifact/BeamFileSystemArtifactServicesTest.java
new file mode 100644
index 00000000000..45a78e0970d
--- /dev/null
+++ 
b/runners/java-fn-execution/src/test/java/org/apache/beam/runners/fnexecution/artifact/BeamFileSystemArtifactServicesTest.java
@@ -0,0 +1,486 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.fnexecution.artifact;
+
+import com.google.api.client.util.Base64;
+import com.google.common.base.Strings;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Maps;
+import com.google.common.hash.Hashing;
+import com.google.common.io.ByteStreams;
+import com.google.protobuf.ByteString;
+import io.grpc.ManagedChannel;
+import io.grpc.inprocess.InProcessChannelBuilder;
+import io.grpc.stub.StreamObserver;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.channels.Channels;
+import java.nio.charset.StandardCharsets;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+import org.apache.beam.model.jobmanagement.v1.ArtifactApi.ArtifactChunk;
+import org.apache.beam.model.jobmanagement.v1.ArtifactApi.ArtifactMetadata;
+import 
org.apache.beam.model.jobmanagement.v1.ArtifactApi.CommitManifestRequest;
+import org.apache.beam.model.jobmanagement.v1.ArtifactApi.GetArtifactRequest;
+import org.apache.beam.model.jobmanagement.v1.ArtifactApi.GetManifestRequest;
+import org.apache.beam.model.jobmanagement.v1.ArtifactApi.GetManifestResponse;
+import org.apache.beam.model.jobmanagement.v1.ArtifactApi.Manifest;
+import org.apache.beam.model.jobmanagement.v1.ArtifactApi.ProxyManifest;
+import 
org.apache.beam.model.jobmanagement.v1.ArtifactApi.ProxyManifest.Location;
+import org.apache.beam.model.jobmanagement.v1.ArtifactApi.PutArtifactMetadata;
+import org.apache.beam.model.jobmanagement.v1.ArtifactApi.PutArtifactRequest;
+import org.apache.beam.model.jobmanagement.v1.ArtifactApi.PutArtifactResponse;
+import org.apache.beam.model.jobmanagement.v1.ArtifactRetrievalServiceGrpc;
+import 
org.apache.beam.model.jobmanagement.v1.ArtifactRetrievalServiceGrpc.ArtifactRetrievalServiceBlockingStub;
+import 
org.apache.beam.model.jobmanagement.v1.ArtifactRetrievalServiceGrpc.ArtifactRetrievalServiceStub;
+import org.apache.beam.model.jobmanagement.v1.ArtifactStagingServiceGrpc;
+import 
org.apache.beam.model.jobmanagement.v1.ArtifactStagingServiceGrpc.ArtifactStagingServiceBlockingStub;
+import 
org.apache.beam.model.jobmanagement.v1.ArtifactStagingServiceGrpc.ArtifactStagingServiceStub;
+import org.apache.beam.runners.fnexecution.GrpcFnServer;
+import org.apache.beam.runners.fnexecution.InProcessServerFactory;
+import org.apache.beam.sdk.io.FileSystems;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+/**
+ * Tests for {@link BeamFileSystemArtifactStagingService} and {@link
+ * BeamFileSystemArtifactRetrievalService}.
+ */
+public class BeamFileSystemArtifactServicesTest {
+  private static final int DATA_1KB = 1 << 10;
+  private GrpcFnServer<BeamFileSystemArtifactStagingService> stagingServer;
+  private BeamFileSystemArtifactStagingService stagingService;
+  private GrpcFnServer<BeamFileSystemArtifactRetrievalService> retrievalServer;
+  private BeamFileSystemArtifactRetrievalService retrievalService;
+  private ArtifactStagingServiceStub stagingStub;
+  private ArtifactStagingServiceBlockingStub stagingBlockingStub;
+  private ArtifactRetrievalServiceStub retrievalStub;
+  private ArtifactRetrievalServiceBlockingStub retrievalBlockingStub;
+  private Path stagingDir;
+  private Path originalDir;
+  @Rule public TemporaryFolder tempFolder = new TemporaryFolder();
+
+  @Before
+  public void setUp() throws Exception {
+    stagingService = new BeamFileSystemArtifactStagingService();
+    stagingServer =
+        GrpcFnServer.allocatePortAndCreateFor(stagingService, 
InProcessServerFactory.create());
+    ManagedChannel stagingChannel =
+        
InProcessChannelBuilder.forName(stagingServer.getApiServiceDescriptor().getUrl()).build();
+    stagingStub = ArtifactStagingServiceGrpc.newStub(stagingChannel);
+    stagingBlockingStub = 
ArtifactStagingServiceGrpc.newBlockingStub(stagingChannel);
+
+    retrievalService = new BeamFileSystemArtifactRetrievalService();
+    retrievalServer =
+        GrpcFnServer.allocatePortAndCreateFor(retrievalService, 
InProcessServerFactory.create());
+    ManagedChannel retrievalChannel =
+        
InProcessChannelBuilder.forName(retrievalServer.getApiServiceDescriptor().getUrl()).build();
+    retrievalStub = ArtifactRetrievalServiceGrpc.newStub(retrievalChannel);
+    retrievalBlockingStub = 
ArtifactRetrievalServiceGrpc.newBlockingStub(retrievalChannel);
+
+    originalDir = tempFolder.newFolder("original").toPath();
+    stagingDir = tempFolder.newFolder("staging").toPath();
+  }
+
+  @After
+  public void tearDown() throws Exception {
+    if (stagingServer != null) {
+      stagingServer.close();
+    }
+    if (stagingService != null) {
+      stagingService.close();
+    }
+    if (retrievalServer != null) {
+      retrievalServer.close();
+    }
+    if (retrievalService != null) {
+      retrievalService.close();
+    }
+  }
+
+  private void putArtifact(String stagingSessionToken, final String filePath, 
final String fileName)
+      throws Exception {
+    StreamObserver<PutArtifactRequest> outputStreamObserver =
+        stagingStub.putArtifact(
+            new StreamObserver<PutArtifactResponse>() {
+              @Override
+              public void onNext(PutArtifactResponse putArtifactResponse) {
+                Assert.fail("OnNext should never be called.");
+              }
+
+              @Override
+              public void onError(Throwable throwable) {
+                throwable.printStackTrace();
+                Assert.fail("OnError should never be called.");
+              }
+
+              @Override
+              public void onCompleted() {}
+            });
+    outputStreamObserver.onNext(
+        PutArtifactRequest.newBuilder()
+            .setMetadata(
+                PutArtifactMetadata.newBuilder()
+                    
.setMetadata(ArtifactMetadata.newBuilder().setName(fileName).build())
+                    .setStagingSessionToken(stagingSessionToken))
+            .build());
+
+    try (FileInputStream fileInputStream = new FileInputStream(filePath)) {
+      byte[] buffer = new byte[DATA_1KB];
+      int len;
+      while ((len = fileInputStream.read(buffer)) != -1) {
+        outputStreamObserver.onNext(
+            PutArtifactRequest.newBuilder()
+                .setData(
+                    
ArtifactChunk.newBuilder().setData(ByteString.copyFrom(buffer, 0, len)).build())
+                .build());
+      }
+      outputStreamObserver.onCompleted();
+    }
+  }
+
+  private String commitManifest(String stagingSessionToken, 
List<ArtifactMetadata> artifacts) {
+    return stagingBlockingStub
+        .commitManifest(
+            CommitManifestRequest.newBuilder()
+                .setStagingSessionToken(stagingSessionToken)
+                
.setManifest(Manifest.newBuilder().addAllArtifact(artifacts).build())
+                .build())
+        .getRetrievalToken();
+  }
+
+  @Test
+  public void generateStagingSessionTokenTest() throws Exception {
+    String basePath = stagingDir.toAbsolutePath().toString();
+    String stagingToken =
+        
BeamFileSystemArtifactStagingService.generateStagingSessionToken("abc123", 
basePath);
+    Assert.assertEquals(
+        "{\"sessionId\":\"abc123\",\"basePath\":\"" + basePath + "\"}", 
stagingToken);
+  }
+
+  @Test
+  public void putArtifactsSingleSmallFileTest() throws Exception {
+    String fileName = "file1";
+    String stagingSession = "123";
+    String stagingSessionToken =
+        BeamFileSystemArtifactStagingService.generateStagingSessionToken(
+            stagingSession, stagingDir.toUri().getPath());
+    Path srcFilePath = Paths.get(originalDir.toString(), 
fileName).toAbsolutePath();
+    Files.write(srcFilePath, "some_test".getBytes(StandardCharsets.UTF_8));
+    putArtifact(stagingSessionToken, srcFilePath.toString(), fileName);
+    String stagingToken =
+        commitManifest(
+            stagingSessionToken,
+            
Collections.singletonList(ArtifactMetadata.newBuilder().setName(fileName).build()));
+    Assert.assertEquals(
+        Paths.get(
+            stagingDir.toAbsolutePath().toString(),
+            stagingSession,
+            BeamFileSystemArtifactStagingService.MANIFEST),
+        Paths.get(stagingToken));
+    assertFiles(Collections.singleton(fileName), stagingToken);
+  }
+
+  @Test
+  public void putArtifactsMultipleFilesTest() throws Exception {
+    String stagingSession = "123";
+    Map<String, Integer> files =
+        ImmutableMap.<String, Integer>builder()
+            .put("file5cb", (DATA_1KB / 2) /*500b*/)
+            .put("file1kb", DATA_1KB /*1 kb*/)
+            .put("file15cb", (DATA_1KB * 3) / 2 /*1.5 kb*/)
+            .put("nested/file1kb", DATA_1KB /*1 kb*/)
+            .put("file10kb", 10 * DATA_1KB /*10 kb*/)
+            .put("file100kb", 100 * DATA_1KB /*100 kb*/)
+            .build();
+    Map<String, byte[]> md5 = Maps.newHashMap();
+
+    final String text = "abcdefghinklmop\n";
+    files.forEach(
+        (fileName, size) -> {
+          Path filePath = Paths.get(originalDir.toString(), 
fileName).toAbsolutePath();
+          try {
+            Files.createDirectories(filePath.getParent());
+            byte[] contents =
+                Strings.repeat(
+                        text, Double.valueOf(Math.ceil(size * 1.0 / 
text.length())).intValue())
+                    .getBytes(StandardCharsets.UTF_8);
+            Files.write(filePath, contents);
+            md5.put(fileName, Hashing.md5().hashBytes(contents).asBytes());
+          } catch (IOException ignored) {
+          }
+        });
+    String stagingSessionToken =
+        BeamFileSystemArtifactStagingService.generateStagingSessionToken(
+            stagingSession, stagingDir.toUri().getPath());
+
+    List<ArtifactMetadata> metadata = new ArrayList<>();
+    for (String fileName : files.keySet()) {
+      putArtifact(
+          stagingSessionToken,
+          Paths.get(originalDir.toString(), 
fileName).toAbsolutePath().toString(),
+          fileName);
+      metadata.add(
+          ArtifactMetadata.newBuilder()
+              .setName(fileName)
+              .setMd5(Base64.encodeBase64String(md5.get(fileName)))
+              .build());
+    }
+
+    String retrievalToken = commitManifest(stagingSessionToken, metadata);
+    Assert.assertEquals(
+        Paths.get(stagingDir.toAbsolutePath().toString(), stagingSession, 
"MANIFEST").toString(),
+        retrievalToken);
+    assertFiles(files.keySet(), retrievalToken);
+  }
+
+  @Test
+  public void putArtifactsMultipleFilesConcurrentlyTest() throws Exception {
+    String stagingSession = "123";
+    Map<String, Integer> files =
+        ImmutableMap.<String, Integer>builder()
+            .put("file5cb", (DATA_1KB / 2) /*500b*/)
+            .put("file1kb", DATA_1KB /*1 kb*/)
+            .put("file15cb", (DATA_1KB * 3) / 2 /*1.5 kb*/)
+            .put("nested/file1kb", DATA_1KB /*1 kb*/)
+            .put("file10kb", 10 * DATA_1KB /*10 kb*/)
+            .put("file100kb", 100 * DATA_1KB /*100 kb*/)
+            .build();
+
+    final String text = "abcdefghinklmop\n";
+    files.forEach(
+        (fileName, size) -> {
+          Path filePath = Paths.get(originalDir.toString(), 
fileName).toAbsolutePath();
+          try {
+            Files.createDirectories(filePath.getParent());
+            Files.write(
+                filePath,
+                Strings.repeat(
+                        text, Double.valueOf(Math.ceil(size * 1.0 / 
text.length())).intValue())
+                    .getBytes(StandardCharsets.UTF_8));
+          } catch (IOException ignored) {
+          }
+        });
+    String stagingSessionToken =
+        BeamFileSystemArtifactStagingService.generateStagingSessionToken(
+            stagingSession, stagingDir.toUri().getPath());
+
+    List<ArtifactMetadata> metadata = new ArrayList<>();
+    ExecutorService executorService = Executors.newFixedThreadPool(8);
+    try {
+      for (String fileName : files.keySet()) {
+        executorService.execute(
+            () -> {
+              try {
+                putArtifact(
+                    stagingSessionToken,
+                    Paths.get(originalDir.toString(), 
fileName).toAbsolutePath().toString(),
+                    fileName);
+              } catch (Exception e) {
+                Assert.fail(e.getMessage());
+              }
+              
metadata.add(ArtifactMetadata.newBuilder().setName(fileName).build());
+            });
+      }
+    } finally {
+      executorService.shutdown();
+      executorService.awaitTermination(2, TimeUnit.SECONDS);
+    }
+
+    String retrievalToken = commitManifest(stagingSessionToken, metadata);
+    Assert.assertEquals(
+        Paths.get(stagingDir.toAbsolutePath().toString(), stagingSession, 
"MANIFEST").toString(),
+        retrievalToken);
+    assertFiles(files.keySet(), retrievalToken);
+  }
+
+  @Test
+  public void putArtifactsMultipleFilesConcurrentSessionsTest() throws 
Exception {
+    String stagingSession1 = "123";
+    String stagingSession2 = "abc";
+    Map<String, Integer> files1 =
+        ImmutableMap.<String, Integer>builder()
+            .put("file5cb", (DATA_1KB / 2) /*500b*/)
+            .put("file1kb", DATA_1KB /*1 kb*/)
+            .put("file15cb", (DATA_1KB * 3) / 2 /*1.5 kb*/)
+            .build();
+    Map<String, Integer> files2 =
+        ImmutableMap.<String, Integer>builder()
+            .put("nested/file1kb", DATA_1KB /*1 kb*/)
+            .put("file10kb", 10 * DATA_1KB /*10 kb*/)
+            .put("file100kb", 100 * DATA_1KB /*100 kb*/)
+            .build();
+
+    final String text = "abcdefghinklmop\n";
+    ImmutableMap.<String, Integer>builder()
+        .putAll(files1)
+        .putAll(files2)
+        .build()
+        .forEach(
+            (fileName, size) -> {
+              Path filePath = Paths.get(originalDir.toString(), 
fileName).toAbsolutePath();
+              try {
+                Files.createDirectories(filePath.getParent());
+                Files.write(
+                    filePath,
+                    Strings.repeat(
+                            text, Double.valueOf(Math.ceil(size * 1.0 / 
text.length())).intValue())
+                        .getBytes(StandardCharsets.UTF_8));
+              } catch (IOException ignored) {
+              }
+            });
+    String stagingSessionToken1 =
+        BeamFileSystemArtifactStagingService.generateStagingSessionToken(
+            stagingSession1, stagingDir.toUri().getPath());
+    String stagingSessionToken2 =
+        BeamFileSystemArtifactStagingService.generateStagingSessionToken(
+            stagingSession2, stagingDir.toUri().getPath());
+
+    List<ArtifactMetadata> metadata1 = new ArrayList<>();
+    List<ArtifactMetadata> metadata2 = new ArrayList<>();
+    ExecutorService executorService = Executors.newFixedThreadPool(8);
+    try {
+      Iterator<String> iterator1 = files1.keySet().iterator();
+      Iterator<String> iterator2 = files2.keySet().iterator();
+      while (iterator1.hasNext() && iterator2.hasNext()) {
+        String fileName1 = iterator1.next();
+        String fileName2 = iterator2.next();
+        executorService.execute(
+            () -> {
+              try {
+                putArtifact(
+                    stagingSessionToken1,
+                    Paths.get(originalDir.toString(), 
fileName1).toAbsolutePath().toString(),
+                    fileName1);
+                putArtifact(
+                    stagingSessionToken2,
+                    Paths.get(originalDir.toString(), 
fileName2).toAbsolutePath().toString(),
+                    fileName2);
+              } catch (Exception e) {
+                Assert.fail(e.getMessage());
+              }
+              
metadata1.add(ArtifactMetadata.newBuilder().setName(fileName1).build());
+              
metadata2.add(ArtifactMetadata.newBuilder().setName(fileName2).build());
+            });
+      }
+    } finally {
+      executorService.shutdown();
+      executorService.awaitTermination(2, TimeUnit.SECONDS);
+    }
+
+    String retrievalToken1 = commitManifest(stagingSessionToken1, metadata1);
+    String retrievalToken2 = commitManifest(stagingSessionToken2, metadata2);
+    Assert.assertEquals(
+        Paths.get(stagingDir.toAbsolutePath().toString(), stagingSession1, 
"MANIFEST").toString(),
+        retrievalToken1);
+    Assert.assertEquals(
+        Paths.get(stagingDir.toAbsolutePath().toString(), stagingSession2, 
"MANIFEST").toString(),
+        retrievalToken2);
+    assertFiles(files1.keySet(), retrievalToken1);
+    assertFiles(files2.keySet(), retrievalToken2);
+  }
+
+  private void assertFiles(Set<String> files, String retrievalToken) throws 
Exception {
+    ProxyManifest proxyManifest =
+        BeamFileSystemArtifactRetrievalService.loadManifest(retrievalToken);
+    GetManifestResponse retrievedManifest =
+        retrievalBlockingStub.getManifest(
+            
GetManifestRequest.newBuilder().setRetrievalToken(retrievalToken).build());
+    Assert.assertEquals(
+        "Manifest in proxy manifest file doesn't match the retrieved manifest",
+        proxyManifest.getManifest(),
+        retrievedManifest.getManifest());
+    Assert.assertEquals(
+        "Files in locations does not match actual file list.",
+        files,
+        proxyManifest
+            .getLocationList()
+            .stream()
+            .map(Location::getName)
+            .collect(Collectors.toSet()));
+    Assert.assertEquals(
+        "Duplicate file entries in locations.", files.size(), 
proxyManifest.getLocationCount());
+    for (Location location : proxyManifest.getLocationList()) {
+      String expectedContent =
+          readFile(
+              Paths.get(originalDir.toString(), 
location.getName()).toAbsolutePath().toString());
+      String actualStagedContent = readFile(location.getUri());
+      Assert.assertEquals(
+          "Staged content doesn't match expected content for " + 
location.getName(),
+          expectedContent,
+          actualStagedContent);
+      String retrievedContent = retrieveArtifact(location.getName(), 
retrievalToken);
+      Assert.assertEquals(
+          "Retrieved content doesn't match expected content for " + 
location.getName(),
+          expectedContent,
+          retrievedContent);
+    }
+  }
+
+  private static String readFile(String path) throws IOException {
+    try (InputStream stream =
+        Channels.newInputStream(
+            FileSystems.open(FileSystems.matchNewResource(path, false /* 
isDirectory */)))) {
+      return new String(ByteStreams.toByteArray(stream), 
StandardCharsets.UTF_8);
+    }
+  }
+
+  private String retrieveArtifact(String name, String retrievalToken)
+      throws ExecutionException, InterruptedException {
+    CompletableFuture<ByteString> result = new CompletableFuture<>();
+    retrievalStub.getArtifact(
+        
GetArtifactRequest.newBuilder().setRetrievalToken(retrievalToken).setName(name).build(),
+        new StreamObserver<ArtifactChunk>() {
+          private ByteString data = ByteString.EMPTY;
+
+          @Override
+          public void onNext(ArtifactChunk artifactChunk) {
+            data = data.concat(artifactChunk.getData());
+          }
+
+          @Override
+          public void onError(Throwable throwable) {
+            result.completeExceptionally(throwable);
+          }
+
+          @Override
+          public void onCompleted() {
+            result.complete(data);
+          }
+        });
+    return result.get().toStringUtf8();
+  }
+}
diff --git 
a/runners/java-fn-execution/src/test/java/org/apache/beam/runners/fnexecution/artifact/BeamFileSystemArtifactSourceTest.java
 
b/runners/java-fn-execution/src/test/java/org/apache/beam/runners/fnexecution/artifact/BeamFileSystemArtifactSourceTest.java
deleted file mode 100644
index 01f1723d99b..00000000000
--- 
a/runners/java-fn-execution/src/test/java/org/apache/beam/runners/fnexecution/artifact/BeamFileSystemArtifactSourceTest.java
+++ /dev/null
@@ -1,132 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.fnexecution.artifact;
-
-import static org.hamcrest.Matchers.containsInAnyOrder;
-
-import com.google.protobuf.ByteString;
-import io.grpc.stub.StreamObserver;
-import java.io.IOException;
-import java.nio.charset.StandardCharsets;
-import java.util.ArrayList;
-import java.util.List;
-import org.apache.beam.model.jobmanagement.v1.ArtifactApi;
-import org.junit.Assert;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.TemporaryFolder;
-import org.junit.runner.RunWith;
-import org.junit.runners.JUnit4;
-
-/**
- * Tests for BeamFileSystemArtifactSource.
- */
-@RunWith(JUnit4.class) public class BeamFileSystemArtifactSourceTest {
-
-  BeamFileSystemArtifactStagingService stagingService = new 
BeamFileSystemArtifactStagingService();
-
-  @Rule public TemporaryFolder stagingDir = new TemporaryFolder();
-
-  @Test public void testStagingService() throws Exception {
-    String stagingSession = "stagingSession";
-    String stagingSessionToken = BeamFileSystemArtifactStagingService
-        .generateStagingSessionToken(stagingSession, 
stagingDir.newFolder().getPath());
-    List<ArtifactApi.ArtifactMetadata> metadata = new ArrayList<>();
-
-    
metadata.add(ArtifactApi.ArtifactMetadata.newBuilder().setName("file1").build());
-    putArtifactContents(stagingSessionToken, "first", "file1");
-
-    
metadata.add(ArtifactApi.ArtifactMetadata.newBuilder().setName("file2").build());
-    putArtifactContents(stagingSessionToken, "second", "file2");
-
-    String stagingToken = commitManifest(stagingSessionToken, metadata);
-
-    BeamFileSystemArtifactSource artifactSource = new 
BeamFileSystemArtifactSource(stagingToken);
-    Assert.assertEquals("first", getArtifactContents(artifactSource, "file1"));
-    Assert.assertEquals("second", getArtifactContents(artifactSource, 
"file2"));
-    Assert.assertThat(artifactSource.getManifest().getArtifactList(),
-        containsInAnyOrder(metadata.toArray(new 
ArtifactApi.ArtifactMetadata[0])));
-  }
-
-  private String commitManifest(String stagingSessionToken,
-      List<ArtifactApi.ArtifactMetadata> artifacts) {
-    String[] stagingTokenHolder = new String[1];
-    stagingService.commitManifest(
-        
ArtifactApi.CommitManifestRequest.newBuilder().setStagingSessionToken(stagingSessionToken)
-            
.setManifest(ArtifactApi.Manifest.newBuilder().addAllArtifact(artifacts)).build(),
-        new StreamObserver<ArtifactApi.CommitManifestResponse>() {
-
-          @Override public void onNext(ArtifactApi.CommitManifestResponse 
commitManifestResponse) {
-            stagingTokenHolder[0] = commitManifestResponse.getRetrievalToken();
-          }
-
-          @Override public void onError(Throwable throwable) {
-            throw new RuntimeException(throwable);
-          }
-
-          @Override public void onCompleted() {
-          }
-        });
-
-    return stagingTokenHolder[0];
-  }
-
-  private void putArtifactContents(String stagingSessionToken, String 
contents, String name) {
-    StreamObserver<ArtifactApi.PutArtifactRequest> outputStreamObserver = 
stagingService
-        .putArtifact(new StreamObserver<ArtifactApi.PutArtifactResponse>() {
-
-          @Override public void onNext(ArtifactApi.PutArtifactResponse 
putArtifactResponse) {
-          }
-
-          @Override public void onError(Throwable throwable) {
-            throw new RuntimeException(throwable);
-          }
-
-          @Override public void onCompleted() {
-          }
-        });
-
-    
outputStreamObserver.onNext(ArtifactApi.PutArtifactRequest.newBuilder().setMetadata(
-        ArtifactApi.PutArtifactMetadata.newBuilder()
-            
.setMetadata(ArtifactApi.ArtifactMetadata.newBuilder().setName(name).build())
-            .setStagingSessionToken(stagingSessionToken)).build());
-    
outputStreamObserver.onNext(ArtifactApi.PutArtifactRequest.newBuilder().setData(
-        ArtifactApi.ArtifactChunk.newBuilder()
-            .setData(ByteString.copyFrom(contents, 
StandardCharsets.UTF_8))).build());
-    outputStreamObserver.onCompleted();
-  }
-
-  private String getArtifactContents(ArtifactSource artifactSource, String 
name)
-      throws IOException {
-    StringBuilder contents = new StringBuilder();
-    artifactSource.getArtifact(name, new 
StreamObserver<ArtifactApi.ArtifactChunk>() {
-
-      @Override public void onNext(ArtifactApi.ArtifactChunk artifactChunk) {
-        
contents.append(artifactChunk.getData().toString(StandardCharsets.UTF_8));
-      }
-
-      @Override public void onError(Throwable throwable) {
-        throw new RuntimeException(throwable);
-      }
-
-      @Override public void onCompleted() {
-      }
-    });
-    return contents.toString();
-  }
-}
diff --git 
a/runners/java-fn-execution/src/test/java/org/apache/beam/runners/fnexecution/artifact/BeamFileSystemArtifactStagingServiceTest.java
 
b/runners/java-fn-execution/src/test/java/org/apache/beam/runners/fnexecution/artifact/BeamFileSystemArtifactStagingServiceTest.java
deleted file mode 100644
index 73ca431baed..00000000000
--- 
a/runners/java-fn-execution/src/test/java/org/apache/beam/runners/fnexecution/artifact/BeamFileSystemArtifactStagingServiceTest.java
+++ /dev/null
@@ -1,382 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.fnexecution.artifact;
-
-
-import com.google.common.base.Joiner;
-import com.google.common.base.Strings;
-import com.google.common.collect.ImmutableMap;
-import com.google.protobuf.ByteString;
-import com.google.protobuf.util.JsonFormat;
-import io.grpc.inprocess.InProcessChannelBuilder;
-import io.grpc.stub.StreamObserver;
-import java.io.FileInputStream;
-import java.io.IOException;
-import java.nio.charset.Charset;
-import java.nio.charset.StandardCharsets;
-import java.nio.file.Files;
-import java.nio.file.Path;
-import java.nio.file.Paths;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.TimeUnit;
-import java.util.stream.Collectors;
-import org.apache.beam.model.jobmanagement.v1.ArtifactApi.ArtifactChunk;
-import org.apache.beam.model.jobmanagement.v1.ArtifactApi.ArtifactMetadata;
-import 
org.apache.beam.model.jobmanagement.v1.ArtifactApi.CommitManifestRequest;
-import 
org.apache.beam.model.jobmanagement.v1.ArtifactApi.CommitManifestResponse;
-import org.apache.beam.model.jobmanagement.v1.ArtifactApi.Manifest;
-import org.apache.beam.model.jobmanagement.v1.ArtifactApi.ProxyManifest;
-import 
org.apache.beam.model.jobmanagement.v1.ArtifactApi.ProxyManifest.Builder;
-import 
org.apache.beam.model.jobmanagement.v1.ArtifactApi.ProxyManifest.Location;
-import org.apache.beam.model.jobmanagement.v1.ArtifactApi.PutArtifactMetadata;
-import org.apache.beam.model.jobmanagement.v1.ArtifactApi.PutArtifactRequest;
-import org.apache.beam.model.jobmanagement.v1.ArtifactApi.PutArtifactResponse;
-import org.apache.beam.model.jobmanagement.v1.ArtifactStagingServiceGrpc;
-import 
org.apache.beam.model.jobmanagement.v1.ArtifactStagingServiceGrpc.ArtifactStagingServiceStub;
-import org.apache.beam.runners.fnexecution.GrpcFnServer;
-import org.apache.beam.runners.fnexecution.InProcessServerFactory;
-import org.junit.After;
-import org.junit.Assert;
-import org.junit.Before;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.TemporaryFolder;
-import org.junit.runner.RunWith;
-import org.junit.runners.JUnit4;
-
-/**
- * Tests for {@link BeamFileSystemArtifactStagingService}.
- */
-@RunWith(JUnit4.class)
-public class BeamFileSystemArtifactStagingServiceTest {
-
-  private static final Joiner JOINER = Joiner.on("");
-  private static final Charset CHARSET = StandardCharsets.UTF_8;
-  private static final int DATA_1KB = 1 << 10;
-  private GrpcFnServer<BeamFileSystemArtifactStagingService> server;
-  private BeamFileSystemArtifactStagingService artifactStagingService;
-  private ArtifactStagingServiceStub stub;
-  private Path destDir;
-  private Path srcDir;
-  @Rule
-  public TemporaryFolder tempFolder = new TemporaryFolder();
-
-  @Before
-  public void setUp() throws Exception {
-    artifactStagingService = new BeamFileSystemArtifactStagingService();
-    server = GrpcFnServer
-        .allocatePortAndCreateFor(artifactStagingService, 
InProcessServerFactory.create());
-    stub =
-        ArtifactStagingServiceGrpc.newStub(
-            
InProcessChannelBuilder.forName(server.getApiServiceDescriptor().getUrl()).build());
-    srcDir = tempFolder.newFolder("BFSTemp").toPath();
-    destDir = tempFolder.newFolder("BFDTemp").toPath();
-  }
-
-  @After
-  public void tearDown() throws Exception {
-    if (server != null) {
-      server.close();
-    }
-    if (artifactStagingService != null) {
-      artifactStagingService.close();
-    }
-  }
-
-  private void putArtifact(String stagingSessionToken, final String filePath, 
final String fileName)
-      throws Exception {
-    StreamObserver<PutArtifactRequest> outputStreamObserver = stub
-        .putArtifact(new StreamObserver<PutArtifactResponse>() {
-          @Override
-          public void onNext(PutArtifactResponse putArtifactResponse) {
-            Assert.fail("OnNext should never be called.");
-          }
-
-          @Override
-          public void onError(Throwable throwable) {
-            throwable.printStackTrace();
-            Assert.fail("OnError should never be called.");
-          }
-
-          @Override
-          public void onCompleted() {
-          }
-        });
-    outputStreamObserver.onNext(PutArtifactRequest.newBuilder().setMetadata(
-        PutArtifactMetadata.newBuilder()
-            
.setMetadata(ArtifactMetadata.newBuilder().setName(fileName).build())
-            .setStagingSessionToken(stagingSessionToken)).build());
-
-    try (FileInputStream fileInputStream = new FileInputStream(filePath)) {
-      byte[] buffer = new byte[DATA_1KB]; // 1kb chunk
-      int read = fileInputStream.read(buffer);
-      while (read != -1) {
-        outputStreamObserver.onNext(PutArtifactRequest.newBuilder().setData(
-            ArtifactChunk.newBuilder().setData(ByteString.copyFrom(buffer, 0, 
read)).build())
-            .build());
-        read = fileInputStream.read(buffer);
-      }
-      outputStreamObserver.onCompleted();
-    }
-  }
-
-  private String commitManifest(String stagingSessionToken, 
List<ArtifactMetadata> artifacts)
-      throws Exception {
-    CompletableFuture<String> stagingTokenFuture = new CompletableFuture<>();
-    stub.commitManifest(
-        
CommitManifestRequest.newBuilder().setStagingSessionToken(stagingSessionToken)
-            
.setManifest(Manifest.newBuilder().addAllArtifact(artifacts).build()).build(),
-        new StreamObserver<CommitManifestResponse>() {
-          @Override
-          public void onNext(CommitManifestResponse commitManifestResponse) {
-            
stagingTokenFuture.complete(commitManifestResponse.getRetrievalToken());
-          }
-
-          @Override
-          public void onError(Throwable throwable) {
-            Assert.fail("OnError should never be called.");
-          }
-
-          @Override
-          public void onCompleted() {
-
-          }
-        });
-    return stagingTokenFuture.get(1, TimeUnit.SECONDS);
-
-  }
-
-  @Test
-  public void generateStagingSessionTokenTest() throws Exception {
-    String basePath = destDir.toAbsolutePath().toString();
-    String stagingToken = BeamFileSystemArtifactStagingService
-        .generateStagingSessionToken("abc123", basePath);
-    Assert.assertEquals(
-        "{\"sessionId\":\"abc123\",\"basePath\":\"" + basePath + "\"}", 
stagingToken);
-  }
-
-  @Test
-  public void putArtifactsSingleSmallFileTest() throws Exception {
-    String fileName = "file1";
-    String stagingSession = "123";
-    String stagingSessionToken = BeamFileSystemArtifactStagingService
-        .generateStagingSessionToken(stagingSession, 
destDir.toUri().getPath());
-    Path srcFilePath = Paths.get(srcDir.toString(), fileName).toAbsolutePath();
-    Files.write(srcFilePath, "some_test".getBytes(CHARSET));
-    putArtifact(stagingSessionToken, srcFilePath.toString(), fileName);
-    String stagingToken = commitManifest(stagingSessionToken,
-        
Collections.singletonList(ArtifactMetadata.newBuilder().setName(fileName).build()));
-    Assert.assertEquals(
-        Paths.get(destDir.toAbsolutePath().toString(), stagingSession,
-            BeamFileSystemArtifactStagingService.MANIFEST),
-        Paths.get(stagingToken));
-    assertFiles(Collections.singleton(fileName), stagingToken);
-  }
-
-  @Test
-  public void putArtifactsMultipleFilesTest() throws Exception {
-    String stagingSession = "123";
-    Map<String, Integer> files = ImmutableMap.<String, Integer>builder()
-        .put("file5cb", (DATA_1KB / 2) /*500b*/)
-        .put("file1kb", DATA_1KB /*1 kb*/)
-        .put("file15cb", (DATA_1KB * 3) / 2  /*1.5 kb*/)
-        .put("nested/file1kb", DATA_1KB /*1 kb*/)
-        .put("file10kb", 10 * DATA_1KB /*10 kb*/)
-        .put("file100kb", 100 * DATA_1KB /*100 kb*/)
-        .build();
-
-    final String text = "abcdefghinklmop\n";
-    files.forEach((fileName, size) -> {
-      Path filePath = Paths.get(srcDir.toString(), fileName).toAbsolutePath();
-      try {
-        Files.createDirectories(filePath.getParent());
-        Files.write(filePath,
-            Strings.repeat(text, Double.valueOf(Math.ceil(size * 1.0 / 
text.length())).intValue())
-                .getBytes(CHARSET));
-      } catch (IOException ignored) {
-      }
-    });
-    String stagingSessionToken = BeamFileSystemArtifactStagingService
-        .generateStagingSessionToken(stagingSession, 
destDir.toUri().getPath());
-
-    List<ArtifactMetadata> metadata = new ArrayList<>();
-    for (String fileName : files.keySet()) {
-      putArtifact(stagingSessionToken,
-          Paths.get(srcDir.toString(), fileName).toAbsolutePath().toString(), 
fileName);
-      metadata.add(ArtifactMetadata.newBuilder().setName(fileName).build());
-    }
-
-    String stagingToken = commitManifest(stagingSessionToken, metadata);
-    Assert.assertEquals(
-        Paths.get(destDir.toAbsolutePath().toString(), stagingSession, 
"MANIFEST").toString(),
-        stagingToken);
-    assertFiles(files.keySet(), stagingToken);
-  }
-
-  @Test
-  public void putArtifactsMultipleFilesConcurrentlyTest() throws Exception {
-    String stagingSession = "123";
-    Map<String, Integer> files = ImmutableMap.<String, Integer>builder()
-        .put("file5cb", (DATA_1KB / 2) /*500b*/)
-        .put("file1kb", DATA_1KB /*1 kb*/)
-        .put("file15cb", (DATA_1KB * 3) / 2  /*1.5 kb*/)
-        .put("nested/file1kb", DATA_1KB /*1 kb*/)
-        .put("file10kb", 10 * DATA_1KB /*10 kb*/)
-        .put("file100kb", 100 * DATA_1KB /*100 kb*/)
-        .build();
-
-    final String text = "abcdefghinklmop\n";
-    files.forEach((fileName, size) -> {
-      Path filePath = Paths.get(srcDir.toString(), fileName).toAbsolutePath();
-      try {
-        Files.createDirectories(filePath.getParent());
-        Files.write(filePath,
-            Strings.repeat(text, Double.valueOf(Math.ceil(size * 1.0 / 
text.length())).intValue())
-                .getBytes(CHARSET));
-      } catch (IOException ignored) {
-      }
-    });
-    String stagingSessionToken = BeamFileSystemArtifactStagingService
-        .generateStagingSessionToken(stagingSession, 
destDir.toUri().getPath());
-
-    List<ArtifactMetadata> metadata = new ArrayList<>();
-    ExecutorService executorService = Executors.newFixedThreadPool(8);
-    try {
-      for (String fileName : files.keySet()) {
-        executorService.execute(() -> {
-          try {
-            putArtifact(stagingSessionToken,
-                Paths.get(srcDir.toString(), 
fileName).toAbsolutePath().toString(), fileName);
-          } catch (Exception e) {
-            Assert.fail(e.getMessage());
-          }
-          
metadata.add(ArtifactMetadata.newBuilder().setName(fileName).build());
-        });
-      }
-    } finally {
-      executorService.shutdown();
-      executorService.awaitTermination(2, TimeUnit.SECONDS);
-    }
-
-    String stagingToken = commitManifest(stagingSessionToken, metadata);
-    Assert.assertEquals(
-        Paths.get(destDir.toAbsolutePath().toString(), stagingSession, 
"MANIFEST").toString(),
-        stagingToken);
-    assertFiles(files.keySet(), stagingToken);
-  }
-
-  @Test
-  public void putArtifactsMultipleFilesConcurrentSessionsTest() throws 
Exception {
-    String stagingSession1 = "123";
-    String stagingSession2 = "abc";
-    Map<String, Integer> files1 = ImmutableMap.<String, Integer>builder()
-        .put("file5cb", (DATA_1KB / 2) /*500b*/)
-        .put("file1kb", DATA_1KB /*1 kb*/)
-        .put("file15cb", (DATA_1KB * 3) / 2  /*1.5 kb*/)
-        .build();
-    Map<String, Integer> files2 = ImmutableMap.<String, Integer>builder()
-        .put("nested/file1kb", DATA_1KB /*1 kb*/)
-        .put("file10kb", 10 * DATA_1KB /*10 kb*/)
-        .put("file100kb", 100 * DATA_1KB /*100 kb*/)
-        .build();
-
-    final String text = "abcdefghinklmop\n";
-    ImmutableMap.<String, 
Integer>builder().putAll(files1).putAll(files2).build()
-        .forEach((fileName, size) -> {
-          Path filePath = Paths.get(srcDir.toString(), 
fileName).toAbsolutePath();
-          try {
-            Files.createDirectories(filePath.getParent());
-            Files.write(filePath,
-                Strings
-                    .repeat(text, Double.valueOf(Math.ceil(size * 1.0 / 
text.length())).intValue())
-                    .getBytes(CHARSET));
-          } catch (IOException ignored) {
-          }
-        });
-    String stagingSessionToken1 = BeamFileSystemArtifactStagingService
-        .generateStagingSessionToken(stagingSession1, 
destDir.toUri().getPath());
-    String stagingSessionToken2 = BeamFileSystemArtifactStagingService
-        .generateStagingSessionToken(stagingSession2, 
destDir.toUri().getPath());
-
-    List<ArtifactMetadata> metadata1 = new ArrayList<>();
-    List<ArtifactMetadata> metadata2 = new ArrayList<>();
-    ExecutorService executorService = Executors.newFixedThreadPool(8);
-    try {
-      Iterator<String> iterator1 = files1.keySet().iterator();
-      Iterator<String> iterator2 = files2.keySet().iterator();
-      while (iterator1.hasNext() && iterator2.hasNext()) {
-        String fileName1 = iterator1.next();
-        String fileName2 = iterator2.next();
-        executorService.execute(() -> {
-          try {
-            putArtifact(stagingSessionToken1,
-                Paths.get(srcDir.toString(), 
fileName1).toAbsolutePath().toString(), fileName1);
-            putArtifact(stagingSessionToken2,
-                Paths.get(srcDir.toString(), 
fileName2).toAbsolutePath().toString(), fileName2);
-          } catch (Exception e) {
-            Assert.fail(e.getMessage());
-          }
-          
metadata1.add(ArtifactMetadata.newBuilder().setName(fileName1).build());
-          
metadata2.add(ArtifactMetadata.newBuilder().setName(fileName2).build());
-        });
-      }
-    } finally {
-      executorService.shutdown();
-      executorService.awaitTermination(2, TimeUnit.SECONDS);
-    }
-
-    String stagingToken1 = commitManifest(stagingSessionToken1, metadata1);
-    String stagingToken2 = commitManifest(stagingSessionToken2, metadata2);
-    Assert.assertEquals(
-        Paths.get(destDir.toAbsolutePath().toString(), stagingSession1, 
"MANIFEST").toString(),
-        stagingToken1);
-    Assert.assertEquals(
-        Paths.get(destDir.toAbsolutePath().toString(), stagingSession2, 
"MANIFEST").toString(),
-        stagingToken2);
-    assertFiles(files1.keySet(), stagingToken1);
-    assertFiles(files2.keySet(), stagingToken2);
-  }
-
-  private void assertFiles(Set<String> files, String stagingToken) throws 
IOException {
-    Builder proxyManifestBuilder = ProxyManifest.newBuilder();
-    JsonFormat.parser().merge(
-        JOINER.join(Files.readAllLines(Paths.get(stagingToken), CHARSET)),
-        proxyManifestBuilder);
-    ProxyManifest proxyManifest = proxyManifestBuilder.build();
-    Assert.assertEquals("Files in locations does not match actual file list.", 
files,
-        proxyManifest.getLocationList().stream().map(Location::getName)
-            .collect(Collectors.toSet()));
-    Assert.assertEquals("Duplicate file entries in locations.", files.size(),
-        proxyManifest.getLocationCount());
-    for (Location location : proxyManifest.getLocationList()) {
-      String expectedContent = JOINER.join(Files
-          .readAllLines(Paths.get(srcDir.toString(), location.getName()), 
CHARSET));
-      String actualContent = JOINER
-          .join(Files.readAllLines(Paths.get(location.getUri()), CHARSET));
-      Assert.assertEquals(expectedContent, actualContent);
-    }
-  }
-}
diff --git 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/ByteArrayCoder.java 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/ByteArrayCoder.java
index f3185151e15..5a1995361a6 100644
--- 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/ByteArrayCoder.java
+++ 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/ByteArrayCoder.java
@@ -99,7 +99,7 @@ public void encodeAndOwn(byte[] value, OutputStream 
outStream, Context context)
   public byte[] decode(InputStream inStream, Context context)
       throws IOException, CoderException {
     if (context.isWholeStream) {
-      return StreamUtils.getBytes(inStream);
+      return StreamUtils.getBytesWithoutClosing(inStream);
     } else {
       int length = VarInt.decodeInt(inStream);
       if (length < 0) {
diff --git 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/StringUtf8Coder.java 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/StringUtf8Coder.java
index 77e1664f35b..4627402bd00 100644
--- 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/StringUtf8Coder.java
+++ 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/StringUtf8Coder.java
@@ -99,7 +99,7 @@ public String decode(InputStream inStream) throws IOException 
{
   public String decode(InputStream inStream, Context context)
       throws IOException {
     if (context.isWholeStream) {
-      byte[] bytes = StreamUtils.getBytes(inStream);
+      byte[] bytes = StreamUtils.getBytesWithoutClosing(inStream);
       return new String(bytes, StandardCharsets.UTF_8);
     } else {
       try {
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileIO.java 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileIO.java
index 350402b972c..6b9b116a46d 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileIO.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileIO.java
@@ -28,6 +28,7 @@
 import com.google.common.base.Objects;
 import com.google.common.collect.Lists;
 import java.io.IOException;
+import java.io.InputStream;
 import java.io.Serializable;
 import java.nio.channels.Channels;
 import java.nio.channels.ReadableByteChannel;
@@ -416,7 +417,9 @@ public SeekableByteChannel openSeekable() throws 
IOException {
 
     /** Returns the full contents of the file as bytes. */
     public byte[] readFullyAsBytes() throws IOException {
-      return StreamUtils.getBytes(Channels.newInputStream(open()));
+      try (InputStream stream = Channels.newInputStream(open())) {
+        return StreamUtils.getBytesWithoutClosing(stream);
+      }
     }
 
     /** Returns the full contents of the file as a {@link String} decoded as 
UTF-8. */
diff --git 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/StreamUtils.java 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/StreamUtils.java
index 91abc2e4aa9..8315a31856b 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/StreamUtils.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/StreamUtils.java
@@ -38,7 +38,7 @@ private StreamUtils() {
   /**
    * Efficient converting stream to bytes.
    */
-  public static byte[] getBytes(InputStream stream) throws IOException {
+  public static byte[] getBytesWithoutClosing(InputStream stream) throws 
IOException {
     if (stream instanceof ExposedByteArrayInputStream) {
       // Fast path for the exposed version.
       return ((ExposedByteArrayInputStream) stream).readAll();
diff --git 
a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/StreamUtilsTest.java 
b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/StreamUtilsTest.java
index 7a31184dc58..a9fae5a71bb 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/StreamUtilsTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/StreamUtilsTest.java
@@ -46,7 +46,7 @@ public void setUp() {
   @Test
   public void testGetBytesFromExposedByteArrayInputStream() throws IOException 
{
     InputStream stream = new ExposedByteArrayInputStream(testData);
-    byte[] bytes = StreamUtils.getBytes(stream);
+    byte[] bytes = StreamUtils.getBytesWithoutClosing(stream);
     assertArrayEquals(testData, bytes);
     assertSame(testData, bytes);
     assertEquals(0, stream.available());
@@ -55,7 +55,7 @@ public void testGetBytesFromExposedByteArrayInputStream() 
throws IOException {
   @Test
   public void testGetBytesFromByteArrayInputStream() throws IOException {
     InputStream stream = new ByteArrayInputStream(testData);
-    byte[] bytes = StreamUtils.getBytes(stream);
+    byte[] bytes = StreamUtils.getBytesWithoutClosing(stream);
     assertArrayEquals(testData, bytes);
     assertEquals(0, stream.available());
   }
@@ -65,7 +65,7 @@ public void testGetBytesFromInputStream() throws IOException {
     // Any stream which is not a ByteArrayInputStream.
     InputStream stream =
         new BufferedInputStream(new ByteArrayInputStream(testData));
-    byte[] bytes = StreamUtils.getBytes(stream);
+    byte[] bytes = StreamUtils.getBytesWithoutClosing(stream);
     assertArrayEquals(testData, bytes);
     assertEquals(0, stream.available());
   }


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


Issue Time Tracking
-------------------

    Worklog Id:     (was: 112133)
    Time Spent: 6h 20m  (was: 6h 10m)

> ArtifactRetrievalService that retrieves artifacts from a distributed 
> filesystem
> -------------------------------------------------------------------------------
>
>                 Key: BEAM-4291
>                 URL: https://issues.apache.org/jira/browse/BEAM-4291
>             Project: Beam
>          Issue Type: Sub-task
>          Components: runner-core
>            Reporter: Eugene Kirpichov
>            Assignee: Axel Magnuson
>            Priority: Major
>          Time Spent: 6h 20m
>  Remaining Estimate: 0h
>
> In agreement with how they are staged in BEAM-4290.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to