Reverts "Move Google Cloud Dataflow native sinks to worker module"
----Release Notes---- [] ------------- Created by MOE: https://github.com/google/moe MOE_MIGRATED_REVID=115349469 Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/51068d16 Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/51068d16 Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/51068d16 Branch: refs/heads/master Commit: 51068d1635f4bb3143412a7708852e21816a1d27 Parents: 9f546ef Author: sgmc <[email protected]> Authored: Tue Feb 23 09:29:49 2016 -0800 Committer: Davor Bonaci <[email protected]> Committed: Thu Feb 25 23:58:26 2016 -0800 ---------------------------------------------------------------------- .../dataflow/sdk/runners/worker/AvroSink.java | 135 +++++++++ .../dataflow/sdk/runners/worker/TextSink.java | 291 +++++++++++++++++++ .../dataflow/sdk/util/common/worker/Sink.java | 64 ++++ 3 files changed, 490 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/51068d16/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/worker/AvroSink.java ---------------------------------------------------------------------- diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/worker/AvroSink.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/worker/AvroSink.java new file mode 100644 index 0000000..b101a2b --- /dev/null +++ b/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/worker/AvroSink.java @@ -0,0 +1,135 @@ +/* + * Copyright (C) 2015 Google Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. 
You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ + +package com.google.cloud.dataflow.sdk.runners.worker; + +import com.google.cloud.dataflow.sdk.coders.AvroCoder; +import com.google.cloud.dataflow.sdk.util.CoderUtils; +import com.google.cloud.dataflow.sdk.util.IOChannelUtils; +import com.google.cloud.dataflow.sdk.util.MimeTypes; +import com.google.cloud.dataflow.sdk.util.ShardingWritableByteChannel; +import com.google.cloud.dataflow.sdk.util.WindowedValue; +import com.google.cloud.dataflow.sdk.util.WindowedValue.ValueOnlyWindowedValueCoder; +import com.google.cloud.dataflow.sdk.util.common.worker.Sink; + +import org.apache.avro.Schema; +import org.apache.avro.file.DataFileWriter; +import org.apache.avro.io.DatumWriter; + +import java.io.IOException; +import java.nio.channels.Channels; +import java.nio.channels.WritableByteChannel; +import java.util.ArrayList; +import java.util.Random; + +/** + * A sink that writes Avro files. 
+ * + * @param <T> the type of the elements written to the sink + */ +public class AvroSink<T> extends Sink<WindowedValue<T>> { + + final String filenamePrefix; + final String shardFormat; + final String filenameSuffix; + final int shardCount; + final AvroCoder<T> avroCoder; + final Schema schema; + + public AvroSink(String filename, ValueOnlyWindowedValueCoder<T> coder) { + this(filename, "", "", 1, coder); + } + + public AvroSink(String filenamePrefix, String shardFormat, String filenameSuffix, int shardCount, + ValueOnlyWindowedValueCoder<T> coder) { + if (!(coder.getValueCoder() instanceof AvroCoder)) { + throw new IllegalArgumentException(String.format( + "AvroSink requires an AvroCoder, not a %s", coder.getValueCoder().getClass())); + } + + this.filenamePrefix = filenamePrefix; + this.shardFormat = shardFormat; + this.filenameSuffix = filenameSuffix; + this.shardCount = shardCount; + this.avroCoder = (AvroCoder<T>) coder.getValueCoder(); + this.schema = this.avroCoder.getSchema(); + } + + public SinkWriter<WindowedValue<T>> writer(DatumWriter<T> datumWriter) throws IOException { + WritableByteChannel writer = IOChannelUtils.create( + filenamePrefix, shardFormat, filenameSuffix, shardCount, MimeTypes.BINARY); + + if (writer instanceof ShardingWritableByteChannel) { + return new AvroShardingFileWriter(datumWriter, (ShardingWritableByteChannel) writer); + } else { + return new AvroFileWriter(datumWriter, writer); + } + } + + @Override + public SinkWriter<WindowedValue<T>> writer() throws IOException { + return writer(avroCoder.createDatumWriter()); + } + + /** The SinkWriter for an AvroSink. 
*/ + class AvroFileWriter implements SinkWriter<WindowedValue<T>> { + DataFileWriter<T> fileWriter; + + public AvroFileWriter(DatumWriter<T> datumWriter, WritableByteChannel outputChannel) + throws IOException { + fileWriter = new DataFileWriter<>(datumWriter); + fileWriter.create(schema, Channels.newOutputStream(outputChannel)); + } + + @Override + public long add(WindowedValue<T> value) throws IOException { + fileWriter.append(value.getValue()); + // DataFileWriter doesn't support returning the length written. Use the + // coder instead. + return CoderUtils.encodeToByteArray(avroCoder, value.getValue()).length; + } + + @Override + public void close() throws IOException { + fileWriter.close(); + } + } + + /** The SinkWriter for an AvroSink, which supports sharding. */ + class AvroShardingFileWriter implements SinkWriter<WindowedValue<T>> { + private ArrayList<AvroFileWriter> fileWriters = new ArrayList<>(); + private final Random random = new Random(); + + public AvroShardingFileWriter( + DatumWriter<T> datumWriter, ShardingWritableByteChannel outputChannel) throws IOException { + for (int i = 0; i < outputChannel.getNumShards(); i++) { + fileWriters.add(new AvroFileWriter(datumWriter, outputChannel.getChannel(i))); + } + } + + @Override + public long add(WindowedValue<T> value) throws IOException { + return fileWriters.get(random.nextInt(fileWriters.size())).add(value); + } + + @Override + public void close() throws IOException { + for (AvroFileWriter fileWriter : fileWriters) { + fileWriter.close(); + } + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/51068d16/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/worker/TextSink.java ---------------------------------------------------------------------- diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/worker/TextSink.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/worker/TextSink.java new file mode 100644 index 0000000..f48183c --- /dev/null +++ 
b/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/worker/TextSink.java @@ -0,0 +1,291 @@ +/* + * Copyright (C) 2015 Google Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ + +package com.google.cloud.dataflow.sdk.runners.worker; + +import com.google.cloud.dataflow.sdk.coders.Coder; +import com.google.cloud.dataflow.sdk.coders.StringUtf8Coder; +import com.google.cloud.dataflow.sdk.util.CoderUtils; +import com.google.cloud.dataflow.sdk.util.IOChannelUtils; +import com.google.cloud.dataflow.sdk.util.MimeTypes; +import com.google.cloud.dataflow.sdk.util.ShardingWritableByteChannel; +import com.google.cloud.dataflow.sdk.util.WindowedValue; +import com.google.cloud.dataflow.sdk.util.WindowedValue.WindowedValueCoder; +import com.google.cloud.dataflow.sdk.util.common.worker.Sink; + +import java.io.IOException; +import java.io.UnsupportedEncodingException; +import java.nio.ByteBuffer; +import java.nio.channels.WritableByteChannel; +import java.util.Random; + +import javax.annotation.Nullable; + +/** + * A sink that writes text files. 
+ * + * @param <T> the type of the elements written to the sink + */ +public class TextSink<T> extends Sink<T> { + + static final byte[] NEWLINE = getNewline(); + + private static byte[] getNewline() { + String newline = "\n"; + try { + return newline.getBytes("UTF-8"); + } catch (UnsupportedEncodingException e) { + throw new RuntimeException("UTF-8 not supported", e); + } + } + + final String namePrefix; + final String shardFormat; + final String nameSuffix; + final int shardCount; + final boolean appendTrailingNewlines; + final String header; + final String footer; + final Coder<T> coder; + + /** + * For testing only. + * + * <p>Used by simple tests that write to a single unsharded file. + */ + public static <V> TextSink<WindowedValue<V>> createForTest( + String filename, + boolean appendTrailingNewlines, + @Nullable String header, + @Nullable String footer, + Coder<V> coder) { + return create(filename, + "", + "", + 1, + appendTrailingNewlines, + header, + footer, + WindowedValue.getValueOnlyCoder(coder)); + } + + /** + * For DirectPipelineRunner only. + * It wraps the coder with {@code WindowedValue.ValueOnlyCoder}. + */ + public static <V> TextSink<WindowedValue<V>> createForDirectPipelineRunner( + String filenamePrefix, + String shardFormat, + String filenameSuffix, + int shardCount, + boolean appendTrailingNewlines, + @Nullable String header, + @Nullable String footer, + Coder<V> coder) { + return create(filenamePrefix, + shardFormat, + filenameSuffix, + shardCount, + appendTrailingNewlines, + header, + footer, + WindowedValue.getValueOnlyCoder(coder)); + } + + /** + * Constructs a new TextSink. + * + * @param filenamePrefix the prefix of output filenames. + * @param shardFormat the shard name template to use for output filenames. + * @param filenameSuffix the suffix of output filenames. + * @param shardCount the number of output shards to produce. + * @param appendTrailingNewlines true to append newlines to each output line. 
+ * @param header text to place at the beginning of each output file. + * @param footer text to place at the end of each output file. + * @param coder the coder used to encode elements for output. + */ + public static <V> TextSink<V> create(String filenamePrefix, + String shardFormat, + String filenameSuffix, + int shardCount, + boolean appendTrailingNewlines, + @Nullable String header, + @Nullable String footer, + Coder<V> coder) { + return new TextSink<>(filenamePrefix, + shardFormat, + filenameSuffix, + shardCount, + appendTrailingNewlines, + header, + footer, + coder); + } + + private TextSink(String filenamePrefix, + String shardFormat, + String filenameSuffix, + int shardCount, + boolean appendTrailingNewlines, + @Nullable String header, + @Nullable String footer, + Coder<T> coder) { + this.namePrefix = filenamePrefix; + this.shardFormat = shardFormat; + this.nameSuffix = filenameSuffix; + this.shardCount = shardCount; + this.appendTrailingNewlines = appendTrailingNewlines; + this.header = header; + this.footer = footer; + this.coder = coder; + } + + @Override + public SinkWriter<T> writer() throws IOException { + String mimeType; + + if (!(coder instanceof WindowedValueCoder)) { + throw new IOException( + "Expected WindowedValueCoder for inputCoder, got: " + + coder.getClass().getName()); + } + Coder<?> valueCoder = ((WindowedValueCoder<?>) coder).getValueCoder(); + if (valueCoder.equals(StringUtf8Coder.of())) { + mimeType = MimeTypes.TEXT; + } else { + mimeType = MimeTypes.BINARY; + } + + WritableByteChannel writer = IOChannelUtils.create(namePrefix, shardFormat, + nameSuffix, shardCount, mimeType); + + if (writer instanceof ShardingWritableByteChannel) { + return new ShardingTextFileWriter((ShardingWritableByteChannel) writer); + } else { + return new TextFileWriter(writer); + } + } + + /** + * Abstract SinkWriter base class shared by sharded and unsharded Text + * writer implementations. 
+ */ + abstract class AbstractTextFileWriter implements SinkWriter<T> { + protected void init() throws IOException { + if (header != null) { + printLine(ShardingWritableByteChannel.ALL_SHARDS, + CoderUtils.encodeToByteArray(StringUtf8Coder.of(), header)); + } + } + + /** + * Adds a value to the sink. Returns the size in bytes of the data written. + * The return value does -not- include header/footer size. + */ + @Override + public long add(T value) throws IOException { + return printLine(getShardNum(value), + CoderUtils.encodeToByteArray(coder, value)); + } + + @Override + public void close() throws IOException { + if (footer != null) { + printLine(ShardingWritableByteChannel.ALL_SHARDS, + CoderUtils.encodeToByteArray(StringUtf8Coder.of(), footer)); + } + } + + protected long printLine(int shardNum, byte[] line) throws IOException { + long length = line.length; + write(shardNum, ByteBuffer.wrap(line)); + + if (appendTrailingNewlines) { + write(shardNum, ByteBuffer.wrap(NEWLINE)); + length += NEWLINE.length; + } + + return length; + } + + protected abstract void write(int shardNum, ByteBuffer buf) + throws IOException; + protected abstract int getShardNum(T value); + } + + /** An unsharded SinkWriter for a TextSink. */ + class TextFileWriter extends AbstractTextFileWriter { + private final WritableByteChannel outputChannel; + + TextFileWriter(WritableByteChannel outputChannel) throws IOException { + this.outputChannel = outputChannel; + init(); + } + + @Override + public void close() throws IOException { + try { + super.close(); + } finally { + outputChannel.close(); + } + } + + @Override + protected void write(int shardNum, ByteBuffer buf) throws IOException { + outputChannel.write(buf); + } + + @Override + protected int getShardNum(T value) { + return 0; + } + } + + /** A sharding SinkWriter for a TextSink. 
*/ + class ShardingTextFileWriter extends AbstractTextFileWriter { + private final Random rng = new Random(); + private final int numShards; + private final ShardingWritableByteChannel outputChannel; + + // TODO: add support for user-defined sharding function. + ShardingTextFileWriter(ShardingWritableByteChannel outputChannel) + throws IOException { + this.outputChannel = outputChannel; + numShards = outputChannel.getNumShards(); + init(); + } + + @Override + public void close() throws IOException { + try { + super.close(); + } finally { + outputChannel.close(); + } + } + + @Override + protected void write(int shardNum, ByteBuffer buf) throws IOException { + outputChannel.writeToShard(shardNum, buf); + } + + @Override + protected int getShardNum(T value) { + return rng.nextInt(numShards); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/51068d16/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/common/worker/Sink.java ---------------------------------------------------------------------- diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/common/worker/Sink.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/common/worker/Sink.java new file mode 100644 index 0000000..b48d70b --- /dev/null +++ b/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/common/worker/Sink.java @@ -0,0 +1,64 @@ +/******************************************************************************* + * Copyright (C) 2015 Google Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
See the + * License for the specific language governing permissions and limitations under + * the License. + ******************************************************************************/ + +package com.google.cloud.dataflow.sdk.util.common.worker; + +import com.google.cloud.dataflow.sdk.util.common.worker.StateSampler.StateKind; + +import java.io.IOException; + +/** + * Abstract base class for Sinks. + * + * <p>A Sink is written to by getting a SinkWriter and adding values to + * it. + * + * @param <T> the type of the elements written to the sink + */ +public abstract class Sink<T> { + /** + * Returns a Writer that allows writing to this Sink. + */ + public abstract SinkWriter<T> writer() throws IOException; + + /** + * Writes to a Sink. + */ + public interface SinkWriter<ElemT> extends AutoCloseable { + /** + * Adds a value to the sink. Returns the size in bytes of the data written. + */ + public long add(ElemT value) throws IOException; + + @Override + public void close() throws IOException; + } + + /** + * Returns whether this Sink can be restarted. + */ + public boolean supportsRestart() { + return false; + } + + /** + * The default state kind of all the states reported in this Sink. + * Defaults to {@link StateKind#USER}. + */ + protected StateKind getStateSamplerStateKind() { + return StateKind.USER; + } +}
