Repository: beam Updated Branches: refs/heads/master cd813fba0 -> 4867c995f
Add CloudObjectTranslators for Avro, Serializable This means these coders can be sent to Dataflow with a stable serialization. Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/2ce97566 Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/2ce97566 Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/2ce97566 Branch: refs/heads/master Commit: 2ce97566be76dacd9d529bde090e1f66f9ae819b Parents: cd813fb Author: Thomas Groh <[email protected]> Authored: Wed Apr 26 20:05:09 2017 -0700 Committer: Luke Cwik <[email protected]> Committed: Mon May 1 16:45:17 2017 -0700 ---------------------------------------------------------------------- runners/google-cloud-dataflow-java/pom.xml | 5 ++ .../util/AvroCoderCloudObjectTranslator.java | 62 +++++++++++++++++++ ...aultCoderCloudObjectTranslatorRegistrar.java | 2 + .../SerializableCoderCloudObjectTranslator.java | 65 ++++++++++++++++++++ .../runners/dataflow/util/CloudObjectsTest.java | 13 ++-- .../sdk/extensions/protobuf/ProtoCoder.java | 4 ++ 6 files changed, 146 insertions(+), 5 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/2ce97566/runners/google-cloud-dataflow-java/pom.xml ---------------------------------------------------------------------- diff --git a/runners/google-cloud-dataflow-java/pom.xml b/runners/google-cloud-dataflow-java/pom.xml index cb0fa7f..e75abbd 100644 --- a/runners/google-cloud-dataflow-java/pom.xml +++ b/runners/google-cloud-dataflow-java/pom.xml @@ -289,6 +289,11 @@ </dependency> <dependency> + <groupId>org.apache.avro</groupId> + <artifactId>avro</artifactId> + </dependency> + + <dependency> <!-- Note: when relocating guava, ensure guava-testlib is not also relocated by excluding com.google.common.**.testing.* --> <groupId>com.google.guava</groupId> http://git-wip-us.apache.org/repos/asf/beam/blob/2ce97566/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/AvroCoderCloudObjectTranslator.java ---------------------------------------------------------------------- diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/AvroCoderCloudObjectTranslator.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/AvroCoderCloudObjectTranslator.java new file mode 100644 index 0000000..444849d --- /dev/null +++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/AvroCoderCloudObjectTranslator.java @@ -0,0 +1,62 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.beam.runners.dataflow.util; + +import org.apache.avro.Schema; +import org.apache.beam.sdk.coders.AvroCoder; +import org.apache.beam.sdk.util.CloudObject; +import org.apache.beam.sdk.util.Structs; + +/** A {@link CloudObjectTranslator} for {@link AvroCoder}. */ +class AvroCoderCloudObjectTranslator implements CloudObjectTranslator<AvroCoder> { + private static final String TYPE_FIELD = "type"; + private static final String SCHEMA_FIELD = "schema"; + + @Override + public CloudObject toCloudObject(AvroCoder target) { + CloudObject base = CloudObject.forClass(AvroCoder.class); + Structs.addString(base, SCHEMA_FIELD, target.getSchema().toString()); + Structs.addString(base, TYPE_FIELD, target.getType().getName()); + return base; + } + + @Override + public AvroCoder<?> fromCloudObject(CloudObject cloudObject) { + Schema.Parser parser = new Schema.Parser(); + String className = Structs.getString(cloudObject, TYPE_FIELD); + String schemaString = Structs.getString(cloudObject, SCHEMA_FIELD); + try { + Class<?> type = Class.forName(className); + Schema schema = parser.parse(schemaString); + return AvroCoder.of(type, schema); + } catch (ClassNotFoundException e) { + throw new IllegalArgumentException(e); + } + } + + @Override + public Class<? extends AvroCoder> getSupportedClass() { + return AvroCoder.class; + } + + @Override + public String cloudObjectClassName() { + return CloudObject.forClass(AvroCoder.class).getClassName(); + } +} http://git-wip-us.apache.org/repos/asf/beam/blob/2ce97566/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/DefaultCoderCloudObjectTranslatorRegistrar.java ---------------------------------------------------------------------- diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/DefaultCoderCloudObjectTranslatorRegistrar.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/DefaultCoderCloudObjectTranslatorRegistrar.java index 3b9fa95..29f047f 100644 --- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/DefaultCoderCloudObjectTranslatorRegistrar.java +++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/DefaultCoderCloudObjectTranslatorRegistrar.java @@ -65,6 +65,8 @@ public class DefaultCoderCloudObjectTranslatorRegistrar CloudObjectTranslators.stream(), CloudObjectTranslators.pair(), CloudObjectTranslators.windowedValue(), + new AvroCoderCloudObjectTranslator(), + new SerializableCoderCloudObjectTranslator(), CloudObjectTranslators.custom()); @VisibleForTesting static final ImmutableSet<Class<? extends Coder>> KNOWN_ATOMIC_CODERS = http://git-wip-us.apache.org/repos/asf/beam/blob/2ce97566/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/SerializableCoderCloudObjectTranslator.java ---------------------------------------------------------------------- diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/SerializableCoderCloudObjectTranslator.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/SerializableCoderCloudObjectTranslator.java new file mode 100644 index 0000000..67c021c --- /dev/null +++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/SerializableCoderCloudObjectTranslator.java @@ -0,0 +1,65 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.beam.runners.dataflow.util; + +import static com.google.common.base.Preconditions.checkArgument; + +import java.io.Serializable; +import org.apache.beam.sdk.coders.SerializableCoder; +import org.apache.beam.sdk.util.CloudObject; +import org.apache.beam.sdk.util.Structs; + +/** A {@link CloudObjectTranslator} for {@link SerializableCoder}. */ +class SerializableCoderCloudObjectTranslator implements CloudObjectTranslator<SerializableCoder> { + private static final String TYPE_FIELD = "type"; + + @Override + public CloudObject toCloudObject(SerializableCoder target) { + CloudObject base = CloudObject.forClass(SerializableCoder.class); + Structs.addString(base, TYPE_FIELD, target.getRecordType().getName()); + return base; + } + + @Override + public SerializableCoder<?> fromCloudObject(CloudObject cloudObject) { + String className = Structs.getString(cloudObject, TYPE_FIELD); + try { + Class<? extends Serializable> targetClass = + (Class<? extends Serializable>) Class.forName(className); + checkArgument( + Serializable.class.isAssignableFrom(targetClass), + "Target class %s does not extend %s", + targetClass.getName(), + Serializable.class.getSimpleName()); + return SerializableCoder.of(targetClass); + } catch (ClassNotFoundException e) { + throw new IllegalArgumentException(e); + } + } + + @Override + public Class<SerializableCoder> getSupportedClass() { + return SerializableCoder.class; + } + + @Override + public String cloudObjectClassName() { + return CloudObject.forClass(SerializableCoder.class).getClassName(); + } +} http://git-wip-us.apache.org/repos/asf/beam/blob/2ce97566/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/util/CloudObjectsTest.java ---------------------------------------------------------------------- diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/util/CloudObjectsTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/util/CloudObjectsTest.java index fdea285..a6a3f25 100644 --- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/util/CloudObjectsTest.java +++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/util/CloudObjectsTest.java @@ -28,8 +28,10 @@ import com.google.common.collect.ImmutableList.Builder; import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; +import java.io.Serializable; import java.util.HashSet; import java.util.Set; +import org.apache.beam.sdk.coders.AvroCoder; import org.apache.beam.sdk.coders.ByteArrayCoder; import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.coders.CoderException; @@ -37,12 +39,12 @@ import org.apache.beam.sdk.coders.CustomCoder; import org.apache.beam.sdk.coders.IterableCoder; import org.apache.beam.sdk.coders.KvCoder; import org.apache.beam.sdk.coders.LengthPrefixCoder; +import org.apache.beam.sdk.coders.SerializableCoder; import org.apache.beam.sdk.coders.VarLongCoder; import org.apache.beam.sdk.transforms.windowing.GlobalWindow; import org.apache.beam.sdk.transforms.windowing.IntervalWindow; import org.apache.beam.sdk.util.CloudObject; import org.apache.beam.sdk.util.InstanceBuilder; -import org.apache.beam.sdk.util.Serializer; import org.apache.beam.sdk.util.WindowedValue; import org.junit.Test; import org.junit.experimental.runners.Enclosed; @@ -112,7 +114,9 @@ public class CloudObjectsTest { KvCoder.of(VarLongCoder.of(), ByteArrayCoder.of()), IntervalWindow.getCoder())) .add(VarLongCoder.of()) - .add(ByteArrayCoder.of()); + .add(ByteArrayCoder.of()) + .add(SerializableCoder.of(Record.class)) + .add(AvroCoder.of(Record.class)); for (Class<? extends Coder> atomicCoder : DefaultCoderCloudObjectTranslatorRegistrar.KNOWN_ATOMIC_CODERS) { dataBuilder.add(InstanceBuilder.ofType(atomicCoder).fromFactoryMethod("of").build()); @@ -126,16 +130,15 @@ public class CloudObjectsTest { @Test public void toAndFromCloudObject() throws Exception { CloudObject cloudObject = CloudObjects.asCloudObject(coder); - Coder<?> reconstructed = Serializer.deserialize(cloudObject, Coder.class); Coder<?> fromCloudObject = CloudObjects.coderFromCloudObject(cloudObject); - assertEquals(coder.getClass(), reconstructed.getClass()); assertEquals(coder.getClass(), fromCloudObject.getClass()); - assertEquals(coder, reconstructed); assertEquals(coder, fromCloudObject); } } + private static class Record implements Serializable {} + private static class ObjectCoder extends CustomCoder<Object> { @Override public void encode(Object value, OutputStream outStream, Context context) http://git-wip-us.apache.org/repos/asf/beam/blob/2ce97566/sdks/java/extensions/protobuf/src/main/java/org/apache/beam/sdk/extensions/protobuf/ProtoCoder.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/protobuf/src/main/java/org/apache/beam/sdk/extensions/protobuf/ProtoCoder.java b/sdks/java/extensions/protobuf/src/main/java/org/apache/beam/sdk/extensions/protobuf/ProtoCoder.java index 9ec7aec..8e90a5f 100644 --- a/sdks/java/extensions/protobuf/src/main/java/org/apache/beam/sdk/extensions/protobuf/ProtoCoder.java +++ b/sdks/java/extensions/protobuf/src/main/java/org/apache/beam/sdk/extensions/protobuf/ProtoCoder.java @@ -229,6 +229,10 @@ public class ProtoCoder<T extends Message> extends CustomCoder<T> { return protoMessageClass; } + public Set<Class<?>> getExtensionHosts() { + return extensionHostClasses; + } + /** * Returns the {@link ExtensionRegistry} listing all known Protocol Buffers extension messages * to {@code T} registered with this {@link ProtoCoder}.
