Move Google Cloud Dataflow native sinks to the worker module. This is for Apache Beam.
----Release Notes---- [] ------------- Created by MOE: https://github.com/google/moe MOE_MIGRATED_REVID=115313244 Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/9f546efe Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/9f546efe Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/9f546efe Branch: refs/heads/master Commit: 9f546efebc5c9307dd85e6eec79038285cffa12b Parents: d7b5189 Author: lcwik <[email protected]> Authored: Tue Feb 23 00:37:55 2016 -0800 Committer: Davor Bonaci <[email protected]> Committed: Thu Feb 25 23:58:26 2016 -0800 ---------------------------------------------------------------------- .../dataflow/sdk/runners/worker/AvroSink.java | 135 --------- .../dataflow/sdk/runners/worker/TextSink.java | 291 ------------------- .../dataflow/sdk/util/common/worker/Sink.java | 64 ---- 3 files changed, 490 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/9f546efe/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/worker/AvroSink.java ---------------------------------------------------------------------- diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/worker/AvroSink.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/worker/AvroSink.java deleted file mode 100644 index b101a2b..0000000 --- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/worker/AvroSink.java +++ /dev/null @@ -1,135 +0,0 @@ -/* - * Copyright (C) 2015 Google Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); you may not - * use this file except in compliance with the License. 
You may obtain a copy of - * the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT - * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the - * License for the specific language governing permissions and limitations under - * the License. - */ - -package com.google.cloud.dataflow.sdk.runners.worker; - -import com.google.cloud.dataflow.sdk.coders.AvroCoder; -import com.google.cloud.dataflow.sdk.util.CoderUtils; -import com.google.cloud.dataflow.sdk.util.IOChannelUtils; -import com.google.cloud.dataflow.sdk.util.MimeTypes; -import com.google.cloud.dataflow.sdk.util.ShardingWritableByteChannel; -import com.google.cloud.dataflow.sdk.util.WindowedValue; -import com.google.cloud.dataflow.sdk.util.WindowedValue.ValueOnlyWindowedValueCoder; -import com.google.cloud.dataflow.sdk.util.common.worker.Sink; - -import org.apache.avro.Schema; -import org.apache.avro.file.DataFileWriter; -import org.apache.avro.io.DatumWriter; - -import java.io.IOException; -import java.nio.channels.Channels; -import java.nio.channels.WritableByteChannel; -import java.util.ArrayList; -import java.util.Random; - -/** - * A sink that writes Avro files. 
- * - * @param <T> the type of the elements written to the sink - */ -public class AvroSink<T> extends Sink<WindowedValue<T>> { - - final String filenamePrefix; - final String shardFormat; - final String filenameSuffix; - final int shardCount; - final AvroCoder<T> avroCoder; - final Schema schema; - - public AvroSink(String filename, ValueOnlyWindowedValueCoder<T> coder) { - this(filename, "", "", 1, coder); - } - - public AvroSink(String filenamePrefix, String shardFormat, String filenameSuffix, int shardCount, - ValueOnlyWindowedValueCoder<T> coder) { - if (!(coder.getValueCoder() instanceof AvroCoder)) { - throw new IllegalArgumentException(String.format( - "AvroSink requires an AvroCoder, not a %s", coder.getValueCoder().getClass())); - } - - this.filenamePrefix = filenamePrefix; - this.shardFormat = shardFormat; - this.filenameSuffix = filenameSuffix; - this.shardCount = shardCount; - this.avroCoder = (AvroCoder<T>) coder.getValueCoder(); - this.schema = this.avroCoder.getSchema(); - } - - public SinkWriter<WindowedValue<T>> writer(DatumWriter<T> datumWriter) throws IOException { - WritableByteChannel writer = IOChannelUtils.create( - filenamePrefix, shardFormat, filenameSuffix, shardCount, MimeTypes.BINARY); - - if (writer instanceof ShardingWritableByteChannel) { - return new AvroShardingFileWriter(datumWriter, (ShardingWritableByteChannel) writer); - } else { - return new AvroFileWriter(datumWriter, writer); - } - } - - @Override - public SinkWriter<WindowedValue<T>> writer() throws IOException { - return writer(avroCoder.createDatumWriter()); - } - - /** The SinkWriter for an AvroSink. 
*/ - class AvroFileWriter implements SinkWriter<WindowedValue<T>> { - DataFileWriter<T> fileWriter; - - public AvroFileWriter(DatumWriter<T> datumWriter, WritableByteChannel outputChannel) - throws IOException { - fileWriter = new DataFileWriter<>(datumWriter); - fileWriter.create(schema, Channels.newOutputStream(outputChannel)); - } - - @Override - public long add(WindowedValue<T> value) throws IOException { - fileWriter.append(value.getValue()); - // DataFileWriter doesn't support returning the length written. Use the - // coder instead. - return CoderUtils.encodeToByteArray(avroCoder, value.getValue()).length; - } - - @Override - public void close() throws IOException { - fileWriter.close(); - } - } - - /** The SinkWriter for an AvroSink, which supports sharding. */ - class AvroShardingFileWriter implements SinkWriter<WindowedValue<T>> { - private ArrayList<AvroFileWriter> fileWriters = new ArrayList<>(); - private final Random random = new Random(); - - public AvroShardingFileWriter( - DatumWriter<T> datumWriter, ShardingWritableByteChannel outputChannel) throws IOException { - for (int i = 0; i < outputChannel.getNumShards(); i++) { - fileWriters.add(new AvroFileWriter(datumWriter, outputChannel.getChannel(i))); - } - } - - @Override - public long add(WindowedValue<T> value) throws IOException { - return fileWriters.get(random.nextInt(fileWriters.size())).add(value); - } - - @Override - public void close() throws IOException { - for (AvroFileWriter fileWriter : fileWriters) { - fileWriter.close(); - } - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/9f546efe/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/worker/TextSink.java ---------------------------------------------------------------------- diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/worker/TextSink.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/worker/TextSink.java deleted file mode 100644 index f48183c..0000000 --- 
a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/worker/TextSink.java +++ /dev/null @@ -1,291 +0,0 @@ -/* - * Copyright (C) 2015 Google Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); you may not - * use this file except in compliance with the License. You may obtain a copy of - * the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT - * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the - * License for the specific language governing permissions and limitations under - * the License. - */ - -package com.google.cloud.dataflow.sdk.runners.worker; - -import com.google.cloud.dataflow.sdk.coders.Coder; -import com.google.cloud.dataflow.sdk.coders.StringUtf8Coder; -import com.google.cloud.dataflow.sdk.util.CoderUtils; -import com.google.cloud.dataflow.sdk.util.IOChannelUtils; -import com.google.cloud.dataflow.sdk.util.MimeTypes; -import com.google.cloud.dataflow.sdk.util.ShardingWritableByteChannel; -import com.google.cloud.dataflow.sdk.util.WindowedValue; -import com.google.cloud.dataflow.sdk.util.WindowedValue.WindowedValueCoder; -import com.google.cloud.dataflow.sdk.util.common.worker.Sink; - -import java.io.IOException; -import java.io.UnsupportedEncodingException; -import java.nio.ByteBuffer; -import java.nio.channels.WritableByteChannel; -import java.util.Random; - -import javax.annotation.Nullable; - -/** - * A sink that writes text files. 
- * - * @param <T> the type of the elements written to the sink - */ -public class TextSink<T> extends Sink<T> { - - static final byte[] NEWLINE = getNewline(); - - private static byte[] getNewline() { - String newline = "\n"; - try { - return newline.getBytes("UTF-8"); - } catch (UnsupportedEncodingException e) { - throw new RuntimeException("UTF-8 not supported", e); - } - } - - final String namePrefix; - final String shardFormat; - final String nameSuffix; - final int shardCount; - final boolean appendTrailingNewlines; - final String header; - final String footer; - final Coder<T> coder; - - /** - * For testing only. - * - * <p>Used by simple tests that write to a single unsharded file. - */ - public static <V> TextSink<WindowedValue<V>> createForTest( - String filename, - boolean appendTrailingNewlines, - @Nullable String header, - @Nullable String footer, - Coder<V> coder) { - return create(filename, - "", - "", - 1, - appendTrailingNewlines, - header, - footer, - WindowedValue.getValueOnlyCoder(coder)); - } - - /** - * For DirectPipelineRunner only. - * It wraps the coder with {@code WindowedValue.ValueOnlyCoder}. - */ - public static <V> TextSink<WindowedValue<V>> createForDirectPipelineRunner( - String filenamePrefix, - String shardFormat, - String filenameSuffix, - int shardCount, - boolean appendTrailingNewlines, - @Nullable String header, - @Nullable String footer, - Coder<V> coder) { - return create(filenamePrefix, - shardFormat, - filenameSuffix, - shardCount, - appendTrailingNewlines, - header, - footer, - WindowedValue.getValueOnlyCoder(coder)); - } - - /** - * Constructs a new TextSink. - * - * @param filenamePrefix the prefix of output filenames. - * @param shardFormat the shard name template to use for output filenames. - * @param filenameSuffix the suffix of output filenames. - * @param shardCount the number of outupt shards to produce. - * @param appendTrailingNewlines true to append newlines to each output line. 
- * @param header text to place at the beginning of each output file. - * @param footer text to place at the end of each output file. - * @param coder the code used to encode elements for output. - */ - public static <V> TextSink<V> create(String filenamePrefix, - String shardFormat, - String filenameSuffix, - int shardCount, - boolean appendTrailingNewlines, - @Nullable String header, - @Nullable String footer, - Coder<V> coder) { - return new TextSink<>(filenamePrefix, - shardFormat, - filenameSuffix, - shardCount, - appendTrailingNewlines, - header, - footer, - coder); - } - - private TextSink(String filenamePrefix, - String shardFormat, - String filenameSuffix, - int shardCount, - boolean appendTrailingNewlines, - @Nullable String header, - @Nullable String footer, - Coder<T> coder) { - this.namePrefix = filenamePrefix; - this.shardFormat = shardFormat; - this.nameSuffix = filenameSuffix; - this.shardCount = shardCount; - this.appendTrailingNewlines = appendTrailingNewlines; - this.header = header; - this.footer = footer; - this.coder = coder; - } - - @Override - public SinkWriter<T> writer() throws IOException { - String mimeType; - - if (!(coder instanceof WindowedValueCoder)) { - throw new IOException( - "Expected WindowedValueCoder for inputCoder, got: " - + coder.getClass().getName()); - } - Coder<?> valueCoder = ((WindowedValueCoder<?>) coder).getValueCoder(); - if (valueCoder.equals(StringUtf8Coder.of())) { - mimeType = MimeTypes.TEXT; - } else { - mimeType = MimeTypes.BINARY; - } - - WritableByteChannel writer = IOChannelUtils.create(namePrefix, shardFormat, - nameSuffix, shardCount, mimeType); - - if (writer instanceof ShardingWritableByteChannel) { - return new ShardingTextFileWriter((ShardingWritableByteChannel) writer); - } else { - return new TextFileWriter(writer); - } - } - - /** - * Abstract SinkWriter base class shared by sharded and unsharded Text - * writer implementations. 
- */ - abstract class AbstractTextFileWriter implements SinkWriter<T> { - protected void init() throws IOException { - if (header != null) { - printLine(ShardingWritableByteChannel.ALL_SHARDS, - CoderUtils.encodeToByteArray(StringUtf8Coder.of(), header)); - } - } - - /** - * Adds a value to the sink. Returns the size in bytes of the data written. - * The return value does -not- include header/footer size. - */ - @Override - public long add(T value) throws IOException { - return printLine(getShardNum(value), - CoderUtils.encodeToByteArray(coder, value)); - } - - @Override - public void close() throws IOException { - if (footer != null) { - printLine(ShardingWritableByteChannel.ALL_SHARDS, - CoderUtils.encodeToByteArray(StringUtf8Coder.of(), footer)); - } - } - - protected long printLine(int shardNum, byte[] line) throws IOException { - long length = line.length; - write(shardNum, ByteBuffer.wrap(line)); - - if (appendTrailingNewlines) { - write(shardNum, ByteBuffer.wrap(NEWLINE)); - length += NEWLINE.length; - } - - return length; - } - - protected abstract void write(int shardNum, ByteBuffer buf) - throws IOException; - protected abstract int getShardNum(T value); - } - - /** An unsharded SinkWriter for a TextSink. */ - class TextFileWriter extends AbstractTextFileWriter { - private final WritableByteChannel outputChannel; - - TextFileWriter(WritableByteChannel outputChannel) throws IOException { - this.outputChannel = outputChannel; - init(); - } - - @Override - public void close() throws IOException { - try { - super.close(); - } finally { - outputChannel.close(); - } - } - - @Override - protected void write(int shardNum, ByteBuffer buf) throws IOException { - outputChannel.write(buf); - } - - @Override - protected int getShardNum(T value) { - return 0; - } - } - - /** A sharding SinkWriter for a TextSink. 
*/ - class ShardingTextFileWriter extends AbstractTextFileWriter { - private final Random rng = new Random(); - private final int numShards; - private final ShardingWritableByteChannel outputChannel; - - // TODO: add support for user-defined sharding function. - ShardingTextFileWriter(ShardingWritableByteChannel outputChannel) - throws IOException { - this.outputChannel = outputChannel; - numShards = outputChannel.getNumShards(); - init(); - } - - @Override - public void close() throws IOException { - try { - super.close(); - } finally { - outputChannel.close(); - } - } - - @Override - protected void write(int shardNum, ByteBuffer buf) throws IOException { - outputChannel.writeToShard(shardNum, buf); - } - - @Override - protected int getShardNum(T value) { - return rng.nextInt(numShards); - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/9f546efe/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/common/worker/Sink.java ---------------------------------------------------------------------- diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/common/worker/Sink.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/common/worker/Sink.java deleted file mode 100644 index b48d70b..0000000 --- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/common/worker/Sink.java +++ /dev/null @@ -1,64 +0,0 @@ -/******************************************************************************* - * Copyright (C) 2015 Google Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); you may not - * use this file except in compliance with the License. You may obtain a copy of - * the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT - * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
See the - * License for the specific language governing permissions and limitations under - * the License. - ******************************************************************************/ - -package com.google.cloud.dataflow.sdk.util.common.worker; - -import com.google.cloud.dataflow.sdk.util.common.worker.StateSampler.StateKind; - -import java.io.IOException; - -/** - * Abstract base class for Sinks. - * - * <p>A Sink is written to by getting a SinkWriter and adding values to - * it. - * - * @param <T> the type of the elements written to the sink - */ -public abstract class Sink<T> { - /** - * Returns a Writer that allows writing to this Sink. - */ - public abstract SinkWriter<T> writer() throws IOException; - - /** - * Writes to a Sink. - */ - public interface SinkWriter<ElemT> extends AutoCloseable { - /** - * Adds a value to the sink. Returns the size in bytes of the data written. - */ - public long add(ElemT value) throws IOException; - - @Override - public void close() throws IOException; - } - - /** - * Returns whether this Sink can be restarted. - */ - public boolean supportsRestart() { - return false; - } - - /** - * The default state kind of all the states reported in this Sink. - * Defaults to {@link StateKind#USER}. - */ - protected StateKind getStateSamplerStateKind() { - return StateKind.USER; - } -}
