Reverts "Move Google Cloud Dataflow native sinks to worker module"

----Release Notes----

[]
-------------
Created by MOE: https://github.com/google/moe
MOE_MIGRATED_REVID=115349469


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/51068d16
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/51068d16
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/51068d16

Branch: refs/heads/master
Commit: 51068d1635f4bb3143412a7708852e21816a1d27
Parents: 9f546ef
Author: sgmc <[email protected]>
Authored: Tue Feb 23 09:29:49 2016 -0800
Committer: Davor Bonaci <[email protected]>
Committed: Thu Feb 25 23:58:26 2016 -0800

----------------------------------------------------------------------
 .../dataflow/sdk/runners/worker/AvroSink.java   | 135 +++++++++
 .../dataflow/sdk/runners/worker/TextSink.java   | 291 +++++++++++++++++++
 .../dataflow/sdk/util/common/worker/Sink.java   |  64 ++++
 3 files changed, 490 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/51068d16/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/worker/AvroSink.java
----------------------------------------------------------------------
diff --git 
a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/worker/AvroSink.java 
b/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/worker/AvroSink.java
new file mode 100644
index 0000000..b101a2b
--- /dev/null
+++ 
b/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/worker/AvroSink.java
@@ -0,0 +1,135 @@
+/*
+ * Copyright (C) 2015 Google Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package com.google.cloud.dataflow.sdk.runners.worker;
+
+import com.google.cloud.dataflow.sdk.coders.AvroCoder;
+import com.google.cloud.dataflow.sdk.util.CoderUtils;
+import com.google.cloud.dataflow.sdk.util.IOChannelUtils;
+import com.google.cloud.dataflow.sdk.util.MimeTypes;
+import com.google.cloud.dataflow.sdk.util.ShardingWritableByteChannel;
+import com.google.cloud.dataflow.sdk.util.WindowedValue;
+import 
com.google.cloud.dataflow.sdk.util.WindowedValue.ValueOnlyWindowedValueCoder;
+import com.google.cloud.dataflow.sdk.util.common.worker.Sink;
+
+import org.apache.avro.Schema;
+import org.apache.avro.file.DataFileWriter;
+import org.apache.avro.io.DatumWriter;
+
+import java.io.IOException;
+import java.nio.channels.Channels;
+import java.nio.channels.WritableByteChannel;
+import java.util.ArrayList;
+import java.util.Random;
+
+/**
+ * A sink that writes Avro files.
+ *
+ * @param <T> the type of the elements written to the sink
+ */
+public class AvroSink<T> extends Sink<WindowedValue<T>> {
+
+  final String filenamePrefix;
+  final String shardFormat;
+  final String filenameSuffix;
+  final int shardCount;
+  final AvroCoder<T> avroCoder;
+  final Schema schema;
+
+  public AvroSink(String filename, ValueOnlyWindowedValueCoder<T> coder) {
+    this(filename, "", "", 1, coder);
+  }
+
+  public AvroSink(String filenamePrefix, String shardFormat, String 
filenameSuffix, int shardCount,
+                  ValueOnlyWindowedValueCoder<T> coder) {
+    if (!(coder.getValueCoder() instanceof AvroCoder)) {
+      throw new IllegalArgumentException(String.format(
+          "AvroSink requires an AvroCoder, not a %s", 
coder.getValueCoder().getClass()));
+    }
+
+    this.filenamePrefix = filenamePrefix;
+    this.shardFormat = shardFormat;
+    this.filenameSuffix = filenameSuffix;
+    this.shardCount = shardCount;
+    this.avroCoder = (AvroCoder<T>) coder.getValueCoder();
+    this.schema = this.avroCoder.getSchema();
+  }
+
+  public SinkWriter<WindowedValue<T>> writer(DatumWriter<T> datumWriter) 
throws IOException {
+    WritableByteChannel writer = IOChannelUtils.create(
+        filenamePrefix, shardFormat, filenameSuffix, shardCount, 
MimeTypes.BINARY);
+
+    if (writer instanceof ShardingWritableByteChannel) {
+      return new AvroShardingFileWriter(datumWriter, 
(ShardingWritableByteChannel) writer);
+    } else {
+      return new AvroFileWriter(datumWriter, writer);
+    }
+  }
+
+  @Override
+  public SinkWriter<WindowedValue<T>> writer() throws IOException {
+    return writer(avroCoder.createDatumWriter());
+  }
+
+  /** The SinkWriter for an AvroSink. */
+  class AvroFileWriter implements SinkWriter<WindowedValue<T>> {
+    DataFileWriter<T> fileWriter;
+
+    public AvroFileWriter(DatumWriter<T> datumWriter, WritableByteChannel 
outputChannel)
+        throws IOException {
+      fileWriter = new DataFileWriter<>(datumWriter);
+      fileWriter.create(schema, Channels.newOutputStream(outputChannel));
+    }
+
+    @Override
+    public long add(WindowedValue<T> value) throws IOException {
+      fileWriter.append(value.getValue());
+      // DataFileWriter doesn't support returning the length written. Use the
+      // coder instead.
+      return CoderUtils.encodeToByteArray(avroCoder, value.getValue()).length;
+    }
+
+    @Override
+    public void close() throws IOException {
+      fileWriter.close();
+    }
+  }
+
+  /** The SinkWriter for an AvroSink, which supports sharding. */
+  class AvroShardingFileWriter implements SinkWriter<WindowedValue<T>> {
+    private ArrayList<AvroFileWriter> fileWriters = new ArrayList<>();
+    private final Random random = new Random();
+
+    public AvroShardingFileWriter(
+        DatumWriter<T> datumWriter, ShardingWritableByteChannel outputChannel) 
throws IOException {
+      for (int i = 0; i < outputChannel.getNumShards(); i++) {
+        fileWriters.add(new AvroFileWriter(datumWriter, 
outputChannel.getChannel(i)));
+      }
+    }
+
+    @Override
+    public long add(WindowedValue<T> value) throws IOException {
+      return fileWriters.get(random.nextInt(fileWriters.size())).add(value);
+    }
+
+    @Override
+    public void close() throws IOException {
+      for (AvroFileWriter fileWriter : fileWriters) {
+        fileWriter.close();
+      }
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/51068d16/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/worker/TextSink.java
----------------------------------------------------------------------
diff --git 
a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/worker/TextSink.java 
b/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/worker/TextSink.java
new file mode 100644
index 0000000..f48183c
--- /dev/null
+++ 
b/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/worker/TextSink.java
@@ -0,0 +1,291 @@
+/*
+ * Copyright (C) 2015 Google Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package com.google.cloud.dataflow.sdk.runners.worker;
+
+import com.google.cloud.dataflow.sdk.coders.Coder;
+import com.google.cloud.dataflow.sdk.coders.StringUtf8Coder;
+import com.google.cloud.dataflow.sdk.util.CoderUtils;
+import com.google.cloud.dataflow.sdk.util.IOChannelUtils;
+import com.google.cloud.dataflow.sdk.util.MimeTypes;
+import com.google.cloud.dataflow.sdk.util.ShardingWritableByteChannel;
+import com.google.cloud.dataflow.sdk.util.WindowedValue;
+import com.google.cloud.dataflow.sdk.util.WindowedValue.WindowedValueCoder;
+import com.google.cloud.dataflow.sdk.util.common.worker.Sink;
+
+import java.io.IOException;
+import java.io.UnsupportedEncodingException;
+import java.nio.ByteBuffer;
+import java.nio.channels.WritableByteChannel;
+import java.util.Random;
+
+import javax.annotation.Nullable;
+
+/**
+ * A sink that writes text files.
+ *
+ * @param <T> the type of the elements written to the sink
+ */
+public class TextSink<T> extends Sink<T> {
+
+  static final byte[] NEWLINE = getNewline();
+
+  private static byte[] getNewline() {
+    String newline = "\n";
+    try {
+      return newline.getBytes("UTF-8");
+    } catch (UnsupportedEncodingException e) {
+      throw new RuntimeException("UTF-8 not supported", e);
+    }
+  }
+
+  final String namePrefix;
+  final String shardFormat;
+  final String nameSuffix;
+  final int shardCount;
+  final boolean appendTrailingNewlines;
+  final String header;
+  final String footer;
+  final Coder<T> coder;
+
+  /**
+   * For testing only.
+   *
+   * <p>Used by simple tests that write to a single unsharded file.
+   */
+  public static <V> TextSink<WindowedValue<V>> createForTest(
+      String filename,
+      boolean appendTrailingNewlines,
+      @Nullable String header,
+      @Nullable String footer,
+      Coder<V> coder) {
+    return create(filename,
+                  "",
+                  "",
+                  1,
+                  appendTrailingNewlines,
+                  header,
+                  footer,
+                  WindowedValue.getValueOnlyCoder(coder));
+  }
+
+  /**
+   * For DirectPipelineRunner only.
+   * It wraps the coder using {@code WindowedValue.getValueOnlyCoder}.
+   */
+  public static <V> TextSink<WindowedValue<V>> createForDirectPipelineRunner(
+      String filenamePrefix,
+      String shardFormat,
+      String filenameSuffix,
+      int shardCount,
+      boolean appendTrailingNewlines,
+      @Nullable String header,
+      @Nullable String footer,
+      Coder<V> coder) {
+    return create(filenamePrefix,
+                  shardFormat,
+                  filenameSuffix,
+                  shardCount,
+                  appendTrailingNewlines,
+                  header,
+                  footer,
+                  WindowedValue.getValueOnlyCoder(coder));
+  }
+
+  /**
+   * Constructs a new TextSink.
+   *
+   * @param filenamePrefix the prefix of output filenames.
+   * @param shardFormat the shard name template to use for output filenames.
+   * @param filenameSuffix the suffix of output filenames.
+   * @param shardCount the number of output shards to produce.
+   * @param appendTrailingNewlines true to append newlines to each output line.
+   * @param header text to place at the beginning of each output file.
+   * @param footer text to place at the end of each output file.
+   * @param coder the coder used to encode elements for output.
+   */
+  public static <V> TextSink<V> create(String filenamePrefix,
+                                       String shardFormat,
+                                       String filenameSuffix,
+                                       int shardCount,
+                                       boolean appendTrailingNewlines,
+                                       @Nullable String header,
+                                       @Nullable String footer,
+                                       Coder<V> coder) {
+    return new TextSink<>(filenamePrefix,
+                          shardFormat,
+                          filenameSuffix,
+                          shardCount,
+                          appendTrailingNewlines,
+                          header,
+                          footer,
+                          coder);
+  }
+
+  private TextSink(String filenamePrefix,
+                   String shardFormat,
+                   String filenameSuffix,
+                   int shardCount,
+                   boolean appendTrailingNewlines,
+                   @Nullable String header,
+                   @Nullable String footer,
+                   Coder<T> coder) {
+    this.namePrefix = filenamePrefix;
+    this.shardFormat = shardFormat;
+    this.nameSuffix = filenameSuffix;
+    this.shardCount = shardCount;
+    this.appendTrailingNewlines = appendTrailingNewlines;
+    this.header = header;
+    this.footer = footer;
+    this.coder = coder;
+  }
+
+  @Override
+  public SinkWriter<T> writer() throws IOException {
+    String mimeType;
+
+    if (!(coder instanceof WindowedValueCoder)) {
+      throw new IOException(
+          "Expected WindowedValueCoder for inputCoder, got: "
+          + coder.getClass().getName());
+    }
+    Coder<?> valueCoder = ((WindowedValueCoder<?>) coder).getValueCoder();
+    if (valueCoder.equals(StringUtf8Coder.of())) {
+      mimeType = MimeTypes.TEXT;
+    } else {
+      mimeType = MimeTypes.BINARY;
+    }
+
+    WritableByteChannel writer = IOChannelUtils.create(namePrefix, shardFormat,
+        nameSuffix, shardCount, mimeType);
+
+    if (writer instanceof ShardingWritableByteChannel) {
+      return new ShardingTextFileWriter((ShardingWritableByteChannel) writer);
+    } else {
+      return new TextFileWriter(writer);
+    }
+  }
+
+  /**
+   * Abstract SinkWriter base class shared by sharded and unsharded Text
+   * writer implementations.
+   */
+  abstract class AbstractTextFileWriter implements SinkWriter<T> {
+    protected void init() throws IOException {
+      if (header != null) {
+        printLine(ShardingWritableByteChannel.ALL_SHARDS,
+            CoderUtils.encodeToByteArray(StringUtf8Coder.of(), header));
+      }
+    }
+
+    /**
+     * Adds a value to the sink. Returns the size in bytes of the data written.
+     * The return value does -not- include header/footer size.
+     */
+    @Override
+    public long add(T value) throws IOException {
+      return printLine(getShardNum(value),
+          CoderUtils.encodeToByteArray(coder, value));
+    }
+
+    @Override
+    public void close() throws IOException {
+      if (footer != null) {
+        printLine(ShardingWritableByteChannel.ALL_SHARDS,
+            CoderUtils.encodeToByteArray(StringUtf8Coder.of(), footer));
+      }
+    }
+
+    protected long printLine(int shardNum, byte[] line) throws IOException {
+      long length = line.length;
+      write(shardNum, ByteBuffer.wrap(line));
+
+      if (appendTrailingNewlines) {
+        write(shardNum, ByteBuffer.wrap(NEWLINE));
+        length += NEWLINE.length;
+      }
+
+      return length;
+    }
+
+    protected abstract void write(int shardNum, ByteBuffer buf)
+        throws IOException;
+    protected abstract int getShardNum(T value);
+  }
+
+  /** An unsharded SinkWriter for a TextSink. */
+  class TextFileWriter extends AbstractTextFileWriter {
+    private final WritableByteChannel outputChannel;
+
+    TextFileWriter(WritableByteChannel outputChannel) throws IOException {
+      this.outputChannel = outputChannel;
+      init();
+    }
+
+    @Override
+    public void close() throws IOException {
+      try {
+        super.close();
+      } finally {
+        outputChannel.close();
+      }
+    }
+
+    @Override
+    protected void write(int shardNum, ByteBuffer buf) throws IOException {
+      outputChannel.write(buf);
+    }
+
+    @Override
+    protected int getShardNum(T value) {
+      return 0;
+    }
+  }
+
+  /** A sharding SinkWriter for a TextSink. */
+  class ShardingTextFileWriter extends AbstractTextFileWriter {
+    private final Random rng = new Random();
+    private final int numShards;
+    private final ShardingWritableByteChannel outputChannel;
+
+    // TODO: add support for user-defined sharding function.
+    ShardingTextFileWriter(ShardingWritableByteChannel outputChannel)
+        throws IOException {
+      this.outputChannel = outputChannel;
+      numShards = outputChannel.getNumShards();
+      init();
+    }
+
+    @Override
+    public void close() throws IOException {
+      try {
+        super.close();
+      } finally {
+        outputChannel.close();
+      }
+    }
+
+    @Override
+    protected void write(int shardNum, ByteBuffer buf) throws IOException {
+      outputChannel.writeToShard(shardNum, buf);
+    }
+
+    @Override
+    protected int getShardNum(T value) {
+      return rng.nextInt(numShards);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/51068d16/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/common/worker/Sink.java
----------------------------------------------------------------------
diff --git 
a/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/common/worker/Sink.java 
b/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/common/worker/Sink.java
new file mode 100644
index 0000000..b48d70b
--- /dev/null
+++ 
b/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/common/worker/Sink.java
@@ -0,0 +1,64 @@
+/*******************************************************************************
+ * Copyright (C) 2015 Google Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ ******************************************************************************/
+
+package com.google.cloud.dataflow.sdk.util.common.worker;
+
+import com.google.cloud.dataflow.sdk.util.common.worker.StateSampler.StateKind;
+
+import java.io.IOException;
+
+/**
+ * Abstract base class for Sinks.
+ *
+ * <p>A Sink is written to by getting a SinkWriter and adding values to
+ * it.
+ *
+ * @param <T> the type of the elements written to the sink
+ */
+public abstract class Sink<T> {
+  /**
+   * Returns a SinkWriter that allows writing to this Sink.
+   */
+  public abstract SinkWriter<T> writer() throws IOException;
+
+  /**
+   * Writes to a Sink.
+   */
+  public interface SinkWriter<ElemT> extends AutoCloseable {
+    /**
+     * Adds a value to the sink. Returns the size in bytes of the data written.
+     */
+    public long add(ElemT value) throws IOException;
+
+    @Override
+    public void close() throws IOException;
+  }
+
+  /**
+   * Returns whether this Sink can be restarted.
+   */
+  public boolean supportsRestart() {
+    return false;
+  }
+
+  /**
+   * The default state kind of all the states reported in this Sink.
+   * Defaults to {@link StateKind#USER}.
+   */
+  protected StateKind getStateSamplerStateKind() {
+    return StateKind.USER;
+  }
+}

Reply via email to