Add a Local FS implementation of the Artifact Staging API

Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/2f178fbe
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/2f178fbe
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/2f178fbe

Branch: refs/heads/master
Commit: 2f178fbeab2846f940fb98b2518cc9aa9c24b31d
Parents: 465ecfc
Author: Thomas Groh <tg...@google.com>
Authored: Wed Sep 13 13:32:20 2017 -0700
Committer: Thomas Groh <tg...@google.com>
Committed: Fri Sep 22 15:02:43 2017 -0700

----------------------------------------------------------------------
 runners/local-artifact-service-java/pom.xml     | 116 ++++++++
 .../LocalFileSystemArtifactStagerService.java   | 276 +++++++++++++++++++
 .../beam/artifact/local/package-info.java       |  22 ++
 ...ocalFileSystemArtifactStagerServiceTest.java | 274 ++++++++++++++++++
 runners/pom.xml                                 |   1 +
 5 files changed, 689 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/2f178fbe/runners/local-artifact-service-java/pom.xml
----------------------------------------------------------------------
diff --git a/runners/local-artifact-service-java/pom.xml 
b/runners/local-artifact-service-java/pom.xml
new file mode 100644
index 0000000..0215798
--- /dev/null
+++ b/runners/local-artifact-service-java/pom.xml
@@ -0,0 +1,116 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+    Licensed to the Apache Software Foundation (ASF) under one or more
+    contributor license agreements.  See the NOTICE file distributed with
+    this work for additional information regarding copyright ownership.
+    The ASF licenses this file to You under the Apache License, Version 2.0
+    (the "License"); you may not use this file except in compliance with
+    the License.  You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing, software
+    distributed under the License is distributed on an "AS IS" BASIS,
+    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+    See the License for the specific language governing permissions and
+    limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"; 
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"; 
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
http://maven.apache.org/xsd/maven-4.0.0.xsd";>
+
+  <modelVersion>4.0.0</modelVersion>
+
+  <parent>
+    <groupId>org.apache.beam</groupId>
+    <artifactId>beam-runners-parent</artifactId>
+    <version>2.2.0-SNAPSHOT</version>
+    <relativePath>../pom.xml</relativePath>
+  </parent>
+
+  <artifactId>beam-local-artifact-service-java</artifactId>
+  <name>Apache Beam :: Runners :: Java Local Artifact Service</name>
+  <description>The Beam Artifact Service exposes APIs to stage and retrieve
+    artifacts in a manner independent of the underlying storage system, for use
+    by the Beam portability framework. The local implementation uses the local
+    File System as the underlying storage system.</description>
+
+  <packaging>jar</packaging>
+
+  <build>
+    <plugins>
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-surefire-plugin</artifactId>
+      </plugin>
+
+      <!-- Coverage analysis for unit tests. -->
+      <plugin>
+        <groupId>org.jacoco</groupId>
+        <artifactId>jacoco-maven-plugin</artifactId>
+      </plugin>
+    </plugins>
+  </build>
+
+  <dependencies>
+    <dependency>
+      <groupId>org.apache.beam</groupId>
+      <artifactId>beam-sdks-common-runner-api</artifactId>
+    </dependency>
+
+    <!-- build dependencies -->
+
+    <dependency>
+      <groupId>com.google.code.findbugs</groupId>
+      <artifactId>jsr305</artifactId>
+    </dependency>
+
+    <dependency>
+      <groupId>com.google.guava</groupId>
+      <artifactId>guava</artifactId>
+    </dependency>
+
+    <dependency>
+      <groupId>io.grpc</groupId>
+      <artifactId>grpc-core</artifactId>
+    </dependency>
+
+    <dependency>
+      <groupId>io.grpc</groupId>
+      <artifactId>grpc-stub</artifactId>
+    </dependency>
+
+    <dependency>
+      <groupId>com.google.protobuf</groupId>
+      <artifactId>protobuf-java</artifactId>
+    </dependency>
+
+    <dependency>
+      <groupId>org.slf4j</groupId>
+      <artifactId>slf4j-api</artifactId>
+    </dependency>
+
+    <!-- test dependencies -->
+    <dependency>
+      <groupId>org.hamcrest</groupId>
+      <artifactId>hamcrest-all</artifactId>
+      <scope>test</scope>
+    </dependency>
+
+    <dependency>
+      <groupId>org.mockito</groupId>
+      <artifactId>mockito-all</artifactId>
+      <scope>test</scope>
+    </dependency>
+
+    <dependency>
+      <groupId>junit</groupId>
+      <artifactId>junit</artifactId>
+      <scope>test</scope>
+    </dependency>
+
+    <dependency>
+      <groupId>org.slf4j</groupId>
+      <artifactId>slf4j-jdk14</artifactId>
+      <scope>test</scope>
+    </dependency>
+  </dependencies>
+</project>

