Repository: beam Updated Branches: refs/heads/master 99cf72e7d -> c5cf90c70
[BEAM-1871] Remove deprecated methods from AvroCoder Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/a503d349 Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/a503d349 Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/a503d349 Branch: refs/heads/master Commit: a503d349d93bd3ef9077c0f4fae713dbb452f5d6 Parents: 99cf72e Author: Luke Cwik <[email protected]> Authored: Wed Apr 26 10:07:15 2017 -0700 Committer: Luke Cwik <[email protected]> Committed: Wed Apr 26 10:32:44 2017 -0700 ---------------------------------------------------------------------- .../org/apache/beam/sdk/coders/AvroCoder.java | 55 +++++--------------- .../java/org/apache/beam/sdk/io/AvroIO.java | 11 ++-- .../java/org/apache/beam/sdk/io/AvroIOTest.java | 14 ++--- .../org/apache/beam/sdk/io/AvroSourceTest.java | 6 ++- 4 files changed, 33 insertions(+), 53 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/a503d349/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/AvroCoder.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/AvroCoder.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/AvroCoder.java index 91822bf..d60a2ca 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/AvroCoder.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/AvroCoder.java @@ -17,7 +17,6 @@ */ package org.apache.beam.sdk.coders; -import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Supplier; import java.io.IOException; import java.io.InputStream; @@ -224,17 +223,20 @@ public class AvroCoder<T> extends CustomCoder<T> { private final AvroCoder<T> myCoder = AvroCoder.this; @Override public DatumReader<T> initialValue() { - return myCoder.createDatumReader(); + return myCoder.getType().equals(GenericRecord.class) + ? new GenericDatumReader<T>(myCoder.getSchema()) + : new ReflectDatumReader<T>(myCoder.getSchema()); + } + }; + this.writer = new EmptyOnDeserializationThreadLocal<DatumWriter<T>>() { + private final AvroCoder<T> myCoder = AvroCoder.this; + @Override + public DatumWriter<T> initialValue() { + return myCoder.getType().equals(GenericRecord.class) + ? new GenericDatumWriter<T>(myCoder.getSchema()) + : new ReflectDatumWriter<T>(myCoder.getSchema()); } }; - this.writer = - new EmptyOnDeserializationThreadLocal<DatumWriter<T>>() { - private final AvroCoder<T> myCoder = AvroCoder.this; - @Override - public DatumWriter<T> initialValue() { - return myCoder.createDatumWriter(); - } - }; } /** @@ -314,39 +316,6 @@ public class AvroCoder<T> extends CustomCoder<T> { } /** - * Returns a new {@link DatumReader} that can be used to read from an Avro file directly. Assumes - * the schema used to read is the same as the schema that was used when writing. - * - * @deprecated For {@code AvroCoder} internal use only. - */ - // TODO: once we can remove this deprecated function, inline in constructor. - @Deprecated - @VisibleForTesting - public DatumReader<T> createDatumReader() { - if (type.equals(GenericRecord.class)) { - return new GenericDatumReader<>(schemaSupplier.get()); - } else { - return new ReflectDatumReader<>(schemaSupplier.get()); - } - } - - /** - * Returns a new {@link DatumWriter} that can be used to write to an Avro file directly. - * - * @deprecated For {@code AvroCoder} internal use only. - */ - @Deprecated - @VisibleForTesting - // TODO: once we can remove this deprecated function, inline in constructor. - public DatumWriter<T> createDatumWriter() { - if (type.equals(GenericRecord.class)) { - return new GenericDatumWriter<>(schemaSupplier.get()); - } else { - return new ReflectDatumWriter<>(schemaSupplier.get()); - } - } - - /** * Returns the schema used by this coder. */ public Schema getSchema() { http://git-wip-us.apache.org/repos/asf/beam/blob/a503d349/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java index 6a06972..24e158f 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java @@ -23,18 +23,19 @@ import com.google.common.annotations.VisibleForTesting; import com.google.common.collect.ImmutableMap; import com.google.common.collect.Maps; import com.google.common.io.BaseEncoding; - import java.nio.channels.Channels; import java.nio.channels.WritableByteChannel; import java.util.Map; import java.util.regex.Pattern; import javax.annotation.Nullable; - import org.apache.avro.Schema; import org.apache.avro.file.CodecFactory; import org.apache.avro.file.DataFileWriter; +import org.apache.avro.generic.GenericDatumWriter; import org.apache.avro.generic.GenericRecord; +import org.apache.avro.io.DatumWriter; import org.apache.avro.reflect.ReflectData; +import org.apache.avro.reflect.ReflectDatumWriter; import org.apache.beam.sdk.coders.AvroCoder; import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.coders.VoidCoder; @@ -1041,7 +1042,11 @@ public class AvroIO { @SuppressWarnings("deprecation") // uses internal test functionality. @Override protected void prepareWrite(WritableByteChannel channel) throws Exception { - dataFileWriter = new DataFileWriter<>(coder.createDatumWriter()).setCodec(codec.getCodec()); + DatumWriter<T> datumWriter = coder.getType().equals(GenericRecord.class) + ? new GenericDatumWriter<T>(coder.getSchema()) + : new ReflectDatumWriter<T>(coder.getSchema()); + + dataFileWriter = new DataFileWriter<>(datumWriter).setCodec(codec.getCodec()); for (Map.Entry<String, Object> entry : metadata.entrySet()) { Object v = entry.getValue(); if (v instanceof String) { http://git-wip-us.apache.org/repos/asf/beam/blob/a503d349/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTest.java index a988d85..3e1c4b8 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTest.java @@ -31,7 +31,6 @@ import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; import com.google.common.collect.Iterators; import com.google.common.collect.Lists; - import java.io.File; import java.io.FileInputStream; import java.io.IOException; @@ -41,7 +40,6 @@ import java.util.List; import java.util.Objects; import java.util.Random; import java.util.Set; - import org.apache.avro.Schema; import org.apache.avro.file.CodecFactory; import org.apache.avro.file.DataFileReader; @@ -49,6 +47,8 @@ import org.apache.avro.file.DataFileStream; import org.apache.avro.generic.GenericDatumReader; import org.apache.avro.generic.GenericRecord; import org.apache.avro.reflect.Nullable; +import org.apache.avro.reflect.ReflectData; +import org.apache.avro.reflect.ReflectDatumReader; import org.apache.beam.sdk.coders.AvroCoder; import org.apache.beam.sdk.coders.DefaultCoder; import org.apache.beam.sdk.io.AvroIO.Write.Bound; @@ -321,7 +321,7 @@ public class AvroIOTest { public TestPipeline windowedAvroWritePipeline = TestPipeline.create(); @Test - @Category({ValidatesRunner.class, UsesTestStream.class }) + @Category({ValidatesRunner.class, UsesTestStream.class}) public void testWindowedAvroIOWrite() throws Throwable { File baseOutputFile = new File(tmpFolder.getRoot(), "prefix"); final String outputFilePrefix = baseOutputFile.getAbsolutePath(); @@ -393,8 +393,9 @@ public class AvroIOTest { for (File outputFile : expectedFiles) { assertTrue("Expected output file " + outputFile.getAbsolutePath(), outputFile.exists()); try (DataFileReader<GenericClass> reader = - new DataFileReader<>(outputFile, AvroCoder.of( - GenericClass.class).createDatumReader())) { + new DataFileReader<>(outputFile, + new ReflectDatumReader<GenericClass>( + ReflectData.get().getSchema(GenericClass.class)))) { Iterators.addAll(actualElements, reader); } outputFile.delete(); @@ -504,7 +505,8 @@ public class AvroIOTest { for (File outputFile : expectedFiles) { assertTrue("Expected output file " + outputFile.getName(), outputFile.exists()); try (DataFileReader<String> reader = - new DataFileReader<>(outputFile, AvroCoder.of(String.class).createDatumReader())) { + new DataFileReader<>(outputFile, + new ReflectDatumReader(ReflectData.get().getSchema(String.class)))) { Iterators.addAll(actualElements, reader); } } http://git-wip-us.apache.org/repos/asf/beam/blob/a503d349/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroSourceTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroSourceTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroSourceTest.java index a6b6db5..d6facba 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroSourceTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroSourceTest.java @@ -42,11 +42,13 @@ import org.apache.avro.Schema; import org.apache.avro.file.CodecFactory; import org.apache.avro.file.DataFileConstants; import org.apache.avro.file.DataFileWriter; +import org.apache.avro.generic.GenericDatumWriter; import org.apache.avro.generic.GenericRecord; import org.apache.avro.io.DatumWriter; import org.apache.avro.reflect.AvroDefault; import org.apache.avro.reflect.Nullable; import org.apache.avro.reflect.ReflectData; +import org.apache.avro.reflect.ReflectDatumWriter; import org.apache.beam.sdk.coders.AvroCoder; import org.apache.beam.sdk.coders.DefaultCoder; import org.apache.beam.sdk.io.AvroSource.AvroMetadata; @@ -98,7 +100,9 @@ public class AvroSourceTest { String path = tmpFile.toString(); FileOutputStream os = new FileOutputStream(tmpFile); - DatumWriter<T> datumWriter = coder.createDatumWriter(); + DatumWriter<T> datumWriter = coder.getType().equals(GenericRecord.class) + ? new GenericDatumWriter<T>(coder.getSchema()) + : new ReflectDatumWriter<T>(coder.getSchema()); try (DataFileWriter<T> writer = new DataFileWriter<>(datumWriter)) { writer.setCodec(CodecFactory.fromString(codec)); writer.create(coder.getSchema(), os);
