Repository: beam Updated Branches: refs/heads/master b49ec3fa2 -> a81c45781
[BEAM-111] Move WritableCoder to hadoop-common Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/44624c38 Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/44624c38 Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/44624c38 Branch: refs/heads/master Commit: 44624c382ac5ff062191d41df1dcd008839352e0 Parents: b49ec3f Author: Ismaël Mejía <ieme...@gmail.com> Authored: Sat Feb 25 05:20:58 2017 +0100 Committer: Sela <ans...@paypal.com> Committed: Wed Mar 1 22:47:16 2017 +0200 ---------------------------------------------------------------------- runners/spark/pom.xml | 6 + .../runners/spark/coders/NullWritableCoder.java | 76 ------------ .../runners/spark/coders/WritableCoder.java | 122 ------------------- .../runners/spark/coders/WritableCoderTest.java | 45 ------- .../io/hadoop/HadoopFileFormatPipelineTest.java | 2 +- sdks/java/io/hadoop-common/pom.xml | 10 ++ .../beam/sdk/io/hadoop/WritableCoder.java | 116 ++++++++++++++++++ .../beam/sdk/io/hadoop/WritableCoderTest.java | 45 +++++++ sdks/java/io/hdfs/pom.xml | 5 - .../apache/beam/sdk/io/hdfs/HDFSFileSource.java | 1 + .../apache/beam/sdk/io/hdfs/WritableCoder.java | 116 ------------------ .../beam/sdk/io/hdfs/WritableCoderTest.java | 45 ------- 12 files changed, 179 insertions(+), 410 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/44624c38/runners/spark/pom.xml ---------------------------------------------------------------------- diff --git a/runners/spark/pom.xml b/runners/spark/pom.xml index 409fc27..8c35178 100644 --- a/runners/spark/pom.xml +++ b/runners/spark/pom.xml @@ -306,6 +306,12 @@ </dependency> <dependency> + <groupId>org.apache.beam</groupId> + <artifactId>beam-sdks-java-io-hadoop-common</artifactId> + <scope>test</scope> + </dependency> + + <dependency> <groupId>org.mockito</groupId> <artifactId>mockito-all</artifactId> <scope>test</scope> 
http://git-wip-us.apache.org/repos/asf/beam/blob/44624c38/runners/spark/src/main/java/org/apache/beam/runners/spark/coders/NullWritableCoder.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/coders/NullWritableCoder.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/coders/NullWritableCoder.java deleted file mode 100644 index ebbab1a..0000000 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/coders/NullWritableCoder.java +++ /dev/null @@ -1,76 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.beam.runners.spark.coders; - -import com.fasterxml.jackson.annotation.JsonCreator; -import java.io.InputStream; -import java.io.OutputStream; -import org.apache.beam.sdk.coders.Coder; -import org.apache.hadoop.io.NullWritable; - -/** - * Simple writable coder for Null. 
- */ -public final class NullWritableCoder extends WritableCoder<NullWritable> { - private static final long serialVersionUID = 1L; - - @JsonCreator - public static NullWritableCoder of() { - return INSTANCE; - } - - private static final NullWritableCoder INSTANCE = new NullWritableCoder(); - - private NullWritableCoder() { - super(NullWritable.class); - } - - @Override - public void encode(NullWritable value, OutputStream outStream, Context context) { - // nothing to write - } - - @Override - public NullWritable decode(InputStream inStream, Context context) { - return NullWritable.get(); - } - - @Override - public boolean consistentWithEquals() { - return true; - } - - /** - * Returns true since registerByteSizeObserver() runs in constant time. - */ - @Override - public boolean isRegisterByteSizeObserverCheap(NullWritable value, Context context) { - return true; - } - - @Override - protected long getEncodedElementByteSize(NullWritable value, Context context) { - return 0; - } - - @Override - public void verifyDeterministic() throws Coder.NonDeterministicException { - // NullWritableCoder is deterministic - } -} http://git-wip-us.apache.org/repos/asf/beam/blob/44624c38/runners/spark/src/main/java/org/apache/beam/runners/spark/coders/WritableCoder.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/coders/WritableCoder.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/coders/WritableCoder.java deleted file mode 100644 index 40c2627..0000000 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/coders/WritableCoder.java +++ /dev/null @@ -1,122 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. 
The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.beam.runners.spark.coders; - -import com.fasterxml.jackson.annotation.JsonCreator; -import com.fasterxml.jackson.annotation.JsonProperty; -import java.io.DataInputStream; -import java.io.DataOutputStream; -import java.io.IOException; -import java.io.InputStream; -import java.io.OutputStream; -import java.lang.reflect.InvocationTargetException; -import java.util.List; -import org.apache.beam.sdk.coders.Coder; -import org.apache.beam.sdk.coders.CoderException; -import org.apache.beam.sdk.coders.StandardCoder; -import org.apache.beam.sdk.util.CloudObject; -import org.apache.hadoop.io.NullWritable; -import org.apache.hadoop.io.Writable; - -/** - * A {@code WritableCoder} is a {@link Coder} for a Java class that implements {@link Writable}. - * - * <p>To use, specify the coder type on a PCollection: - * <pre> - * {@code - * PCollection<MyRecord> records = - * foo.apply(...).setCoder(WritableCoder.of(MyRecord.class)); - * } - * </pre> - * - * @param <T> the type of elements handled by this coder - */ -public class WritableCoder<T extends Writable> extends StandardCoder<T> { - private static final long serialVersionUID = 0L; - - /** - * Returns a {@code WritableCoder} instance for the provided element class. 
- * @param <T> the element type - * @param clazz the element class - * @return a {@code WritableCoder} instance for the provided element class - */ - public static <T extends Writable> WritableCoder<T> of(Class<T> clazz) { - if (clazz.equals(NullWritable.class)) { - @SuppressWarnings("unchecked") - WritableCoder<T> result = (WritableCoder<T>) NullWritableCoder.of(); - return result; - } - return new WritableCoder<>(clazz); - } - - @JsonCreator - @SuppressWarnings("unchecked") - public static WritableCoder<?> of(@JsonProperty("type") String classType) - throws ClassNotFoundException { - Class<?> clazz = Class.forName(classType); - if (!Writable.class.isAssignableFrom(clazz)) { - throw new ClassNotFoundException( - "Class " + classType + " does not implement Writable"); - } - return of((Class<? extends Writable>) clazz); - } - - private final Class<T> type; - - public WritableCoder(Class<T> type) { - this.type = type; - } - - @Override - public void encode(T value, OutputStream outStream, Context context) throws IOException { - value.write(new DataOutputStream(outStream)); - } - - @Override - public T decode(InputStream inStream, Context context) throws IOException { - try { - T t = type.getConstructor().newInstance(); - t.readFields(new DataInputStream(inStream)); - return t; - } catch (NoSuchMethodException | InstantiationException | IllegalAccessException e) { - throw new CoderException("unable to deserialize record", e); - } catch (InvocationTargetException ite) { - throw new CoderException("unable to deserialize record", ite.getCause()); - } - } - - @Override - public List<Coder<?>> getCoderArguments() { - return null; - } - - @Override - protected CloudObject initializeCloudObject() { - CloudObject result = CloudObject.forClass(getClass()); - result.put("type", type.getName()); - return result; - } - - @Override - public void verifyDeterministic() throws Coder.NonDeterministicException { - throw new NonDeterministicException(this, - "Hadoop Writable may be 
non-deterministic."); - } - -} http://git-wip-us.apache.org/repos/asf/beam/blob/44624c38/runners/spark/src/test/java/org/apache/beam/runners/spark/coders/WritableCoderTest.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/coders/WritableCoderTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/coders/WritableCoderTest.java deleted file mode 100644 index 538fd97..0000000 --- a/runners/spark/src/test/java/org/apache/beam/runners/spark/coders/WritableCoderTest.java +++ /dev/null @@ -1,45 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.beam.runners.spark.coders; - -import org.apache.beam.sdk.testing.CoderProperties; -import org.apache.hadoop.io.IntWritable; -import org.apache.hadoop.io.NullWritable; -import org.junit.Test; - -/** - * Tests for WritableCoder. 
- */ -public class WritableCoderTest { - - @Test - public void testIntWritableEncoding() throws Exception { - IntWritable value = new IntWritable(42); - WritableCoder<IntWritable> coder = WritableCoder.of(IntWritable.class); - - CoderProperties.coderDecodeEncodeEqual(coder, value); - } - - @Test - public void testNullWritableEncoding() throws Exception { - WritableCoder<NullWritable> coder = WritableCoder.of(NullWritable.class); - - CoderProperties.coderDecodeEncodeEqual(coder, NullWritable.get()); - } -} http://git-wip-us.apache.org/repos/asf/beam/blob/44624c38/runners/spark/src/test/java/org/apache/beam/runners/spark/io/hadoop/HadoopFileFormatPipelineTest.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/io/hadoop/HadoopFileFormatPipelineTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/io/hadoop/HadoopFileFormatPipelineTest.java index a5072d6..48b5433 100644 --- a/runners/spark/src/test/java/org/apache/beam/runners/spark/io/hadoop/HadoopFileFormatPipelineTest.java +++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/io/hadoop/HadoopFileFormatPipelineTest.java @@ -23,9 +23,9 @@ import static org.junit.Assert.assertEquals; import java.io.File; import java.io.IOException; import org.apache.beam.runners.spark.PipelineRule; -import org.apache.beam.runners.spark.coders.WritableCoder; import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.coders.KvCoder; +import org.apache.beam.sdk.io.hadoop.WritableCoder; import org.apache.beam.sdk.values.KV; import org.apache.beam.sdk.values.PCollection; import org.apache.hadoop.conf.Configuration; http://git-wip-us.apache.org/repos/asf/beam/blob/44624c38/sdks/java/io/hadoop-common/pom.xml ---------------------------------------------------------------------- diff --git a/sdks/java/io/hadoop-common/pom.xml b/sdks/java/io/hadoop-common/pom.xml index 13e159c..fcd984f 100644 --- 
a/sdks/java/io/hadoop-common/pom.xml +++ b/sdks/java/io/hadoop-common/pom.xml @@ -32,6 +32,16 @@ <dependencies> <dependency> + <groupId>org.apache.beam</groupId> + <artifactId>beam-sdks-java-core</artifactId> + </dependency> + + <dependency> + <groupId>com.fasterxml.jackson.core</groupId> + <artifactId>jackson-annotations</artifactId> + </dependency> + + <dependency> <groupId>org.apache.hadoop</groupId> <artifactId>hadoop-client</artifactId> <scope>provided</scope> http://git-wip-us.apache.org/repos/asf/beam/blob/44624c38/sdks/java/io/hadoop-common/src/main/java/org/apache/beam/sdk/io/hadoop/WritableCoder.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/hadoop-common/src/main/java/org/apache/beam/sdk/io/hadoop/WritableCoder.java b/sdks/java/io/hadoop-common/src/main/java/org/apache/beam/sdk/io/hadoop/WritableCoder.java new file mode 100644 index 0000000..0ba367d --- /dev/null +++ b/sdks/java/io/hadoop-common/src/main/java/org/apache/beam/sdk/io/hadoop/WritableCoder.java @@ -0,0 +1,116 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.beam.sdk.io.hadoop; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; +import java.io.DataInputStream; +import java.io.DataOutputStream; +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.util.List; +import org.apache.beam.sdk.coders.Coder; +import org.apache.beam.sdk.coders.CoderException; +import org.apache.beam.sdk.coders.StandardCoder; +import org.apache.beam.sdk.util.CloudObject; +import org.apache.hadoop.io.NullWritable; +import org.apache.hadoop.io.Writable; + +/** + * A {@code WritableCoder} is a {@link Coder} for a Java class that implements {@link Writable}. + * + * <p>To use, specify the coder type on a PCollection: + * <pre> + * {@code + * PCollection<MyRecord> records = + * foo.apply(...).setCoder(WritableCoder.of(MyRecord.class)); + * } + * </pre> + * + * @param <T> the type of elements handled by this coder. + */ +public class WritableCoder<T extends Writable> extends StandardCoder<T> { + private static final long serialVersionUID = 0L; + + /** + * Returns a {@code WritableCoder} instance for the provided element class. + * @param <T> the element type + */ + public static <T extends Writable> WritableCoder<T> of(Class<T> clazz) { + return new WritableCoder<>(clazz); + } + + @JsonCreator + @SuppressWarnings("unchecked") + public static WritableCoder<?> of(@JsonProperty("type") String classType) + throws ClassNotFoundException { + Class<?> clazz = Class.forName(classType); + if (!Writable.class.isAssignableFrom(clazz)) { + throw new ClassNotFoundException( + "Class " + classType + " does not implement Writable"); + } + return of((Class<? 
extends Writable>) clazz); + } + + private final Class<T> type; + + public WritableCoder(Class<T> type) { + this.type = type; + } + + @Override + public void encode(T value, OutputStream outStream, Context context) throws IOException { + value.write(new DataOutputStream(outStream)); + } + + @SuppressWarnings("unchecked") + @Override + public T decode(InputStream inStream, Context context) throws IOException { + try { + if (type == NullWritable.class) { + // NullWritable has no default constructor + return (T) NullWritable.get(); + } + T t = type.newInstance(); + t.readFields(new DataInputStream(inStream)); + return t; + } catch (InstantiationException | IllegalAccessException e) { + throw new CoderException("unable to deserialize record", e); + } + } + + @Override + public List<Coder<?>> getCoderArguments() { + return null; + } + + @Override + public CloudObject initializeCloudObject() { + CloudObject result = CloudObject.forClass(getClass()); + result.put("type", type.getName()); + return result; + } + + @Override + public void verifyDeterministic() throws NonDeterministicException { + throw new NonDeterministicException(this, + "Hadoop Writable may be non-deterministic."); + } + +} http://git-wip-us.apache.org/repos/asf/beam/blob/44624c38/sdks/java/io/hadoop-common/src/test/java/org/apache/beam/sdk/io/hadoop/WritableCoderTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/hadoop-common/src/test/java/org/apache/beam/sdk/io/hadoop/WritableCoderTest.java b/sdks/java/io/hadoop-common/src/test/java/org/apache/beam/sdk/io/hadoop/WritableCoderTest.java new file mode 100644 index 0000000..8127773 --- /dev/null +++ b/sdks/java/io/hadoop-common/src/test/java/org/apache/beam/sdk/io/hadoop/WritableCoderTest.java @@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.io.hadoop; + +import org.apache.beam.sdk.testing.CoderProperties; +import org.apache.hadoop.io.IntWritable; +import org.apache.hadoop.io.NullWritable; +import org.junit.Test; + +/** + * Tests for WritableCoder. + */ +public class WritableCoderTest { + + @Test + public void testIntWritableEncoding() throws Exception { + IntWritable value = new IntWritable(42); + WritableCoder<IntWritable> coder = WritableCoder.of(IntWritable.class); + + CoderProperties.coderDecodeEncodeEqual(coder, value); + } + + @Test + public void testNullWritableEncoding() throws Exception { + NullWritable value = NullWritable.get(); + WritableCoder<NullWritable> coder = WritableCoder.of(NullWritable.class); + + CoderProperties.coderDecodeEncodeEqual(coder, value); + } +} http://git-wip-us.apache.org/repos/asf/beam/blob/44624c38/sdks/java/io/hdfs/pom.xml ---------------------------------------------------------------------- diff --git a/sdks/java/io/hdfs/pom.xml b/sdks/java/io/hdfs/pom.xml index 1212b0e..f3a1a27 100644 --- a/sdks/java/io/hdfs/pom.xml +++ b/sdks/java/io/hdfs/pom.xml @@ -94,11 +94,6 @@ </dependency> <dependency> - <groupId>com.fasterxml.jackson.core</groupId> - <artifactId>jackson-annotations</artifactId> - </dependency> - - <dependency> 
<groupId>com.google.auto.service</groupId> <artifactId>auto-service</artifactId> <optional>true</optional> http://git-wip-us.apache.org/repos/asf/beam/blob/44624c38/sdks/java/io/hdfs/src/main/java/org/apache/beam/sdk/io/hdfs/HDFSFileSource.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/hdfs/src/main/java/org/apache/beam/sdk/io/hdfs/HDFSFileSource.java b/sdks/java/io/hdfs/src/main/java/org/apache/beam/sdk/io/hdfs/HDFSFileSource.java index 2a731fb..0e3146f 100644 --- a/sdks/java/io/hdfs/src/main/java/org/apache/beam/sdk/io/hdfs/HDFSFileSource.java +++ b/sdks/java/io/hdfs/src/main/java/org/apache/beam/sdk/io/hdfs/HDFSFileSource.java @@ -48,6 +48,7 @@ import org.apache.beam.sdk.coders.StringUtf8Coder; import org.apache.beam.sdk.coders.VoidCoder; import org.apache.beam.sdk.io.BoundedSource; import org.apache.beam.sdk.io.hadoop.SerializableConfiguration; +import org.apache.beam.sdk.io.hadoop.WritableCoder; import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.transforms.SerializableFunction; import org.apache.beam.sdk.util.CoderUtils; http://git-wip-us.apache.org/repos/asf/beam/blob/44624c38/sdks/java/io/hdfs/src/main/java/org/apache/beam/sdk/io/hdfs/WritableCoder.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/hdfs/src/main/java/org/apache/beam/sdk/io/hdfs/WritableCoder.java b/sdks/java/io/hdfs/src/main/java/org/apache/beam/sdk/io/hdfs/WritableCoder.java deleted file mode 100644 index d958cda..0000000 --- a/sdks/java/io/hdfs/src/main/java/org/apache/beam/sdk/io/hdfs/WritableCoder.java +++ /dev/null @@ -1,116 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. 
The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.sdk.io.hdfs; - -import com.fasterxml.jackson.annotation.JsonCreator; -import com.fasterxml.jackson.annotation.JsonProperty; -import java.io.DataInputStream; -import java.io.DataOutputStream; -import java.io.IOException; -import java.io.InputStream; -import java.io.OutputStream; -import java.util.List; -import org.apache.beam.sdk.coders.Coder; -import org.apache.beam.sdk.coders.CoderException; -import org.apache.beam.sdk.coders.StandardCoder; -import org.apache.beam.sdk.util.CloudObject; -import org.apache.hadoop.io.NullWritable; -import org.apache.hadoop.io.Writable; - -/** - * A {@code WritableCoder} is a {@link Coder} for a Java class that implements {@link Writable}. - * - * <p>To use, specify the coder type on a PCollection: - * <pre> - * {@code - * PCollection<MyRecord> records = - * foo.apply(...).setCoder(WritableCoder.of(MyRecord.class)); - * } - * </pre> - * - * @param <T> the type of elements handled by this coder. - */ -public class WritableCoder<T extends Writable> extends StandardCoder<T> { - private static final long serialVersionUID = 0L; - - /** - * Returns a {@code WritableCoder} instance for the provided element class. 
- * @param <T> the element type - */ - public static <T extends Writable> WritableCoder<T> of(Class<T> clazz) { - return new WritableCoder<>(clazz); - } - - @JsonCreator - @SuppressWarnings("unchecked") - public static WritableCoder<?> of(@JsonProperty("type") String classType) - throws ClassNotFoundException { - Class<?> clazz = Class.forName(classType); - if (!Writable.class.isAssignableFrom(clazz)) { - throw new ClassNotFoundException( - "Class " + classType + " does not implement Writable"); - } - return of((Class<? extends Writable>) clazz); - } - - private final Class<T> type; - - public WritableCoder(Class<T> type) { - this.type = type; - } - - @Override - public void encode(T value, OutputStream outStream, Context context) throws IOException { - value.write(new DataOutputStream(outStream)); - } - - @SuppressWarnings("unchecked") - @Override - public T decode(InputStream inStream, Context context) throws IOException { - try { - if (type == NullWritable.class) { - // NullWritable has no default constructor - return (T) NullWritable.get(); - } - T t = type.newInstance(); - t.readFields(new DataInputStream(inStream)); - return t; - } catch (InstantiationException | IllegalAccessException e) { - throw new CoderException("unable to deserialize record", e); - } - } - - @Override - public List<Coder<?>> getCoderArguments() { - return null; - } - - @Override - public CloudObject initializeCloudObject() { - CloudObject result = CloudObject.forClass(getClass()); - result.put("type", type.getName()); - return result; - } - - @Override - public void verifyDeterministic() throws NonDeterministicException { - throw new NonDeterministicException(this, - "Hadoop Writable may be non-deterministic."); - } - -} http://git-wip-us.apache.org/repos/asf/beam/blob/44624c38/sdks/java/io/hdfs/src/test/java/org/apache/beam/sdk/io/hdfs/WritableCoderTest.java ---------------------------------------------------------------------- diff --git 
a/sdks/java/io/hdfs/src/test/java/org/apache/beam/sdk/io/hdfs/WritableCoderTest.java b/sdks/java/io/hdfs/src/test/java/org/apache/beam/sdk/io/hdfs/WritableCoderTest.java deleted file mode 100644 index e78f850..0000000 --- a/sdks/java/io/hdfs/src/test/java/org/apache/beam/sdk/io/hdfs/WritableCoderTest.java +++ /dev/null @@ -1,45 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.sdk.io.hdfs; - -import org.apache.beam.sdk.testing.CoderProperties; -import org.apache.hadoop.io.IntWritable; -import org.apache.hadoop.io.NullWritable; -import org.junit.Test; - -/** - * Tests for WritableCoder. - */ -public class WritableCoderTest { - - @Test - public void testIntWritableEncoding() throws Exception { - IntWritable value = new IntWritable(42); - WritableCoder<IntWritable> coder = WritableCoder.of(IntWritable.class); - - CoderProperties.coderDecodeEncodeEqual(coder, value); - } - - @Test - public void testNullWritableEncoding() throws Exception { - NullWritable value = NullWritable.get(); - WritableCoder<NullWritable> coder = WritableCoder.of(NullWritable.class); - - CoderProperties.coderDecodeEncodeEqual(coder, value); - } -}