Add WriteFiles translation

Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/f3ed5a4f
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/f3ed5a4f
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/f3ed5a4f

Branch: refs/heads/master
Commit: f3ed5a4f4f05c67371b51e6f8742f554b282eedf
Parents: 0093cf5
Author: Kenneth Knowles <k...@google.com>
Authored: Tue May 30 14:43:25 2017 -0700
Committer: Kenneth Knowles <k...@google.com>
Committed: Thu Jun 1 10:56:59 2017 -0700

----------------------------------------------------------------------
 .../construction/PTransformTranslation.java     |   3 +
 .../construction/WriteFilesTranslation.java     | 152 +++++++++++++++
 .../construction/WriteFilesTranslationTest.java | 186 +++++++++++++++++++
 3 files changed, 341 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/f3ed5a4f/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PTransformTranslation.java
----------------------------------------------------------------------
diff --git 
a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PTransformTranslation.java
 
b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PTransformTranslation.java
index 00ea55e..99d1e85 100644
--- 
a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PTransformTranslation.java
+++ 
b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PTransformTranslation.java
@@ -50,6 +50,9 @@ public class PTransformTranslation {
   public static final String READ_TRANSFORM_URN = "urn:beam:transform:read:v1";
   public static final String WINDOW_TRANSFORM_URN = 
"urn:beam:transform:window:v1";
 
+  // Less well-known. And where shall these live?
+  public static final String WRITE_FILES_TRANSFORM_URN = 
"urn:beam:transform:write_files:0.1";
+
   private static final Map<Class<? extends PTransform>, 
TransformPayloadTranslator>
       KNOWN_PAYLOAD_TRANSLATORS = loadTransformPayloadTranslators();
 

http://git-wip-us.apache.org/repos/asf/beam/blob/f3ed5a4f/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/WriteFilesTranslation.java
----------------------------------------------------------------------
diff --git 
a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/WriteFilesTranslation.java
 
b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/WriteFilesTranslation.java
new file mode 100644
index 0000000..99b77ef
--- /dev/null
+++ 
b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/WriteFilesTranslation.java
@@ -0,0 +1,152 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.runners.core.construction;
+
+import static com.google.common.base.Preconditions.checkArgument;
+
+import com.google.auto.service.AutoService;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.protobuf.Any;
+import com.google.protobuf.ByteString;
+import com.google.protobuf.BytesValue;
+import java.io.IOException;
+import java.util.Collections;
+import java.util.Map;
+import 
org.apache.beam.runners.core.construction.PTransformTranslation.TransformPayloadTranslator;
+import org.apache.beam.sdk.common.runner.v1.RunnerApi;
+import org.apache.beam.sdk.common.runner.v1.RunnerApi.FunctionSpec;
+import org.apache.beam.sdk.common.runner.v1.RunnerApi.SdkFunctionSpec;
+import org.apache.beam.sdk.common.runner.v1.RunnerApi.WriteFilesPayload;
+import org.apache.beam.sdk.io.FileBasedSink;
+import org.apache.beam.sdk.io.WriteFiles;
+import org.apache.beam.sdk.runners.AppliedPTransform;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.util.SerializableUtils;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PDone;
+
+/**
+ * Utility methods for translating a {@link WriteFiles} to and from {@link 
RunnerApi}
+ * representations.
+ */
+public class WriteFilesTranslation {
+
+  /** The URN for an unknown Java {@link FileBasedSink}. */
+  public static final String CUSTOM_JAVA_FILE_BASED_SINK_URN =
+      "urn:beam:file_based_sink:javasdk:0.1";
+
+  @VisibleForTesting
+  static WriteFilesPayload toProto(WriteFiles<?> transform) {
+    return WriteFilesPayload.newBuilder()
+        .setSink(toProto(transform.getSink()))
+        .setWindowedWrites(transform.isWindowedWrites())
+        .setRunnerDeterminedSharding(
+            transform.getNumShards() == null && transform.getSharding() == 
null)
+        .build();
+  }
+
+  private static SdkFunctionSpec toProto(FileBasedSink<?> sink) {
+    return SdkFunctionSpec.newBuilder()
+        .setSpec(
+            FunctionSpec.newBuilder()
+                .setUrn(CUSTOM_JAVA_FILE_BASED_SINK_URN)
+                .setParameter(
+                    Any.pack(
+                        BytesValue.newBuilder()
+                            .setValue(
+                                
ByteString.copyFrom(SerializableUtils.serializeToByteArray(sink)))
+                            .build())))
+        .build();
+  }
+
+  @VisibleForTesting
+  static FileBasedSink<?> sinkFromProto(SdkFunctionSpec sinkProto) throws 
IOException {
+    checkArgument(
+        sinkProto.getSpec().getUrn().equals(CUSTOM_JAVA_FILE_BASED_SINK_URN),
+        "Cannot extract %s instance from %s with URN %s",
+        FileBasedSink.class.getSimpleName(),
+        FunctionSpec.class.getSimpleName(),
+        sinkProto.getSpec().getUrn());
+
+    byte[] serializedSink =
+        
sinkProto.getSpec().getParameter().unpack(BytesValue.class).getValue().toByteArray();
+
+    return (FileBasedSink<?>)
+        SerializableUtils.deserializeFromByteArray(
+            serializedSink, FileBasedSink.class.getSimpleName());
+  }
+
+  public static <T> FileBasedSink<T> getSink(
+      AppliedPTransform<PCollection<T>, PDone, ? extends 
PTransform<PCollection<T>, PDone>>
+          transform)
+      throws IOException {
+    return (FileBasedSink<T>) 
sinkFromProto(getWriteFilesPayload(transform).getSink());
+  }
+
+  public static <T> boolean isWindowedWrites(
+      AppliedPTransform<PCollection<T>, PDone, ? extends 
PTransform<PCollection<T>, PDone>>
+          transform)
+      throws IOException {
+    return getWriteFilesPayload(transform).getWindowedWrites();
+  }
+
+  public static <T> boolean isRunnerDeterminedSharding(
+      AppliedPTransform<PCollection<T>, PDone, ? extends 
PTransform<PCollection<T>, PDone>>
+          transform)
+      throws IOException {
+    return getWriteFilesPayload(transform).getRunnerDeterminedSharding();
+  }
+
+  private static <T> WriteFilesPayload getWriteFilesPayload(
+      AppliedPTransform<PCollection<T>, PDone, ? extends 
PTransform<PCollection<T>, PDone>>
+          transform)
+      throws IOException {
+    return PTransformTranslation.toProto(
+            transform, Collections.<AppliedPTransform<?, ?, ?>>emptyList(), 
SdkComponents.create())
+        .getSpec()
+        .getParameter()
+        .unpack(WriteFilesPayload.class);
+  }
+
+  static class WriteFilesTranslator implements 
TransformPayloadTranslator<WriteFiles<?>> {
+    @Override
+    public String getUrn(WriteFiles<?> transform) {
+      return PTransformTranslation.WRITE_FILES_TRANSFORM_URN;
+    }
+
+    @Override
+    public FunctionSpec translate(
+        AppliedPTransform<?, ?, WriteFiles<?>> transform, SdkComponents 
components) {
+      return FunctionSpec.newBuilder()
+          .setUrn(getUrn(transform.getTransform()))
+          .setParameter(Any.pack(toProto(transform.getTransform())))
+          .build();
+    }
+  }
+
+  /** Registers {@link WriteFilesTranslator}. */
+  @AutoService(TransformPayloadTranslatorRegistrar.class)
+  public static class Registrar implements TransformPayloadTranslatorRegistrar 
{
+    @Override
+    public Map<? extends Class<? extends PTransform>, ? extends 
TransformPayloadTranslator>
+        getTransformPayloadTranslators() {
+      return Collections.singletonMap(WriteFiles.class, new 
WriteFilesTranslator());
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/f3ed5a4f/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/WriteFilesTranslationTest.java
----------------------------------------------------------------------
diff --git 
a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/WriteFilesTranslationTest.java
 
b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/WriteFilesTranslationTest.java
new file mode 100644
index 0000000..739034c
--- /dev/null
+++ 
b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/WriteFilesTranslationTest.java
@@ -0,0 +1,186 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.runners.core.construction;
+
+import static org.hamcrest.Matchers.equalTo;
+import static org.junit.Assert.assertThat;
+
+import com.google.common.collect.ImmutableList;
+import java.util.Objects;
+import javax.annotation.Nullable;
+import org.apache.beam.sdk.common.runner.v1.RunnerApi;
+import org.apache.beam.sdk.common.runner.v1.RunnerApi.ParDoPayload;
+import org.apache.beam.sdk.io.FileBasedSink;
+import org.apache.beam.sdk.io.FileBasedSink.FilenamePolicy;
+import org.apache.beam.sdk.io.FileSystems;
+import org.apache.beam.sdk.io.WriteFiles;
+import org.apache.beam.sdk.io.fs.ResourceId;
+import org.apache.beam.sdk.options.ValueProvider.StaticValueProvider;
+import org.apache.beam.sdk.runners.AppliedPTransform;
+import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PDone;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameter;
+import org.junit.runners.Parameterized.Parameters;
+import org.junit.runners.Suite;
+
+/** Tests for {@link WriteFilesTranslation}. */
+@RunWith(Suite.class)
+@Suite.SuiteClasses({
+  WriteFilesTranslationTest.TestWriteFilesPayloadTranslation.class,
+})
+public class WriteFilesTranslationTest {
+
+  /** Tests for translating various {@link ParDo} transforms to/from {@link 
ParDoPayload} protos. */
+  @RunWith(Parameterized.class)
+  public static class TestWriteFilesPayloadTranslation {
+    @Parameters(name = "{index}: {0}")
+    public static Iterable<WriteFiles<?>> data() {
+      return ImmutableList.<WriteFiles<?>>of(
+          WriteFiles.to(new DummySink()),
+          WriteFiles.to(new DummySink()).withWindowedWrites(),
+          WriteFiles.to(new DummySink()).withNumShards(17),
+          WriteFiles.to(new 
DummySink()).withWindowedWrites().withNumShards(42));
+    }
+
+    @Parameter(0)
+    public WriteFiles<String> writeFiles;
+
+    public static TestPipeline p = 
TestPipeline.create().enableAbandonedNodeEnforcement(false);
+
+    @Test
+    public void testEncodedProto() throws Exception {
+      RunnerApi.WriteFilesPayload payload = 
WriteFilesTranslation.toProto(writeFiles);
+
+      assertThat(
+          payload.getRunnerDeterminedSharding(),
+          equalTo(writeFiles.getNumShards() == null && 
writeFiles.getSharding() == null));
+
+      assertThat(payload.getWindowedWrites(), 
equalTo(writeFiles.isWindowedWrites()));
+
+      assertThat(
+          (FileBasedSink<String>) 
WriteFilesTranslation.sinkFromProto(payload.getSink()),
+          equalTo(writeFiles.getSink()));
+    }
+
+    @Test
+    public void testExtractionDirectFromTransform() throws Exception {
+      PCollection<String> input = p.apply(Create.of("hello"));
+      PDone output = input.apply(writeFiles);
+
+      AppliedPTransform<PCollection<String>, PDone, WriteFiles<String>> 
appliedPTransform =
+          AppliedPTransform.<PCollection<String>, PDone, WriteFiles<String>>of(
+              "foo", input.expand(), output.expand(), writeFiles, p);
+
+      assertThat(
+          WriteFilesTranslation.isRunnerDeterminedSharding(appliedPTransform),
+          equalTo(writeFiles.getNumShards() == null && 
writeFiles.getSharding() == null));
+
+      assertThat(
+          WriteFilesTranslation.isWindowedWrites(appliedPTransform),
+          equalTo(writeFiles.isWindowedWrites()));
+
+      assertThat(WriteFilesTranslation.getSink(appliedPTransform), 
equalTo(writeFiles.getSink()));
+    }
+  }
+
+  /**
+   * A simple {@link FileBasedSink} for testing serialization/deserialization. 
Not mocked to avoid
+   * any issues serializing mocks.
+   */
+  private static class DummySink extends FileBasedSink<String> {
+
+    DummySink() {
+      super(
+          StaticValueProvider.of(FileSystems.matchNewResource("nowhere", 
false)),
+          new DummyFilenamePolicy());
+    }
+
+    @Override
+    public WriteOperation<String> createWriteOperation() {
+      return new DummyWriteOperation(this);
+    }
+
+    @Override
+    public boolean equals(Object other) {
+      if (!(other instanceof DummySink)) {
+        return false;
+      }
+
+      DummySink that = (DummySink) other;
+
+      return getFilenamePolicy().equals(((DummySink) 
other).getFilenamePolicy())
+          && getBaseOutputDirectoryProvider().isAccessible()
+          && that.getBaseOutputDirectoryProvider().isAccessible()
+          && getBaseOutputDirectoryProvider()
+              .get()
+              .equals(that.getBaseOutputDirectoryProvider().get());
+    }
+
+    @Override
+    public int hashCode() {
+      return Objects.hash(
+          DummySink.class,
+          getFilenamePolicy(),
+          getBaseOutputDirectoryProvider().isAccessible()
+              ? getBaseOutputDirectoryProvider().get()
+              : null);
+    }
+  }
+
+  private static class DummyWriteOperation extends 
FileBasedSink.WriteOperation<String> {
+    public DummyWriteOperation(FileBasedSink<String> sink) {
+      super(sink);
+    }
+
+    @Override
+    public FileBasedSink.Writer<String> createWriter() throws Exception {
+      throw new UnsupportedOperationException("Should never be called.");
+    }
+  }
+
+  private static class DummyFilenamePolicy extends FilenamePolicy {
+    @Override
+    public ResourceId windowedFilename(
+        ResourceId outputDirectory, WindowedContext c, String extension) {
+      throw new UnsupportedOperationException("Should never be called.");
+    }
+
+    @Nullable
+    @Override
+    public ResourceId unwindowedFilename(ResourceId outputDirectory, Context 
c, String extension) {
+      throw new UnsupportedOperationException("Should never be called.");
+    }
+
+    @Override
+    public boolean equals(Object other) {
+      return other instanceof DummyFilenamePolicy;
+    }
+
+    @Override
+    public int hashCode() {
+      return DummyFilenamePolicy.class.hashCode();
+    }
+  }
+}

Reply via email to