http://git-wip-us.apache.org/repos/asf/beam/blob/2f178fbe/runners/local-artifact-service-java/src/main/java/org/apache/beam/artifact/local/LocalFileSystemArtifactStagerService.java
----------------------------------------------------------------------
diff --git 
a/runners/local-artifact-service-java/src/main/java/org/apache/beam/artifact/local/LocalFileSystemArtifactStagerService.java
 
b/runners/local-artifact-service-java/src/main/java/org/apache/beam/artifact/local/LocalFileSystemArtifactStagerService.java
new file mode 100644
index 0000000..6b42a3b
--- /dev/null
+++ 
b/runners/local-artifact-service-java/src/main/java/org/apache/beam/artifact/local/LocalFileSystemArtifactStagerService.java
@@ -0,0 +1,276 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.artifact.local;
+
+import static com.google.common.base.Preconditions.checkState;
+
+import com.google.common.base.Throwables;
+import io.grpc.Status;
+import io.grpc.StatusException;
+import io.grpc.StatusRuntimeException;
+import io.grpc.stub.StreamObserver;
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.util.ArrayList;
+import java.util.Collection;
+import javax.annotation.Nullable;
+import org.apache.beam.sdk.common.runner.v1.ArtifactApi.ArtifactMetadata;
+import org.apache.beam.sdk.common.runner.v1.ArtifactApi.CommitManifestRequest;
+import org.apache.beam.sdk.common.runner.v1.ArtifactApi.CommitManifestResponse;
+import org.apache.beam.sdk.common.runner.v1.ArtifactApi.PutArtifactRequest;
+import 
org.apache.beam.sdk.common.runner.v1.ArtifactApi.PutArtifactRequest.ContentCase;
+import org.apache.beam.sdk.common.runner.v1.ArtifactApi.PutArtifactResponse;
+import 
org.apache.beam.sdk.common.runner.v1.ArtifactStagingServiceGrpc.ArtifactStagingServiceImplBase;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/** An {@code ArtifactStagingService} which stages files to a local temp 
directory. */
+public class LocalFileSystemArtifactStagerService extends 
ArtifactStagingServiceImplBase {
+  private static final Logger LOG =
+      LoggerFactory.getLogger(LocalFileSystemArtifactStagerService.class);
+
+  public static LocalFileSystemArtifactStagerService withRootDirectory(File 
base) {
+    return new LocalFileSystemArtifactStagerService(base);
+  }
+
+  private final File stagingBase;
+  private final File artifactsBase;
+
+  private LocalFileSystemArtifactStagerService(File stagingBase) {
+    this.stagingBase = stagingBase;
+    if ((stagingBase.mkdirs() || stagingBase.exists()) && 
stagingBase.canWrite()) {
+      artifactsBase = new File(stagingBase, "artifacts");
+      checkState(
+          (artifactsBase.mkdir() || artifactsBase.exists()) && 
artifactsBase.canWrite(),
+          "Could not create artifact staging directory at %s",
+          artifactsBase);
+    } else {
+      throw new IllegalStateException(
+          String.format("Could not create staging directory structure at root 
%s", stagingBase));
+    }
+  }
+
+  @Override
+  public StreamObserver<PutArtifactRequest> putArtifact(
+      final StreamObserver<PutArtifactResponse> responseObserver) {
+    return new CreateAndWriteFileObserver(responseObserver);
+  }
+
+  @Override
+  public void commitManifest(
+      CommitManifestRequest request, StreamObserver<CommitManifestResponse> 
responseObserver) {
+    try {
+      commitManifestOrThrow(request, responseObserver);
+    } catch (StatusRuntimeException e) {
+      responseObserver.onError(e);
+      LOG.error("Failed to commit Manifest {}", request.getManifest(), e);
+    } catch (Exception e) {
+      responseObserver.onError(
+          Status.INTERNAL
+              .withCause(e)
+              .withDescription(Throwables.getStackTraceAsString(e))
+              .asRuntimeException());
+      LOG.error("Failed to commit Manifest {}", request.getManifest(), e);
+    }
+  }
+
+  private void commitManifestOrThrow(
+      CommitManifestRequest request, StreamObserver<CommitManifestResponse> 
responseObserver)
+      throws IOException {
+    Collection<ArtifactMetadata> missing = new ArrayList<>();
+    for (ArtifactMetadata artifact : request.getManifest().getArtifactList()) {
+      // TODO: Validate the checksums on the server side, to fail more 
aggressively if require
+      if (!getArtifactFile(artifact.getName()).exists()) {
+        missing.add(artifact);
+      }
+    }
+    if (!missing.isEmpty()) {
+      throw Status.INVALID_ARGUMENT
+          .withDescription(
+              String.format("Attempted to commit manifest with missing 
Artifacts: [%s]", missing))
+          .asRuntimeException();
+    }
+    File mf = new File(stagingBase, "MANIFEST");
+    checkState(mf.createNewFile(), "Could not create file to store manifest");
+    try (OutputStream mfOut = new FileOutputStream(mf)) {
+      request.getManifest().writeTo(mfOut);
+    }
+    responseObserver.onNext(
+        CommitManifestResponse.newBuilder()
+            .setStagingToken(stagingBase.getCanonicalPath())
+            .build());
+    responseObserver.onCompleted();
+  }
+
+  File getArtifactFile(String artifactName) {
+    return new File(artifactsBase, artifactName);
+  }
+
+  private class CreateAndWriteFileObserver implements 
StreamObserver<PutArtifactRequest> {
+    private final StreamObserver<PutArtifactResponse> responseObserver;
+    private FileWritingObserver writer;
+
+    private CreateAndWriteFileObserver(StreamObserver<PutArtifactResponse> 
responseObserver) {
+      this.responseObserver = responseObserver;
+    }
+
+    @Override
+    public void onNext(PutArtifactRequest value) {
+      try {
+        if (writer == null) {
+          if (!value.getContentCase().equals(ContentCase.METADATA)) {
+            throw Status.INVALID_ARGUMENT
+                .withDescription(
+                    String.format(
+                        "Expected the first %s to contain the Artifact Name, 
got %s",
+                        PutArtifactRequest.class.getSimpleName(), 
value.getContentCase()))
+                .asRuntimeException();
+          }
+          writer = createFile(value.getMetadata());
+        } else {
+          writer.onNext(value);
+        }
+      } catch (StatusRuntimeException e) {
+        responseObserver.onError(e);
+      } catch (Exception e) {
+        responseObserver.onError(
+            Status.INTERNAL
+                .withCause(e)
+                .withDescription(Throwables.getStackTraceAsString(e))
+                .asRuntimeException());
+      }
+    }
+
+    private FileWritingObserver createFile(ArtifactMetadata metadata) throws 
IOException {
+      File destination = getArtifactFile(metadata.getName());
+      if (!destination.createNewFile()) {
+        throw Status.ALREADY_EXISTS
+            .withDescription(String.format("Artifact with name %s already 
exists", metadata))
+            .asRuntimeException();
+      }
+      return new FileWritingObserver(
+          destination, new FileOutputStream(destination), responseObserver);
+    }
+
+    @Override
+    public void onError(Throwable t) {
+      if (writer != null) {
+        writer.onError(t);
+      } else {
+        responseObserver.onCompleted();
+      }
+    }
+
+    @Override
+    public void onCompleted() {
+      if (writer != null) {
+        writer.onCompleted();
+      } else {
+        responseObserver.onCompleted();
+      }
+    }
+  }
+
+  private static class FileWritingObserver implements 
StreamObserver<PutArtifactRequest> {
+    private final File destination;
+    private final OutputStream target;
+    private final StreamObserver<PutArtifactResponse> responseObserver;
+
+    private FileWritingObserver(
+        File destination,
+        OutputStream target,
+        StreamObserver<PutArtifactResponse> responseObserver) {
+      this.destination = destination;
+      this.target = target;
+      this.responseObserver = responseObserver;
+    }
+
+    @Override
+    public void onNext(PutArtifactRequest value) {
+      try {
+        if (value.getData() == null) {
+          StatusRuntimeException e = 
Status.INVALID_ARGUMENT.withDescription(String.format(
+              "Expected all chunks in the current stream state to contain 
data, got %s",
+              value.getContentCase())).asRuntimeException();
+          throw e;
+        }
+        value.getData().getData().writeTo(target);
+      } catch (Exception e) {
+        cleanedUp(e);
+      }
+    }
+
+    @Override
+    public void onError(Throwable t) {
+      if (cleanedUp(null)) {
+        responseObserver.onCompleted();
+      }
+    }
+
+    @Override
+    public void onCompleted() {
+      try {
+        target.close();
+      } catch (IOException e) {
+        LOG.error("Failed to complete writing file {}", destination, e);
+        cleanedUp(e);
+        return;
+      }
+      responseObserver.onNext(PutArtifactResponse.getDefaultInstance());
+      responseObserver.onCompleted();
+    }
+
+    /**
+     * Cleans up after the file writing failed exceptionally, due to an error 
either in the service
+     * or sent from the client.
+     *
+     * @return false if an error was reported, true otherwise
+     */
+    private boolean cleanedUp(@Nullable Throwable whyFailed) {
+      Throwable actual = whyFailed;
+      try {
+        target.close();
+        if (!destination.delete()) {
+          LOG.debug("Couldn't delete failed write at {}", destination);
+        }
+      } catch (IOException e) {
+        if (whyFailed == null) {
+          actual = e;
+        } else {
+          actual.addSuppressed(e);
+        }
+        LOG.error("Failed to clean up after writing file {}", destination, e);
+      }
+      if (actual != null) {
+        if (actual instanceof StatusException || actual instanceof 
StatusRuntimeException) {
+          responseObserver.onError(actual);
+        } else {
+          Status status =
+              Status.INTERNAL
+                  .withCause(actual)
+                  .withDescription(Throwables.getStackTraceAsString(actual));
+          responseObserver.onError(status.asException());
+        }
+      }
+      return actual == null;
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/2f178fbe/runners/local-artifact-service-java/src/main/java/org/apache/beam/artifact/local/package-info.java
----------------------------------------------------------------------
diff --git 
a/runners/local-artifact-service-java/src/main/java/org/apache/beam/artifact/local/package-info.java
 
b/runners/local-artifact-service-java/src/main/java/org/apache/beam/artifact/local/package-info.java
new file mode 100644
index 0000000..17d0943
--- /dev/null
+++ 
b/runners/local-artifact-service-java/src/main/java/org/apache/beam/artifact/local/package-info.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Provides local implementations of the Artifact API services.
+ */
+package org.apache.beam.artifact.local;

http://git-wip-us.apache.org/repos/asf/beam/blob/2f178fbe/runners/local-artifact-service-java/src/test/java/org/apache/beam/artifact/local/LocalFileSystemArtifactStagerServiceTest.java
----------------------------------------------------------------------
diff --git 
a/runners/local-artifact-service-java/src/test/java/org/apache/beam/artifact/local/LocalFileSystemArtifactStagerServiceTest.java
 
b/runners/local-artifact-service-java/src/test/java/org/apache/beam/artifact/local/LocalFileSystemArtifactStagerServiceTest.java
new file mode 100644
index 0000000..b7ba03f
--- /dev/null
+++ 
b/runners/local-artifact-service-java/src/test/java/org/apache/beam/artifact/local/LocalFileSystemArtifactStagerServiceTest.java
@@ -0,0 +1,274 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.artifact.local;
+
+import static org.hamcrest.Matchers.is;
+import static org.junit.Assert.assertThat;
+
+import com.google.common.util.concurrent.Uninterruptibles;
+import com.google.protobuf.ByteString;
+import io.grpc.inprocess.InProcessChannelBuilder;
+import io.grpc.inprocess.InProcessServerBuilder;
+import io.grpc.internal.ServerImpl;
+import io.grpc.stub.StreamObserver;
+import java.io.File;
+import java.io.FileInputStream;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+import javax.annotation.Nullable;
+import org.apache.beam.sdk.common.runner.v1.ArtifactApi.ArtifactChunk;
+import org.apache.beam.sdk.common.runner.v1.ArtifactApi.ArtifactMetadata;
+import org.apache.beam.sdk.common.runner.v1.ArtifactApi.CommitManifestRequest;
+import org.apache.beam.sdk.common.runner.v1.ArtifactApi.CommitManifestResponse;
+import org.apache.beam.sdk.common.runner.v1.ArtifactApi.Manifest;
+import org.apache.beam.sdk.common.runner.v1.ArtifactApi.PutArtifactRequest;
+import org.apache.beam.sdk.common.runner.v1.ArtifactApi.PutArtifactResponse;
+import org.apache.beam.sdk.common.runner.v1.ArtifactStagingServiceGrpc;
+import 
org.apache.beam.sdk.common.runner.v1.ArtifactStagingServiceGrpc.ArtifactStagingServiceStub;
+import org.hamcrest.Matchers;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/** Tests for {@link LocalFileSystemArtifactStagerService}. */
+@RunWith(JUnit4.class)
+public class LocalFileSystemArtifactStagerServiceTest {
+  @Rule public TemporaryFolder temporaryFolder = new TemporaryFolder();
+
+  private ArtifactStagingServiceStub stub;
+
+  private LocalFileSystemArtifactStagerService stager;
+  private ServerImpl server;
+
+  @Before
+  public void setup() throws Exception {
+    stager = 
LocalFileSystemArtifactStagerService.withRootDirectory(temporaryFolder.newFolder());
+
+    server =
+        InProcessServerBuilder.forName("fs_stager")
+            .directExecutor()
+            .addService(stager)
+            .build()
+            .start();
+
+    stub =
+        ArtifactStagingServiceGrpc.newStub(
+            
InProcessChannelBuilder.forName("fs_stager").usePlaintext(true).build());
+  }
+
+  @After
+  public void teardown() {
+    server.shutdownNow();
+  }
+
+  @Test
+  public void singleDataPutArtifactSucceeds() throws Exception {
+    byte[] data = "foo-bar-baz".getBytes();
+    RecordingStreamObserver<PutArtifactResponse> responseObserver = new 
RecordingStreamObserver<>();
+    StreamObserver<PutArtifactRequest> requestObserver = 
stub.putArtifact(responseObserver);
+
+    String name = "my-artifact";
+    requestObserver.onNext(
+        PutArtifactRequest.newBuilder()
+            .setMetadata(ArtifactMetadata.newBuilder().setName(name).build())
+            .build());
+    requestObserver.onNext(
+        PutArtifactRequest.newBuilder()
+            
.setData(ArtifactChunk.newBuilder().setData(ByteString.copyFrom(data)).build())
+            .build());
+    requestObserver.onCompleted();
+
+    responseObserver.awaitTerminalState();
+
+    File staged = stager.getArtifactFile(name);
+    assertThat(staged.exists(), is(true));
+    ByteBuffer buf = ByteBuffer.allocate(data.length);
+    new FileInputStream(staged).getChannel().read(buf);
+    Assert.assertArrayEquals(data, buf.array());
+  }
+
+  @Test
+  public void multiPartPutArtifactSucceeds() throws Exception {
+    byte[] partOne = "foo-".getBytes();
+    byte[] partTwo = "bar-".getBytes();
+    byte[] partThree = "baz".getBytes();
+    RecordingStreamObserver<PutArtifactResponse> responseObserver = new 
RecordingStreamObserver<>();
+    StreamObserver<PutArtifactRequest> requestObserver = 
stub.putArtifact(responseObserver);
+
+    String name = "my-artifact";
+    requestObserver.onNext(
+        PutArtifactRequest.newBuilder()
+            .setMetadata(ArtifactMetadata.newBuilder().setName(name).build())
+            .build());
+    requestObserver.onNext(
+        PutArtifactRequest.newBuilder()
+            
.setData(ArtifactChunk.newBuilder().setData(ByteString.copyFrom(partOne)).build())
+            .build());
+    requestObserver.onNext(
+        PutArtifactRequest.newBuilder()
+            
.setData(ArtifactChunk.newBuilder().setData(ByteString.copyFrom(partTwo)).build())
+            .build());
+    requestObserver.onNext(
+        PutArtifactRequest.newBuilder()
+            
.setData(ArtifactChunk.newBuilder().setData(ByteString.copyFrom(partThree)).build())
+            .build());
+    requestObserver.onCompleted();
+
+    responseObserver.awaitTerminalState();
+
+    File staged = stager.getArtifactFile(name);
+    assertThat(staged.exists(), is(true));
+    ByteBuffer buf = ByteBuffer.allocate("foo-bar-baz".length());
+    new FileInputStream(staged).getChannel().read(buf);
+    Assert.assertArrayEquals("foo-bar-baz".getBytes(), buf.array());
+  }
+
+  @Test
+  public void putArtifactBeforeNameFails() {
+    byte[] data = "foo-".getBytes();
+    RecordingStreamObserver<PutArtifactResponse> responseObserver = new 
RecordingStreamObserver<>();
+    StreamObserver<PutArtifactRequest> requestObserver = 
stub.putArtifact(responseObserver);
+
+    requestObserver.onNext(
+        PutArtifactRequest.newBuilder()
+            
.setData(ArtifactChunk.newBuilder().setData(ByteString.copyFrom(data)).build())
+            .build());
+
+    responseObserver.awaitTerminalState();
+
+    assertThat(responseObserver.error, Matchers.not(Matchers.nullValue()));
+  }
+
+  @Test
+  public void putArtifactWithNoContentFails() {
+    RecordingStreamObserver<PutArtifactResponse> responseObserver = new 
RecordingStreamObserver<>();
+    StreamObserver<PutArtifactRequest> requestObserver = 
stub.putArtifact(responseObserver);
+
+    requestObserver.onNext(
+        
PutArtifactRequest.newBuilder().setData(ArtifactChunk.getDefaultInstance()).build());
+
+    responseObserver.awaitTerminalState();
+
+    assertThat(responseObserver.error, Matchers.not(Matchers.nullValue()));
+  }
+
+  @Test
+  public void commitManifestWithAllArtifactsSucceeds() {
+    ArtifactMetadata firstArtifact = stageBytes("first-artifact", "foo, bar, 
baz, quux".getBytes());
+    ArtifactMetadata secondArtifact = stageBytes("second-artifact", "spam, 
ham, eggs".getBytes());
+
+    Manifest manifest =
+        
Manifest.newBuilder().addArtifact(firstArtifact).addArtifact(secondArtifact).build();
+
+    RecordingStreamObserver<CommitManifestResponse> commitResponseObserver =
+        new RecordingStreamObserver<>();
+    stub.commitManifest(
+        CommitManifestRequest.newBuilder().setManifest(manifest).build(), 
commitResponseObserver);
+
+    commitResponseObserver.awaitTerminalState();
+
+    assertThat(commitResponseObserver.completed, is(true));
+    assertThat(commitResponseObserver.responses, Matchers.hasSize(1));
+    CommitManifestResponse commitResponse = 
commitResponseObserver.responses.get(0);
+    assertThat(commitResponse.getStagingToken(), 
Matchers.not(Matchers.nullValue()));
+  }
+
+  @Test
+  public void commitManifestWithMissingArtifactFails() {
+    ArtifactMetadata firstArtifact = stageBytes("first-artifact", "foo, bar, 
baz, quux".getBytes());
+    ArtifactMetadata absentArtifact = 
ArtifactMetadata.newBuilder().setName("absent").build();
+
+    Manifest manifest =
+        
Manifest.newBuilder().addArtifact(firstArtifact).addArtifact(absentArtifact).build();
+
+    RecordingStreamObserver<CommitManifestResponse> commitResponseObserver =
+        new RecordingStreamObserver<>();
+    
stub.commitManifest(CommitManifestRequest.newBuilder().setManifest(manifest).build(),
+        commitResponseObserver);
+
+    commitResponseObserver.awaitTerminalState();
+
+    assertThat(commitResponseObserver.error, 
Matchers.not(Matchers.nullValue()));
+  }
+
+  private ArtifactMetadata stageBytes(String name, byte[] bytes) {
+    StreamObserver<PutArtifactRequest> requests =
+        stub.putArtifact(new RecordingStreamObserver<PutArtifactResponse>());
+    requests.onNext(
+        PutArtifactRequest.newBuilder()
+            .setMetadata(ArtifactMetadata.newBuilder().setName(name).build())
+            .build());
+    requests.onNext(
+        PutArtifactRequest.newBuilder()
+            
.setData(ArtifactChunk.newBuilder().setData(ByteString.copyFrom(bytes)).build())
+            .build());
+    requests.onCompleted();
+    return ArtifactMetadata.newBuilder().setName(name).build();
+  }
+
+  private static class RecordingStreamObserver<T> implements StreamObserver<T> 
{
+    private List<T> responses = new ArrayList<>();
+    @Nullable private Throwable error = null;
+    private boolean completed = false;
+
+    @Override
+    public void onNext(T value) {
+      failIfTerminal();
+      responses.add(value);
+    }
+
+    @Override
+    public void onError(Throwable t) {
+      failIfTerminal();
+      error = t;
+    }
+
+    @Override
+    public void onCompleted() {
+      failIfTerminal();
+      completed = true;
+    }
+
+    private boolean isTerminal() {
+      return error != null || completed;
+    }
+
+    private void failIfTerminal() {
+      if (isTerminal()) {
+        Assert.fail(
+            String.format(
+                "Should have terminated after entering a terminal state: 
completed %s, error %s",
+                completed, error));
+      }
+    }
+
+    void awaitTerminalState() {
+      while (!isTerminal()) {
+        Uninterruptibles.sleepUninterruptibly(1, TimeUnit.MILLISECONDS);
+      }
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/2f178fbe/runners/pom.xml
----------------------------------------------------------------------
diff --git a/runners/pom.xml b/runners/pom.xml
index 4412ed6..4f06748 100644
--- a/runners/pom.xml
+++ b/runners/pom.xml
@@ -35,6 +35,7 @@
   <modules>
     <module>core-construction-java</module>
     <module>core-java</module>
+    <module>local-artifact-service-java</module>
     <module>direct-java</module>
     <module>flink</module>
     <module>google-cloud-dataflow-java</module>

Reply via email to