Add a Local FS implementation of the Artifact Staging API
Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/2f178fbe Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/2f178fbe Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/2f178fbe Branch: refs/heads/master Commit: 2f178fbeab2846f940fb98b2518cc9aa9c24b31d Parents: 465ecfc Author: Thomas Groh <tg...@google.com> Authored: Wed Sep 13 13:32:20 2017 -0700 Committer: Thomas Groh <tg...@google.com> Committed: Fri Sep 22 15:02:43 2017 -0700 ---------------------------------------------------------------------- runners/local-artifact-service-java/pom.xml | 116 ++++++++ .../LocalFileSystemArtifactStagerService.java | 276 +++++++++++++++++++ .../beam/artifact/local/package-info.java | 22 ++ ...ocalFileSystemArtifactStagerServiceTest.java | 274 ++++++++++++++++++ runners/pom.xml | 1 + 5 files changed, 689 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/2f178fbe/runners/local-artifact-service-java/pom.xml ---------------------------------------------------------------------- diff --git a/runners/local-artifact-service-java/pom.xml b/runners/local-artifact-service-java/pom.xml new file mode 100644 index 0000000..0215798 --- /dev/null +++ b/runners/local-artifact-service-java/pom.xml @@ -0,0 +1,116 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- + Licensed to the Apache Software Foundation (ASF) under one or more + contributor license agreements. See the NOTICE file distributed with + this work for additional information regarding copyright ownership. + The ASF licenses this file to You under the Apache License, Version 2.0 + (the "License"); you may not use this file except in compliance with + the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +--> +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + + <modelVersion>4.0.0</modelVersion> + + <parent> + <groupId>org.apache.beam</groupId> + <artifactId>beam-runners-parent</artifactId> + <version>2.2.0-SNAPSHOT</version> + <relativePath>../pom.xml</relativePath> + </parent> + + <artifactId>beam-local-artifact-service-java</artifactId> + <name>Apache Beam :: Runners :: Java Local Artifact Service</name> + <description>The Beam Artifact Service exposes APIs to stage and retrieve + artifacts in a manner independent of the underlying storage system, for use + by the Beam portability framework. The local implementation uses the local + File System as the underlying storage system.</description> + + <packaging>jar</packaging> + + <build> + <plugins> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-surefire-plugin</artifactId> + </plugin> + + <!-- Coverage analysis for unit tests. --> + <plugin> + <groupId>org.jacoco</groupId> + <artifactId>jacoco-maven-plugin</artifactId> + </plugin> + </plugins> + </build> + + <dependencies> + <dependency> + <groupId>org.apache.beam</groupId> + <artifactId>beam-sdks-common-runner-api</artifactId> + </dependency> + + <!-- build dependencies --> + + <dependency> + <groupId>com.google.code.findbugs</groupId> + <artifactId>jsr305</artifactId> + </dependency> + + <dependency> + <groupId>com.google.guava</groupId> + <artifactId>guava</artifactId> + </dependency> + + <dependency> + <groupId>io.grpc</groupId> + <artifactId>grpc-core</artifactId> + </dependency> + + <dependency> + <groupId>io.grpc</groupId> + <artifactId>grpc-stub</artifactId> + </dependency> + + <dependency> + <groupId>com.google.protobuf</groupId> + <artifactId>protobuf-java</artifactId> + </dependency> + + <dependency> + <groupId>org.slf4j</groupId> + <artifactId>slf4j-api</artifactId> + </dependency> + + <!-- test dependencies --> + <dependency> + <groupId>org.hamcrest</groupId> + <artifactId>hamcrest-all</artifactId> + <scope>test</scope> + </dependency> + + <dependency> + <groupId>org.mockito</groupId> + <artifactId>mockito-all</artifactId> + <scope>test</scope> + </dependency> + + <dependency> + <groupId>junit</groupId> + <artifactId>junit</artifactId> + <scope>test</scope> + </dependency> + + <dependency> + <groupId>org.slf4j</groupId> + <artifactId>slf4j-jdk14</artifactId> + <scope>test</scope> + </dependency> + </dependencies> +</project> http://git-wip-us.apache.org/repos/asf/beam/blob/2f178fbe/runners/local-artifact-service-java/src/main/java/org/apache/beam/artifact/local/LocalFileSystemArtifactStagerService.java ---------------------------------------------------------------------- diff --git a/runners/local-artifact-service-java/src/main/java/org/apache/beam/artifact/local/LocalFileSystemArtifactStagerService.java b/runners/local-artifact-service-java/src/main/java/org/apache/beam/artifact/local/LocalFileSystemArtifactStagerService.java new file mode 100644 index 0000000..6b42a3b --- /dev/null +++ b/runners/local-artifact-service-java/src/main/java/org/apache/beam/artifact/local/LocalFileSystemArtifactStagerService.java @@ -0,0 +1,276 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.beam.artifact.local; + +import static com.google.common.base.Preconditions.checkState; + +import com.google.common.base.Throwables; +import io.grpc.Status; +import io.grpc.StatusException; +import io.grpc.StatusRuntimeException; +import io.grpc.stub.StreamObserver; +import java.io.File; +import java.io.FileOutputStream; +import java.io.IOException; +import java.io.OutputStream; +import java.util.ArrayList; +import java.util.Collection; +import javax.annotation.Nullable; +import org.apache.beam.sdk.common.runner.v1.ArtifactApi.ArtifactMetadata; +import org.apache.beam.sdk.common.runner.v1.ArtifactApi.CommitManifestRequest; +import org.apache.beam.sdk.common.runner.v1.ArtifactApi.CommitManifestResponse; +import org.apache.beam.sdk.common.runner.v1.ArtifactApi.PutArtifactRequest; +import org.apache.beam.sdk.common.runner.v1.ArtifactApi.PutArtifactRequest.ContentCase; +import org.apache.beam.sdk.common.runner.v1.ArtifactApi.PutArtifactResponse; +import org.apache.beam.sdk.common.runner.v1.ArtifactStagingServiceGrpc.ArtifactStagingServiceImplBase; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** An {@code ArtifactStagingService} which stages files to a local temp directory. */ +public class LocalFileSystemArtifactStagerService extends ArtifactStagingServiceImplBase { + private static final Logger LOG = + LoggerFactory.getLogger(LocalFileSystemArtifactStagerService.class); + + public static LocalFileSystemArtifactStagerService withRootDirectory(File base) { + return new LocalFileSystemArtifactStagerService(base); + } + + private final File stagingBase; + private final File artifactsBase; + + private LocalFileSystemArtifactStagerService(File stagingBase) { + this.stagingBase = stagingBase; + if ((stagingBase.mkdirs() || stagingBase.exists()) && stagingBase.canWrite()) { + artifactsBase = new File(stagingBase, "artifacts"); + checkState( + (artifactsBase.mkdir() || artifactsBase.exists()) && artifactsBase.canWrite(), + "Could not create artifact staging directory at %s", + artifactsBase); + } else { + throw new IllegalStateException( + String.format("Could not create staging directory structure at root %s", stagingBase)); + } + } + + @Override + public StreamObserver<PutArtifactRequest> putArtifact( + final StreamObserver<PutArtifactResponse> responseObserver) { + return new CreateAndWriteFileObserver(responseObserver); + } + + @Override + public void commitManifest( + CommitManifestRequest request, StreamObserver<CommitManifestResponse> responseObserver) { + try { + commitManifestOrThrow(request, responseObserver); + } catch (StatusRuntimeException e) { + responseObserver.onError(e); + LOG.error("Failed to commit Manifest {}", request.getManifest(), e); + } catch (Exception e) { + responseObserver.onError( + Status.INTERNAL + .withCause(e) + .withDescription(Throwables.getStackTraceAsString(e)) + .asRuntimeException()); + LOG.error("Failed to commit Manifest {}", request.getManifest(), e); + } + } + + private void commitManifestOrThrow( + CommitManifestRequest request, StreamObserver<CommitManifestResponse> responseObserver) + throws IOException { + Collection<ArtifactMetadata> missing = new ArrayList<>(); + for (ArtifactMetadata artifact : request.getManifest().getArtifactList()) { + // TODO: Validate the checksums on the server side, to fail more aggressively if require + if (!getArtifactFile(artifact.getName()).exists()) { + missing.add(artifact); + } + } + if (!missing.isEmpty()) { + throw Status.INVALID_ARGUMENT + .withDescription( + String.format("Attempted to commit manifest with missing Artifacts: [%s]", missing)) + .asRuntimeException(); + } + File mf = new File(stagingBase, "MANIFEST"); + checkState(mf.createNewFile(), "Could not create file to store manifest"); + try (OutputStream mfOut = new FileOutputStream(mf)) { + request.getManifest().writeTo(mfOut); + } + responseObserver.onNext( + CommitManifestResponse.newBuilder() + .setStagingToken(stagingBase.getCanonicalPath()) + .build()); + responseObserver.onCompleted(); + } + + File getArtifactFile(String artifactName) { + return new File(artifactsBase, artifactName); + } + + private class CreateAndWriteFileObserver implements StreamObserver<PutArtifactRequest> { + private final StreamObserver<PutArtifactResponse> responseObserver; + private FileWritingObserver writer; + + private CreateAndWriteFileObserver(StreamObserver<PutArtifactResponse> responseObserver) { + this.responseObserver = responseObserver; + } + + @Override + public void onNext(PutArtifactRequest value) { + try { + if (writer == null) { + if (!value.getContentCase().equals(ContentCase.METADATA)) { + throw Status.INVALID_ARGUMENT + .withDescription( + String.format( + "Expected the first %s to contain the Artifact Name, got %s", + PutArtifactRequest.class.getSimpleName(), value.getContentCase())) + .asRuntimeException(); + } + writer = createFile(value.getMetadata()); + } else { + writer.onNext(value); + } + } catch (StatusRuntimeException e) { + responseObserver.onError(e); + } catch (Exception e) { + responseObserver.onError( + Status.INTERNAL + .withCause(e) + .withDescription(Throwables.getStackTraceAsString(e)) + .asRuntimeException()); + } + } + + private FileWritingObserver createFile(ArtifactMetadata metadata) throws IOException { + File destination = getArtifactFile(metadata.getName()); + if (!destination.createNewFile()) { + throw Status.ALREADY_EXISTS + .withDescription(String.format("Artifact with name %s already exists", metadata)) + .asRuntimeException(); + } + return new FileWritingObserver( + destination, new FileOutputStream(destination), responseObserver); + } + + @Override + public void onError(Throwable t) { + if (writer != null) { + writer.onError(t); + } else { + responseObserver.onCompleted(); + } + } + + @Override + public void onCompleted() { + if (writer != null) { + writer.onCompleted(); + } else { + responseObserver.onCompleted(); + } + } + } + + private static class FileWritingObserver implements StreamObserver<PutArtifactRequest> { + private final File destination; + private final OutputStream target; + private final StreamObserver<PutArtifactResponse> responseObserver; + + private FileWritingObserver( + File destination, + OutputStream target, + StreamObserver<PutArtifactResponse> responseObserver) { + this.destination = destination; + this.target = target; + this.responseObserver = responseObserver; + } + + @Override + public void onNext(PutArtifactRequest value) { + try { + if (value.getData() == null) { + StatusRuntimeException e = Status.INVALID_ARGUMENT.withDescription(String.format( + "Expected all chunks in the current stream state to contain data, got %s", + value.getContentCase())).asRuntimeException(); + throw e; + } + value.getData().getData().writeTo(target); + } catch (Exception e) { + cleanedUp(e); + } + } + + @Override + public void onError(Throwable t) { + if (cleanedUp(null)) { + responseObserver.onCompleted(); + } + } + + @Override + public void onCompleted() { + try { + target.close(); + } catch (IOException e) { + LOG.error("Failed to complete writing file {}", destination, e); + cleanedUp(e); + return; + } + responseObserver.onNext(PutArtifactResponse.getDefaultInstance()); + responseObserver.onCompleted(); + } + + /** + * Cleans up after the file writing failed exceptionally, due to an error either in the service + * or sent from the client. + * + * @return false if an error was reported, true otherwise + */ + private boolean cleanedUp(@Nullable Throwable whyFailed) { + Throwable actual = whyFailed; + try { + target.close(); + if (!destination.delete()) { + LOG.debug("Couldn't delete failed write at {}", destination); + } + } catch (IOException e) { + if (whyFailed == null) { + actual = e; + } else { + actual.addSuppressed(e); + } + LOG.error("Failed to clean up after writing file {}", destination, e); + } + if (actual != null) { + if (actual instanceof StatusException || actual instanceof StatusRuntimeException) { + responseObserver.onError(actual); + } else { + Status status = + Status.INTERNAL + .withCause(actual) + .withDescription(Throwables.getStackTraceAsString(actual)); + responseObserver.onError(status.asException()); + } + } + return actual == null; + } + } +} http://git-wip-us.apache.org/repos/asf/beam/blob/2f178fbe/runners/local-artifact-service-java/src/main/java/org/apache/beam/artifact/local/package-info.java ---------------------------------------------------------------------- diff --git a/runners/local-artifact-service-java/src/main/java/org/apache/beam/artifact/local/package-info.java b/runners/local-artifact-service-java/src/main/java/org/apache/beam/artifact/local/package-info.java new file mode 100644 index 0000000..17d0943 --- /dev/null +++ b/runners/local-artifact-service-java/src/main/java/org/apache/beam/artifact/local/package-info.java @@ -0,0 +1,22 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** + * Provides local implementations of the Artifact API services. + */ +package org.apache.beam.artifact.local; http://git-wip-us.apache.org/repos/asf/beam/blob/2f178fbe/runners/local-artifact-service-java/src/test/java/org/apache/beam/artifact/local/LocalFileSystemArtifactStagerServiceTest.java ---------------------------------------------------------------------- diff --git a/runners/local-artifact-service-java/src/test/java/org/apache/beam/artifact/local/LocalFileSystemArtifactStagerServiceTest.java b/runners/local-artifact-service-java/src/test/java/org/apache/beam/artifact/local/LocalFileSystemArtifactStagerServiceTest.java new file mode 100644 index 0000000..b7ba03f --- /dev/null +++ b/runners/local-artifact-service-java/src/test/java/org/apache/beam/artifact/local/LocalFileSystemArtifactStagerServiceTest.java @@ -0,0 +1,274 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.beam.artifact.local; + +import static org.hamcrest.Matchers.is; +import static org.junit.Assert.assertThat; + +import com.google.common.util.concurrent.Uninterruptibles; +import com.google.protobuf.ByteString; +import io.grpc.inprocess.InProcessChannelBuilder; +import io.grpc.inprocess.InProcessServerBuilder; +import io.grpc.internal.ServerImpl; +import io.grpc.stub.StreamObserver; +import java.io.File; +import java.io.FileInputStream; +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.TimeUnit; +import javax.annotation.Nullable; +import org.apache.beam.sdk.common.runner.v1.ArtifactApi.ArtifactChunk; +import org.apache.beam.sdk.common.runner.v1.ArtifactApi.ArtifactMetadata; +import org.apache.beam.sdk.common.runner.v1.ArtifactApi.CommitManifestRequest; +import org.apache.beam.sdk.common.runner.v1.ArtifactApi.CommitManifestResponse; +import org.apache.beam.sdk.common.runner.v1.ArtifactApi.Manifest; +import org.apache.beam.sdk.common.runner.v1.ArtifactApi.PutArtifactRequest; +import org.apache.beam.sdk.common.runner.v1.ArtifactApi.PutArtifactResponse; +import org.apache.beam.sdk.common.runner.v1.ArtifactStagingServiceGrpc; +import org.apache.beam.sdk.common.runner.v1.ArtifactStagingServiceGrpc.ArtifactStagingServiceStub; +import org.hamcrest.Matchers; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; + +/** Tests for {@link LocalFileSystemArtifactStagerService}. */ +@RunWith(JUnit4.class) +public class LocalFileSystemArtifactStagerServiceTest { + @Rule public TemporaryFolder temporaryFolder = new TemporaryFolder(); + + private ArtifactStagingServiceStub stub; + + private LocalFileSystemArtifactStagerService stager; + private ServerImpl server; + + @Before + public void setup() throws Exception { + stager = LocalFileSystemArtifactStagerService.withRootDirectory(temporaryFolder.newFolder()); + + server = + InProcessServerBuilder.forName("fs_stager") + .directExecutor() + .addService(stager) + .build() + .start(); + + stub = + ArtifactStagingServiceGrpc.newStub( + InProcessChannelBuilder.forName("fs_stager").usePlaintext(true).build()); + } + + @After + public void teardown() { + server.shutdownNow(); + } + + @Test + public void singleDataPutArtifactSucceeds() throws Exception { + byte[] data = "foo-bar-baz".getBytes(); + RecordingStreamObserver<PutArtifactResponse> responseObserver = new RecordingStreamObserver<>(); + StreamObserver<PutArtifactRequest> requestObserver = stub.putArtifact(responseObserver); + + String name = "my-artifact"; + requestObserver.onNext( + PutArtifactRequest.newBuilder() + .setMetadata(ArtifactMetadata.newBuilder().setName(name).build()) + .build()); + requestObserver.onNext( + PutArtifactRequest.newBuilder() + .setData(ArtifactChunk.newBuilder().setData(ByteString.copyFrom(data)).build()) + .build()); + requestObserver.onCompleted(); + + responseObserver.awaitTerminalState(); + + File staged = stager.getArtifactFile(name); + assertThat(staged.exists(), is(true)); + ByteBuffer buf = ByteBuffer.allocate(data.length); + new FileInputStream(staged).getChannel().read(buf); + Assert.assertArrayEquals(data, buf.array()); + } + + @Test + public void multiPartPutArtifactSucceeds() throws Exception { + byte[] partOne = "foo-".getBytes(); + byte[] partTwo = "bar-".getBytes(); + byte[] partThree = "baz".getBytes(); + RecordingStreamObserver<PutArtifactResponse> responseObserver = new RecordingStreamObserver<>(); + StreamObserver<PutArtifactRequest> requestObserver = stub.putArtifact(responseObserver); + + String name = "my-artifact"; + requestObserver.onNext( + PutArtifactRequest.newBuilder() + .setMetadata(ArtifactMetadata.newBuilder().setName(name).build()) + .build()); + requestObserver.onNext( + PutArtifactRequest.newBuilder() + .setData(ArtifactChunk.newBuilder().setData(ByteString.copyFrom(partOne)).build()) + .build()); + requestObserver.onNext( + PutArtifactRequest.newBuilder() + .setData(ArtifactChunk.newBuilder().setData(ByteString.copyFrom(partTwo)).build()) + .build()); + requestObserver.onNext( + PutArtifactRequest.newBuilder() + .setData(ArtifactChunk.newBuilder().setData(ByteString.copyFrom(partThree)).build()) + .build()); + requestObserver.onCompleted(); + + responseObserver.awaitTerminalState(); + + File staged = stager.getArtifactFile(name); + assertThat(staged.exists(), is(true)); + ByteBuffer buf = ByteBuffer.allocate("foo-bar-baz".length()); + new FileInputStream(staged).getChannel().read(buf); + Assert.assertArrayEquals("foo-bar-baz".getBytes(), buf.array()); + } + + @Test + public void putArtifactBeforeNameFails() { + byte[] data = "foo-".getBytes(); + RecordingStreamObserver<PutArtifactResponse> responseObserver = new RecordingStreamObserver<>(); + StreamObserver<PutArtifactRequest> requestObserver = stub.putArtifact(responseObserver); + + requestObserver.onNext( + PutArtifactRequest.newBuilder() + .setData(ArtifactChunk.newBuilder().setData(ByteString.copyFrom(data)).build()) + .build()); + + responseObserver.awaitTerminalState(); + + assertThat(responseObserver.error, Matchers.not(Matchers.nullValue())); + } + + @Test + public void putArtifactWithNoContentFails() { + RecordingStreamObserver<PutArtifactResponse> responseObserver = new RecordingStreamObserver<>(); + StreamObserver<PutArtifactRequest> requestObserver = stub.putArtifact(responseObserver); + + requestObserver.onNext( + PutArtifactRequest.newBuilder().setData(ArtifactChunk.getDefaultInstance()).build()); + + responseObserver.awaitTerminalState(); + + assertThat(responseObserver.error, Matchers.not(Matchers.nullValue())); + } + + @Test + public void commitManifestWithAllArtifactsSucceeds() { + ArtifactMetadata firstArtifact = stageBytes("first-artifact", "foo, bar, baz, quux".getBytes()); + ArtifactMetadata secondArtifact = stageBytes("second-artifact", "spam, ham, eggs".getBytes()); + + Manifest manifest = + Manifest.newBuilder().addArtifact(firstArtifact).addArtifact(secondArtifact).build(); + + RecordingStreamObserver<CommitManifestResponse> commitResponseObserver = + new RecordingStreamObserver<>(); + stub.commitManifest( + CommitManifestRequest.newBuilder().setManifest(manifest).build(), commitResponseObserver); + + commitResponseObserver.awaitTerminalState(); + + assertThat(commitResponseObserver.completed, is(true)); + assertThat(commitResponseObserver.responses, Matchers.hasSize(1)); + CommitManifestResponse commitResponse = commitResponseObserver.responses.get(0); + assertThat(commitResponse.getStagingToken(), Matchers.not(Matchers.nullValue())); + } + + @Test + public void commitManifestWithMissingArtifactFails() { + ArtifactMetadata firstArtifact = stageBytes("first-artifact", "foo, bar, baz, quux".getBytes()); + ArtifactMetadata absentArtifact = ArtifactMetadata.newBuilder().setName("absent").build(); + + Manifest manifest = + Manifest.newBuilder().addArtifact(firstArtifact).addArtifact(absentArtifact).build(); + + RecordingStreamObserver<CommitManifestResponse> commitResponseObserver = + new RecordingStreamObserver<>(); + stub.commitManifest(CommitManifestRequest.newBuilder().setManifest(manifest).build(), + commitResponseObserver); + + commitResponseObserver.awaitTerminalState(); + + assertThat(commitResponseObserver.error, Matchers.not(Matchers.nullValue())); + } + + private ArtifactMetadata stageBytes(String name, byte[] bytes) { + StreamObserver<PutArtifactRequest> requests = + stub.putArtifact(new RecordingStreamObserver<PutArtifactResponse>()); + requests.onNext( + PutArtifactRequest.newBuilder() + .setMetadata(ArtifactMetadata.newBuilder().setName(name).build()) + .build()); + requests.onNext( + PutArtifactRequest.newBuilder() + .setData(ArtifactChunk.newBuilder().setData(ByteString.copyFrom(bytes)).build()) + .build()); + requests.onCompleted(); + return ArtifactMetadata.newBuilder().setName(name).build(); + } + + private static class RecordingStreamObserver<T> implements StreamObserver<T> { + private List<T> responses = new ArrayList<>(); + @Nullable private Throwable error = null; + private boolean completed = false; + + @Override + public void onNext(T value) { + failIfTerminal(); + responses.add(value); + } + + @Override + public void onError(Throwable t) { + failIfTerminal(); + error = t; + } + + @Override + public void onCompleted() { + failIfTerminal(); + completed = true; + } + + private boolean isTerminal() { + return error != null || completed; + } + + private void failIfTerminal() { + if (isTerminal()) { + Assert.fail( + String.format( + "Should have terminated after entering a terminal state: completed %s, error %s", + completed, error)); + } + } + + void awaitTerminalState() { + while (!isTerminal()) { + Uninterruptibles.sleepUninterruptibly(1, TimeUnit.MILLISECONDS); + } + } + } +} http://git-wip-us.apache.org/repos/asf/beam/blob/2f178fbe/runners/pom.xml ---------------------------------------------------------------------- diff --git a/runners/pom.xml b/runners/pom.xml index 4412ed6..4f06748 100644 --- a/runners/pom.xml +++ b/runners/pom.xml @@ -35,6 +35,7 @@ <modules> <module>core-construction-java</module> <module>core-java</module> + <module>local-artifact-service-java</module> <module>direct-java</module> <module>flink</module> <module>google-cloud-dataflow-java</module>