Moves AvroSink to a top-level class
Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/0166e199 Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/0166e199 Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/0166e199 Branch: refs/heads/master Commit: 0166e19991af956a48ef99310f5f1916225255aa Parents: 2fa3c34 Author: Eugene Kirpichov <[email protected]> Authored: Fri Apr 28 18:05:00 2017 -0700 Committer: Eugene Kirpichov <[email protected]> Committed: Mon May 1 18:43:38 2017 -0700 ---------------------------------------------------------------------- .../java/org/apache/beam/sdk/io/AvroIO.java | 131 ---------------- .../java/org/apache/beam/sdk/io/AvroSink.java | 150 +++++++++++++++++++ 2 files changed, 150 insertions(+), 131 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/0166e199/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java index 2031569..75e14d5 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java @@ -19,33 +19,24 @@ package org.apache.beam.sdk.io; import static com.google.common.base.Preconditions.checkArgument; -import com.google.common.annotations.VisibleForTesting; import com.google.common.collect.ImmutableMap; import com.google.common.collect.Maps; import com.google.common.io.BaseEncoding; -import java.nio.channels.Channels; -import java.nio.channels.WritableByteChannel; import java.util.Map; import java.util.regex.Pattern; import javax.annotation.Nullable; import org.apache.avro.Schema; import org.apache.avro.file.CodecFactory; -import org.apache.avro.file.DataFileWriter; -import 
org.apache.avro.generic.GenericDatumWriter; import org.apache.avro.generic.GenericRecord; -import org.apache.avro.io.DatumWriter; import org.apache.avro.reflect.ReflectData; -import org.apache.avro.reflect.ReflectDatumWriter; import org.apache.beam.sdk.coders.AvroCoder; import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.coders.VoidCoder; import org.apache.beam.sdk.io.Read.Bounded; -import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.runners.PipelineRunner; import org.apache.beam.sdk.transforms.PTransform; import org.apache.beam.sdk.transforms.display.DisplayData; import org.apache.beam.sdk.transforms.display.HasDisplayData; -import org.apache.beam.sdk.util.MimeTypes; import org.apache.beam.sdk.values.PBegin; import org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.PDone; @@ -952,126 +943,4 @@ public class AvroIO { /** Disallow construction of utility class. */ private AvroIO() {} - - /** - * A {@link FileBasedSink} for Avro files. 
- */ - @VisibleForTesting - static class AvroSink<T> extends FileBasedSink<T> { - private final AvroCoder<T> coder; - private final SerializableAvroCodecFactory codec; - private final ImmutableMap<String, Object> metadata; - - @VisibleForTesting - AvroSink( - FilenamePolicy filenamePolicy, - AvroCoder<T> coder, - SerializableAvroCodecFactory codec, - ImmutableMap<String, Object> metadata) { - super(filenamePolicy); - this.coder = coder; - this.codec = codec; - this.metadata = metadata; - } - - @VisibleForTesting - AvroSink( - String baseOutputFilename, - String extension, - String fileNameTemplate, - AvroCoder<T> coder, - SerializableAvroCodecFactory codec, - ImmutableMap<String, Object> metadata) { - super(baseOutputFilename, extension, fileNameTemplate); - this.coder = coder; - this.codec = codec; - this.metadata = metadata; - } - - @Override - public FileBasedSink.FileBasedWriteOperation<T> createWriteOperation() { - return new AvroWriteOperation<>(this, coder, codec, metadata); - } - - /** - * A {@link org.apache.beam.sdk.io.FileBasedSink.FileBasedWriteOperation - * FileBasedWriteOperation} for Avro files. - */ - private static class AvroWriteOperation<T> extends FileBasedWriteOperation<T> { - private final AvroCoder<T> coder; - private final SerializableAvroCodecFactory codec; - private final ImmutableMap<String, Object> metadata; - - private AvroWriteOperation(AvroSink<T> sink, - AvroCoder<T> coder, - SerializableAvroCodecFactory codec, - ImmutableMap<String, Object> metadata) { - super(sink); - this.coder = coder; - this.codec = codec; - this.metadata = metadata; - } - - @Override - public FileBasedWriter<T> createWriter(PipelineOptions options) throws Exception { - return new AvroWriter<>(this, coder, codec, metadata); - } - } - - /** - * A {@link org.apache.beam.sdk.io.FileBasedSink.FileBasedWriter FileBasedWriter} - * for Avro files. 
- */ - private static class AvroWriter<T> extends FileBasedWriter<T> { - private final AvroCoder<T> coder; - private DataFileWriter<T> dataFileWriter; - private SerializableAvroCodecFactory codec; - private final ImmutableMap<String, Object> metadata; - - public AvroWriter(FileBasedWriteOperation<T> writeOperation, - AvroCoder<T> coder, - SerializableAvroCodecFactory codec, - ImmutableMap<String, Object> metadata) { - super(writeOperation, MimeTypes.BINARY); - this.coder = coder; - this.codec = codec; - this.metadata = metadata; - } - - @SuppressWarnings("deprecation") // uses internal test functionality. - @Override - protected void prepareWrite(WritableByteChannel channel) throws Exception { - DatumWriter<T> datumWriter = coder.getType().equals(GenericRecord.class) - ? new GenericDatumWriter<T>(coder.getSchema()) - : new ReflectDatumWriter<T>(coder.getSchema()); - - dataFileWriter = new DataFileWriter<>(datumWriter).setCodec(codec.getCodec()); - for (Map.Entry<String, Object> entry : metadata.entrySet()) { - Object v = entry.getValue(); - if (v instanceof String) { - dataFileWriter.setMeta(entry.getKey(), (String) v); - } else if (v instanceof Long) { - dataFileWriter.setMeta(entry.getKey(), (Long) v); - } else if (v instanceof byte[]) { - dataFileWriter.setMeta(entry.getKey(), (byte[]) v); - } else { - throw new IllegalStateException( - "Metadata value type must be one of String, Long, or byte[]. 
Found " - + v.getClass().getSimpleName()); - } - } - dataFileWriter.create(coder.getSchema(), Channels.newOutputStream(channel)); - } - - @Override - public void write(T value) throws Exception { - dataFileWriter.append(value); - } - - @Override - protected void finishWrite() throws Exception { - dataFileWriter.flush(); - } - } - } } http://git-wip-us.apache.org/repos/asf/beam/blob/0166e199/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroSink.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroSink.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroSink.java new file mode 100644 index 0000000..16f233c --- /dev/null +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroSink.java @@ -0,0 +1,150 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.beam.sdk.io; + +import com.google.common.collect.ImmutableMap; +import java.nio.channels.Channels; +import java.nio.channels.WritableByteChannel; +import java.util.Map; +import org.apache.avro.file.DataFileWriter; +import org.apache.avro.generic.GenericDatumWriter; +import org.apache.avro.generic.GenericRecord; +import org.apache.avro.io.DatumWriter; +import org.apache.avro.reflect.ReflectDatumWriter; +import org.apache.beam.sdk.coders.AvroCoder; +import org.apache.beam.sdk.options.PipelineOptions; +import org.apache.beam.sdk.util.MimeTypes; + +/** + * A {@link FileBasedSink} for Avro files. Each writer encodes elements against the {@link AvroCoder}'s schema, applies the configured codec, and attaches the given metadata entries to its output. + */ +class AvroSink<T> extends FileBasedSink<T> { + private final AvroCoder<T> coder; + private final SerializableAvroCodecFactory codec; + private final ImmutableMap<String, Object> metadata; /* Forwarded to DataFileWriter.setMeta; only String, Long and byte[] values are accepted when a file is written. */ + + AvroSink( + FilenamePolicy filenamePolicy, + AvroCoder<T> coder, + SerializableAvroCodecFactory codec, + ImmutableMap<String, Object> metadata) { + super(filenamePolicy); + this.coder = coder; + this.codec = codec; + this.metadata = metadata; + } + + AvroSink( + String baseOutputFilename, + String extension, + String fileNameTemplate, + AvroCoder<T> coder, + SerializableAvroCodecFactory codec, + ImmutableMap<String, Object> metadata) { + super(baseOutputFilename, extension, fileNameTemplate); + this.coder = coder; + this.codec = codec; + this.metadata = metadata; + } + + @Override + public FileBasedWriteOperation<T> createWriteOperation() { /* The sink's coder, codec and metadata are handed unchanged to a new write operation. */ + return new AvroWriteOperation<>(this, coder, codec, metadata); + } + + /** + * A {@link FileBasedWriteOperation + * FileBasedWriteOperation} for Avro files. It holds only the coder, codec and metadata that it forwards to each {@link AvroWriter} it creates.
+ */ + private static class AvroWriteOperation<T> extends FileBasedWriteOperation<T> { + private final AvroCoder<T> coder; + private final SerializableAvroCodecFactory codec; + private final ImmutableMap<String, Object> metadata; + + private AvroWriteOperation(AvroSink<T> sink, + AvroCoder<T> coder, + SerializableAvroCodecFactory codec, + ImmutableMap<String, Object> metadata) { + super(sink); + this.coder = coder; + this.codec = codec; + this.metadata = metadata; + } + + @Override + public FileBasedWriter<T> createWriter(PipelineOptions options) throws Exception { /* options is not used; every writer gets the same coder, codec and metadata. */ + return new AvroWriter<>(this, coder, codec, metadata); + } + } + + /** + * A {@link FileBasedWriter FileBasedWriter} + * for Avro files, backed by an Avro {@link DataFileWriter} that {@code prepareWrite} opens on top of the output channel. + */ + private static class AvroWriter<T> extends FileBasedWriter<T> { + private final AvroCoder<T> coder; + private DataFileWriter<T> dataFileWriter; /* Assigned in prepareWrite; null before that. */ + private SerializableAvroCodecFactory codec; /* NOTE(review): only assigned in the constructor, so it could be final like the other fields. */ + private final ImmutableMap<String, Object> metadata; + + public AvroWriter(FileBasedWriteOperation<T> writeOperation, + AvroCoder<T> coder, + SerializableAvroCodecFactory codec, + ImmutableMap<String, Object> metadata) { + super(writeOperation, MimeTypes.BINARY); /* Output is labelled with the binary MIME type. */ + this.coder = coder; + this.codec = codec; + this.metadata = metadata; + } + + @SuppressWarnings("deprecation") // uses internal test functionality. NOTE(review): unclear which call in this method is deprecated; confirm the suppression is still needed. + @Override + protected void prepareWrite(WritableByteChannel channel) throws Exception { /* Configures the DataFileWriter (codec, metadata) before create() opens it on the channel. */ + DatumWriter<T> datumWriter = coder.getType().equals(GenericRecord.class) + ?
new GenericDatumWriter<T>(coder.getSchema()) + : new ReflectDatumWriter<T>(coder.getSchema()); /* GenericRecord elements use the generic writer, all other types the reflect-based writer; both take the coder's schema. */ + + dataFileWriter = new DataFileWriter<>(datumWriter).setCodec(codec.getCodec()); + for (Map.Entry<String, Object> entry : metadata.entrySet()) { /* Only String, Long and byte[] values reach setMeta; any other type fails fast. v is never null because Guava's ImmutableMap rejects null values. */ + Object v = entry.getValue(); + if (v instanceof String) { + dataFileWriter.setMeta(entry.getKey(), (String) v); + } else if (v instanceof Long) { + dataFileWriter.setMeta(entry.getKey(), (Long) v); + } else if (v instanceof byte[]) { + dataFileWriter.setMeta(entry.getKey(), (byte[]) v); + } else { + throw new IllegalStateException( + "Metadata value type must be one of String, Long, or byte[]. Found " + + v.getClass().getSimpleName()); + } + } + dataFileWriter.create(coder.getSchema(), Channels.newOutputStream(channel)); /* The channel is exposed to Avro as an OutputStream via Channels.newOutputStream. */ + } + + @Override + public void write(T value) throws Exception { /* Relies on prepareWrite having assigned dataFileWriter. */ + dataFileWriter.append(value); + } + + @Override + protected void finishWrite() throws Exception { /* Flushes only; dataFileWriter is not closed here -- presumably the base FileBasedWriter owns and closes the channel. TODO confirm. */ + dataFileWriter.flush(); + } + } +}
