[BEAM-2020] Convert all unknown Coders into CustomCoder CloudObjects. This ensures that all coders will be serializable, even if there is no registered coder translator.
Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/0b523b68 Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/0b523b68 Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/0b523b68 Branch: refs/heads/gearpump-runner Commit: 0b523b685cf5581f68b5318b7fa39550232625fe Parents: 3bd8a0f Author: Thomas Groh <[email protected]> Authored: Tue May 2 10:21:27 2017 -0700 Committer: Luke Cwik <[email protected]> Committed: Tue May 2 13:44:25 2017 -0700 ---------------------------------------------------------------------- .../dataflow/util/CloudObjectTranslators.java | 11 ++++---- .../runners/dataflow/util/CloudObjects.java | 5 +--- ...aultCoderCloudObjectTranslatorRegistrar.java | 2 +- .../runners/dataflow/util/CloudObjectsTest.java | 28 +++++++++++++++++++- 4 files changed, 35 insertions(+), 11 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/0b523b68/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/CloudObjectTranslators.java ---------------------------------------------------------------------- diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/CloudObjectTranslators.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/CloudObjectTranslators.java index 7a95a9e..c27bee7 100644 --- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/CloudObjectTranslators.java +++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/CloudObjectTranslators.java @@ -313,10 +313,11 @@ class CloudObjectTranslators { private static final String CODER_FIELD = "serialized_coder"; private static final String TYPE_FIELD = "type"; - public static CloudObjectTranslator<? 
extends CustomCoder> custom() { - return new CloudObjectTranslator<CustomCoder>() { + public static CloudObjectTranslator<Coder> javaSerialized() { + return new CloudObjectTranslator<Coder>() { @Override - public CloudObject toCloudObject(CustomCoder target) { + public CloudObject toCloudObject(Coder target) { + // CustomCoder is used as the "marker" for a java-serialized coder CloudObject cloudObject = CloudObject.forClass(CustomCoder.class); Structs.addString(cloudObject, TYPE_FIELD, target.getClass().getName()); Structs.addString( @@ -327,10 +328,10 @@ class CloudObjectTranslators { } @Override - public CustomCoder fromCloudObject(CloudObject cloudObject) { + public Coder fromCloudObject(CloudObject cloudObject) { String serializedCoder = Structs.getString(cloudObject, CODER_FIELD); String type = Structs.getString(cloudObject, TYPE_FIELD); - return (CustomCoder<?>) + return (Coder<?>) SerializableUtils.deserializeFromByteArray( StringUtils.jsonStringToByteArray(serializedCoder), type); } http://git-wip-us.apache.org/repos/asf/beam/blob/0b523b68/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/CloudObjects.java ---------------------------------------------------------------------- diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/CloudObjects.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/CloudObjects.java index a55d10c..9383c48 100644 --- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/CloudObjects.java +++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/CloudObjects.java @@ -67,7 +67,7 @@ public class CloudObjects { (CloudObjectTranslator<Coder>) CODER_TRANSLATORS.get(coder.getClass()); if (translator != null) { return translator.toCloudObject(coder); - } else if (coder instanceof CustomCoder) { + } else { CloudObjectTranslator 
customCoderTranslator = CODER_TRANSLATORS.get(CustomCoder.class); checkNotNull( customCoderTranslator, @@ -77,9 +77,6 @@ public class CloudObjects { DefaultCoderCloudObjectTranslatorRegistrar.class.getSimpleName()); return customCoderTranslator.toCloudObject(coder); } - throw new IllegalArgumentException( - String.format( - "Non-Custom %s with no registered %s", Coder.class, CloudObjectTranslator.class)); } public static Coder<?> coderFromCloudObject(CloudObject cloudObject) { http://git-wip-us.apache.org/repos/asf/beam/blob/0b523b68/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/DefaultCoderCloudObjectTranslatorRegistrar.java ---------------------------------------------------------------------- diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/DefaultCoderCloudObjectTranslatorRegistrar.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/DefaultCoderCloudObjectTranslatorRegistrar.java index 29f047f..5cae13f 100644 --- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/DefaultCoderCloudObjectTranslatorRegistrar.java +++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/DefaultCoderCloudObjectTranslatorRegistrar.java @@ -67,7 +67,7 @@ public class DefaultCoderCloudObjectTranslatorRegistrar CloudObjectTranslators.windowedValue(), new AvroCoderCloudObjectTranslator(), new SerializableCoderCloudObjectTranslator(), - CloudObjectTranslators.custom()); + CloudObjectTranslators.javaSerialized()); @VisibleForTesting static final ImmutableSet<Class<? extends Coder>> KNOWN_ATOMIC_CODERS = ImmutableSet.<Class<? 
extends Coder>>of( http://git-wip-us.apache.org/repos/asf/beam/blob/0b523b68/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/util/CloudObjectsTest.java ---------------------------------------------------------------------- diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/util/CloudObjectsTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/util/CloudObjectsTest.java index a6a3f25..b670268 100644 --- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/util/CloudObjectsTest.java +++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/util/CloudObjectsTest.java @@ -29,7 +29,9 @@ import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; import java.io.Serializable; +import java.util.Collections; import java.util.HashSet; +import java.util.List; import java.util.Set; import org.apache.beam.sdk.coders.AvroCoder; import org.apache.beam.sdk.coders.ByteArrayCoder; @@ -40,6 +42,7 @@ import org.apache.beam.sdk.coders.IterableCoder; import org.apache.beam.sdk.coders.KvCoder; import org.apache.beam.sdk.coders.LengthPrefixCoder; import org.apache.beam.sdk.coders.SerializableCoder; +import org.apache.beam.sdk.coders.StructuredCoder; import org.apache.beam.sdk.coders.VarLongCoder; import org.apache.beam.sdk.transforms.windowing.GlobalWindow; import org.apache.beam.sdk.transforms.windowing.IntervalWindow; @@ -70,7 +73,7 @@ public class CloudObjectsTest { new DefaultCoderCloudObjectTranslatorRegistrar().classesToTranslators().keySet(); Set<Class<? 
extends Coder>> testedClasses = new HashSet<>(); for (Coder<?> tested : DefaultCoders.data()) { - if (tested instanceof ObjectCoder) { + if (tested instanceof ObjectCoder || tested instanceof ArbitraryCoder) { testedClasses.add(CustomCoder.class); assertThat(defaultCoderTranslators, hasItem(CustomCoder.class)); } else { @@ -103,6 +106,7 @@ public class CloudObjectsTest { public static Iterable<Coder<?>> data() { Builder<Coder<?>> dataBuilder = ImmutableList.<Coder<?>>builder() + .add(new ArbitraryCoder()) .add(new ObjectCoder()) .add(GlobalWindow.Coder.INSTANCE) .add(IntervalWindow.getCoder()) @@ -161,4 +165,26 @@ public class CloudObjectsTest { return getClass().hashCode(); } } + + /** + * A non-custom coder with no registered translator. + */ + private static class ArbitraryCoder extends StructuredCoder<Record> { + @Override + public void encode(Record value, OutputStream outStream, Context context) + throws CoderException, IOException {} + + @Override + public Record decode(InputStream inStream, Context context) throws CoderException, IOException { + return new Record(); + } + + @Override + public List<? extends Coder<?>> getCoderArguments() { + return Collections.emptyList(); + } + + @Override + public void verifyDeterministic() throws NonDeterministicException {} + } }
