This is an automated email from the ASF dual-hosted git repository. lcwik pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push: new ab5b0ae [BEAM-6067] Specify pipeline_coder_id property in non-Beam-standard CloudObject coders. (#7045) ab5b0ae is described below commit ab5b0aea03bc29779654758d6acd4f8e8b6d86ca Author: CraigChambersG <45049052+craigchambe...@users.noreply.github.com> AuthorDate: Wed Nov 14 19:25:16 2018 -0800 [BEAM-6067] Specify pipeline_coder_id property in non-Beam-standard CloudObject coders. (#7045) --- .../core/construction/ModelCoderRegistrar.java | 4 ++ .../dataflow/DataflowPipelineTranslator.java | 10 ++- .../util/AvroCoderCloudObjectTranslator.java | 3 +- .../dataflow/util/CloudObjectTranslator.java | 4 +- .../dataflow/util/CloudObjectTranslators.java | 80 ++++++++++++++-------- .../beam/runners/dataflow/util/CloudObjects.java | 19 ++++- .../beam/runners/dataflow/util/PropertyNames.java | 1 + .../SerializableCoderCloudObjectTranslator.java | 3 +- .../runners/dataflow/util/CloudObjectsTest.java | 38 +++++++++- ...HarnessCoderCloudObjectTranslatorRegistrar.java | 5 +- .../worker/graph/LengthPrefixUnknownCoders.java | 5 +- .../dataflow/worker/util/TimerOrElement.java | 3 +- .../dataflow/worker/AvroByteReaderFactoryTest.java | 6 +- .../dataflow/worker/ConcatReaderFactoryTest.java | 3 +- .../dataflow/worker/InMemoryReaderFactoryTest.java | 2 +- .../IntrinsicMapTaskExecutorFactoryTest.java | 11 +-- .../dataflow/worker/IsmSideInputReaderTest.java | 5 +- .../runners/dataflow/worker/ReaderFactoryTest.java | 5 +- .../dataflow/worker/ShuffleReaderFactoryTest.java | 8 ++- .../runners/dataflow/worker/SinkRegistryTest.java | 2 +- .../worker/StreamingDataflowWorkerTest.java | 46 +++++++++---- ...eamingPCollectionViewWriterDoFnFactoryTest.java | 3 +- .../graph/LengthPrefixUnknownCodersTest.java | 64 ++++++++++------- .../dataflow/worker/util/CloudSourceUtilsTest.java | 2 +- .../dataflow/worker/util/TimerOrElementTest.java | 3 +- 25 files changed, 234 insertions(+), 101 deletions(-) diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ModelCoderRegistrar.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ModelCoderRegistrar.java index e07a478..cbb22af 100644 --- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ModelCoderRegistrar.java +++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ModelCoderRegistrar.java @@ -86,6 +86,10 @@ public class ModelCoderRegistrar implements CoderTranslatorRegistrar { Coder.class.getSimpleName()); } + public static boolean isKnownCoder(Coder<?> coder) { + return BEAM_MODEL_CODER_URNS.containsKey(coder.getClass()); + } + @Override public Map<Class<? extends Coder>, String> getCoderURNs() { return BEAM_MODEL_CODER_URNS; diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java index f5190df..a99bd30 100644 --- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java +++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java @@ -568,7 +568,7 @@ public class DataflowPipelineTranslator { @Override public void addEncodingInput(Coder<?> coder) { - CloudObject encoding = CloudObjects.asCloudObject(coder); + CloudObject encoding = translateCoder(coder, translator); addObject(getProperties(), PropertyNames.ENCODING, encoding); } @@ -668,7 +668,7 @@ public class DataflowPipelineTranslator { if (valueCoder != null) { // Verify that encoding can be decoded, in order to catch serialization // failures as early as possible. - CloudObject encoding = CloudObjects.asCloudObject(valueCoder); + CloudObject encoding = translateCoder(valueCoder, translator); addObject(outputInfo, PropertyNames.ENCODING, encoding); translator.outputCoders.put(value, valueCoder); } @@ -1016,7 +1016,7 @@ public class DataflowPipelineTranslator { stepContext.addInput( PropertyNames.RESTRICTION_CODER, - CloudObjects.asCloudObject(transform.getRestrictionCoder())); + translateCoder(transform.getRestrictionCoder(), context)); } }); } @@ -1098,4 +1098,8 @@ public class DataflowPipelineTranslator { stepContext.addOutput(tag.getId(), (PCollection<?>) taggedOutput.getValue()); } } + + private static CloudObject translateCoder(Coder<?> coder, TranslationContext context) { + return CloudObjects.asCloudObject(coder, context.isFnApi() ? context.getSdkComponents() : null); + } } diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/AvroCoderCloudObjectTranslator.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/AvroCoderCloudObjectTranslator.java index 9a4047d..a4ce533 100644 --- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/AvroCoderCloudObjectTranslator.java +++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/AvroCoderCloudObjectTranslator.java @@ -18,6 +18,7 @@ package org.apache.beam.runners.dataflow.util; import org.apache.avro.Schema; +import org.apache.beam.runners.core.construction.SdkComponents; import org.apache.beam.sdk.coders.AvroCoder; /** A {@link CloudObjectTranslator} for {@link AvroCoder}. */ @@ -26,7 +27,7 @@ class AvroCoderCloudObjectTranslator implements CloudObjectTranslator<AvroCoder> private static final String SCHEMA_FIELD = "schema"; @Override - public CloudObject toCloudObject(AvroCoder target) { + public CloudObject toCloudObject(AvroCoder target, SdkComponents sdkComponents) { CloudObject base = CloudObject.forClass(AvroCoder.class); Structs.addString(base, SCHEMA_FIELD, target.getSchema().toString()); Structs.addString(base, TYPE_FIELD, target.getType().getName()); diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/CloudObjectTranslator.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/CloudObjectTranslator.java index aa4af84..3638eaf 100644 --- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/CloudObjectTranslator.java +++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/CloudObjectTranslator.java @@ -17,13 +17,15 @@ */ package org.apache.beam.runners.dataflow.util; +import org.apache.beam.runners.core.construction.SdkComponents; + /** * A translator that takes an object and creates a {@link CloudObject} which can be converted back * to the original object. */ public interface CloudObjectTranslator<T> { /** Converts the provided object into an equivalent {@link CloudObject}. */ - CloudObject toCloudObject(T target); + CloudObject toCloudObject(T target, SdkComponents sdkComponents); /** Converts back into the original object from a provided {@link CloudObject}. */ T fromCloudObject(CloudObject cloudObject); diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/CloudObjectTranslators.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/CloudObjectTranslators.java index 1d1966d..b6f7388 100644 --- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/CloudObjectTranslators.java +++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/CloudObjectTranslators.java @@ -24,6 +24,7 @@ import java.util.ArrayList; import java.util.Collections; import java.util.List; import java.util.Map; +import org.apache.beam.runners.core.construction.SdkComponents; import org.apache.beam.sdk.coders.ByteArrayCoder; import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.coders.CustomCoder; @@ -50,11 +51,12 @@ import org.apache.beam.sdk.values.TupleTag; class CloudObjectTranslators { private CloudObjectTranslators() {} - private static CloudObject addComponents(CloudObject base, List<? extends Coder<?>> components) { + private static CloudObject addComponents( + CloudObject base, List<? extends Coder<?>> components, SdkComponents sdkComponents) { if (!components.isEmpty()) { List<CloudObject> cloudComponents = new ArrayList<>(components.size()); for (Coder component : components) { - cloudComponents.add(CloudObjects.asCloudObject(component)); + cloudComponents.add(CloudObjects.asCloudObject(component, sdkComponents)); } Structs.addList(base, PropertyNames.COMPONENT_ENCODINGS, cloudComponents); } @@ -79,11 +81,13 @@ class CloudObjectTranslators { public static CloudObjectTranslator<KvCoder> pair() { return new CloudObjectTranslator<KvCoder>() { @Override - public CloudObject toCloudObject(KvCoder target) { + public CloudObject toCloudObject(KvCoder target, SdkComponents sdkComponents) { CloudObject result = CloudObject.forClassName(CloudObjectKinds.KIND_PAIR); Structs.addBoolean(result, PropertyNames.IS_PAIR_LIKE, true); return addComponents( - result, ImmutableList.<Coder<?>>of(target.getKeyCoder(), target.getValueCoder())); + result, + ImmutableList.<Coder<?>>of(target.getKeyCoder(), target.getValueCoder()), + sdkComponents); } @Override @@ -112,10 +116,11 @@ class CloudObjectTranslators { public static CloudObjectTranslator<IterableCoder> stream() { return new CloudObjectTranslator<IterableCoder>() { @Override - public CloudObject toCloudObject(IterableCoder target) { + public CloudObject toCloudObject(IterableCoder target, SdkComponents sdkComponents) { CloudObject result = CloudObject.forClassName(CloudObjectKinds.KIND_STREAM); Structs.addBoolean(result, PropertyNames.IS_STREAM_LIKE, true); - return addComponents(result, Collections.<Coder<?>>singletonList(target.getElemCoder())); + return addComponents( + result, Collections.<Coder<?>>singletonList(target.getElemCoder()), sdkComponents); } @Override @@ -144,10 +149,11 @@ class CloudObjectTranslators { static CloudObjectTranslator<LengthPrefixCoder> lengthPrefix() { return new CloudObjectTranslator<LengthPrefixCoder>() { @Override - public CloudObject toCloudObject(LengthPrefixCoder target) { + public CloudObject toCloudObject(LengthPrefixCoder target, SdkComponents sdkComponents) { return addComponents( CloudObject.forClassName(CloudObjectKinds.KIND_LENGTH_PREFIX), - Collections.<Coder<?>>singletonList(target.getValueCoder())); + Collections.<Coder<?>>singletonList(target.getValueCoder()), + sdkComponents); } @Override @@ -176,9 +182,11 @@ class CloudObjectTranslators { static CloudObjectTranslator<GlobalWindow.Coder> globalWindow() { return new CloudObjectTranslator<GlobalWindow.Coder>() { @Override - public CloudObject toCloudObject(GlobalWindow.Coder target) { + public CloudObject toCloudObject(GlobalWindow.Coder target, SdkComponents sdkComponents) { return addComponents( - CloudObject.forClassName(CloudObjectKinds.KIND_GLOBAL_WINDOW), Collections.emptyList()); + CloudObject.forClassName(CloudObjectKinds.KIND_GLOBAL_WINDOW), + Collections.emptyList(), + sdkComponents); } @Override @@ -205,10 +213,11 @@ class CloudObjectTranslators { static CloudObjectTranslator<IntervalWindowCoder> intervalWindow() { return new CloudObjectTranslator<IntervalWindowCoder>() { @Override - public CloudObject toCloudObject(IntervalWindowCoder target) { + public CloudObject toCloudObject(IntervalWindowCoder target, SdkComponents sdkComponents) { return addComponents( CloudObject.forClassName(CloudObjectKinds.KIND_INTERVAL_WINDOW), - Collections.emptyList()); + Collections.emptyList(), + sdkComponents); } @Override @@ -235,11 +244,13 @@ class CloudObjectTranslators { static CloudObjectTranslator<FullWindowedValueCoder> windowedValue() { return new CloudObjectTranslator<FullWindowedValueCoder>() { @Override - public CloudObject toCloudObject(FullWindowedValueCoder target) { + public CloudObject toCloudObject(FullWindowedValueCoder target, SdkComponents sdkComponents) { CloudObject result = CloudObject.forClassName(CloudObjectKinds.KIND_WINDOWED_VALUE); Structs.addBoolean(result, PropertyNames.IS_WRAPPER, true); return addComponents( - result, ImmutableList.<Coder<?>>of(target.getValueCoder(), target.getWindowCoder())); + result, + ImmutableList.<Coder<?>>of(target.getValueCoder(), target.getWindowCoder()), + sdkComponents); } @Override @@ -270,9 +281,11 @@ class CloudObjectTranslators { static CloudObjectTranslator<ByteArrayCoder> bytes() { return new CloudObjectTranslator<ByteArrayCoder>() { @Override - public CloudObject toCloudObject(ByteArrayCoder target) { + public CloudObject toCloudObject(ByteArrayCoder target, SdkComponents sdkComponents) { return addComponents( - CloudObject.forClassName(CloudObjectKinds.KIND_BYTES), Collections.emptyList()); + CloudObject.forClassName(CloudObjectKinds.KIND_BYTES), + Collections.emptyList(), + sdkComponents); } @Override @@ -299,8 +312,9 @@ class CloudObjectTranslators { static CloudObjectTranslator<VarLongCoder> varInt() { return new CloudObjectTranslator<VarLongCoder>() { @Override - public CloudObject toCloudObject(VarLongCoder target) { - return addComponents(CloudObject.forClass(target.getClass()), Collections.emptyList()); + public CloudObject toCloudObject(VarLongCoder target, SdkComponents sdkComponents) { + return addComponents( + CloudObject.forClass(target.getClass()), Collections.emptyList(), sdkComponents); } @Override @@ -326,7 +340,7 @@ class CloudObjectTranslators { public static CloudObjectTranslator<Coder> javaSerialized() { return new CloudObjectTranslator<Coder>() { @Override - public CloudObject toCloudObject(Coder target) { + public CloudObject toCloudObject(Coder target, SdkComponents sdkComponents) { // CustomCoder is used as the "marker" for a java-serialized coder CloudObject cloudObject = CloudObject.forClass(CustomCoder.class); Structs.addString(cloudObject, TYPE_FIELD, target.getClass().getName()); @@ -363,7 +377,7 @@ class CloudObjectTranslators { InstanceBuilder.ofType(coderClass).fromFactoryMethod("of").build(); return new CloudObjectTranslator<T>() { @Override - public CloudObject toCloudObject(T target) { + public CloudObject toCloudObject(T target, SdkComponents sdkComponents) { return CloudObject.forClass(coderClass); } @@ -388,9 +402,10 @@ class CloudObjectTranslators { final Class<? extends IterableLikeCoder> clazz) { return new CloudObjectTranslator<IterableLikeCoder>() { @Override - public CloudObject toCloudObject(IterableLikeCoder target) { + public CloudObject toCloudObject(IterableLikeCoder target, SdkComponents sdkComponents) { CloudObject base = CloudObject.forClass(clazz); - return addComponents(base, Collections.<Coder<?>>singletonList(target.getElemCoder())); + return addComponents( + base, Collections.<Coder<?>>singletonList(target.getElemCoder()), sdkComponents); } @Override @@ -422,10 +437,12 @@ class CloudObjectTranslators { public static CloudObjectTranslator<MapCoder> map() { return new CloudObjectTranslator<MapCoder>() { @Override - public CloudObject toCloudObject(MapCoder target) { + public CloudObject toCloudObject(MapCoder target, SdkComponents sdkComponents) { CloudObject base = CloudObject.forClass(MapCoder.class); return addComponents( - base, ImmutableList.<Coder<?>>of(target.getKeyCoder(), target.getValueCoder())); + base, + ImmutableList.<Coder<?>>of(target.getKeyCoder(), target.getValueCoder()), + sdkComponents); } @Override @@ -454,9 +471,10 @@ class CloudObjectTranslators { public static CloudObjectTranslator<NullableCoder> nullable() { return new CloudObjectTranslator<NullableCoder>() { @Override - public CloudObject toCloudObject(NullableCoder target) { + public CloudObject toCloudObject(NullableCoder target, SdkComponents sdkComponents) { CloudObject base = CloudObject.forClass(NullableCoder.class); - return addComponents(base, Collections.<Coder<?>>singletonList(target.getValueCoder())); + return addComponents( + base, Collections.<Coder<?>>singletonList(target.getValueCoder()), sdkComponents); } @Override @@ -485,8 +503,9 @@ class CloudObjectTranslators { public static CloudObjectTranslator<UnionCoder> union() { return new CloudObjectTranslator<UnionCoder>() { @Override - public CloudObject toCloudObject(UnionCoder target) { - return addComponents(CloudObject.forClass(UnionCoder.class), target.getElementCoders()); + public CloudObject toCloudObject(UnionCoder target, SdkComponents sdkComponents) { + return addComponents( + CloudObject.forClass(UnionCoder.class), target.getElementCoders(), sdkComponents); } @Override @@ -510,11 +529,12 @@ class CloudObjectTranslators { public static CloudObjectTranslator<CoGbkResultCoder> coGroupByKeyResult() { return new CloudObjectTranslator<CoGbkResultCoder>() { @Override - public CloudObject toCloudObject(CoGbkResultCoder target) { + public CloudObject toCloudObject(CoGbkResultCoder target, SdkComponents sdkComponents) { CloudObject base = CloudObject.forClass(CoGbkResultCoder.class); Structs.addObject( base, PropertyNames.CO_GBK_RESULT_SCHEMA, toCloudObject(target.getSchema())); - return addComponents(base, Collections.singletonList(target.getUnionCoder())); + return addComponents( + base, Collections.singletonList(target.getUnionCoder()), sdkComponents); } private CloudObject toCloudObject(CoGbkResultSchema schema) { diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/CloudObjects.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/CloudObjects.java index 4b026df..a96d3dd 100644 --- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/CloudObjects.java +++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/CloudObjects.java @@ -23,6 +23,9 @@ import static com.google.common.base.Preconditions.checkNotNull; import com.google.common.collect.ImmutableMap; import java.util.Map; import java.util.ServiceLoader; +import javax.annotation.Nullable; +import org.apache.beam.runners.core.construction.ModelCoderRegistrar; +import org.apache.beam.runners.core.construction.SdkComponents; import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.coders.CustomCoder; @@ -58,11 +61,12 @@ public class CloudObjects { } /** Convert the provided {@link Coder} into a {@link CloudObject}. */ - public static CloudObject asCloudObject(Coder<?> coder) { + public static CloudObject asCloudObject(Coder<?> coder, @Nullable SdkComponents sdkComponents) { CloudObjectTranslator<Coder> translator = (CloudObjectTranslator<Coder>) CODER_TRANSLATORS.get(coder.getClass()); + CloudObject encoding; if (translator != null) { - return translator.toCloudObject(coder); + encoding = translator.toCloudObject(coder, sdkComponents); } else { CloudObjectTranslator customCoderTranslator = CODER_TRANSLATORS.get(CustomCoder.class); checkNotNull( @@ -71,8 +75,17 @@ public class CloudObjects { CloudObjectTranslator.class.getSimpleName(), CustomCoder.class.getSimpleName(), DefaultCoderCloudObjectTranslatorRegistrar.class.getSimpleName()); - return customCoderTranslator.toCloudObject(coder); + encoding = customCoderTranslator.toCloudObject(coder, sdkComponents); } + if (sdkComponents != null && !ModelCoderRegistrar.isKnownCoder(coder)) { + try { + String coderId = sdkComponents.registerCoder(coder); + Structs.addString(encoding, PropertyNames.PIPELINE_PROTO_CODER_ID, coderId); + } catch (Exception e) { + throw new RuntimeException("Unable to register coder " + coder, e); + } + } + return encoding; } public static Coder<?> coderFromCloudObject(CloudObject cloudObject) { diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/PropertyNames.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/PropertyNames.java index bd510c8..e644e0f 100644 --- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/PropertyNames.java +++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/PropertyNames.java @@ -64,4 +64,5 @@ public class PropertyNames { public static final String DISPLAY_DATA = "display_data"; public static final String RESTRICTION_CODER = "restriction_coder"; public static final String IMPULSE_ELEMENT = "impulse_element"; + public static final String PIPELINE_PROTO_CODER_ID = "pipeline_proto_coder_id"; } diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/SerializableCoderCloudObjectTranslator.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/SerializableCoderCloudObjectTranslator.java index 8cac585..b1be7a3 100644 --- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/SerializableCoderCloudObjectTranslator.java +++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/SerializableCoderCloudObjectTranslator.java @@ -20,6 +20,7 @@ package org.apache.beam.runners.dataflow.util; import static com.google.common.base.Preconditions.checkArgument; import java.io.Serializable; +import org.apache.beam.runners.core.construction.SdkComponents; import org.apache.beam.sdk.coders.SerializableCoder; /** A {@link CloudObjectTranslator} for {@link SerializableCoder}. */ @@ -27,7 +28,7 @@ class SerializableCoderCloudObjectTranslator implements CloudObjectTranslator<Se private static final String TYPE_FIELD = "type"; @Override - public CloudObject toCloudObject(SerializableCoder target) { + public CloudObject toCloudObject(SerializableCoder target, SdkComponents sdkComponents) { CloudObject base = CloudObject.forClass(SerializableCoder.class); Structs.addString(base, TYPE_FIELD, target.getRecordType().getName()); return base; diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/util/CloudObjectsTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/util/CloudObjectsTest.java index 4d75ecd..b7e8cc8 100644 --- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/util/CloudObjectsTest.java +++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/util/CloudObjectsTest.java @@ -20,7 +20,9 @@ package org.apache.beam.runners.dataflow.util; import static org.hamcrest.Matchers.emptyIterable; import static org.hamcrest.Matchers.hasItem; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertThat; +import static org.junit.Assert.assertTrue; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableList.Builder; @@ -32,6 +34,8 @@ import java.util.Collections; import java.util.HashSet; import java.util.List; import java.util.Set; +import org.apache.beam.runners.core.construction.ModelCoderRegistrar; +import org.apache.beam.runners.core.construction.SdkComponents; import org.apache.beam.sdk.coders.AvroCoder; import org.apache.beam.sdk.coders.ByteArrayCoder; import org.apache.beam.sdk.coders.Coder; @@ -149,12 +153,44 @@ public class CloudObjectsTest { @Test public void toAndFromCloudObject() throws Exception { - CloudObject cloudObject = CloudObjects.asCloudObject(coder); + CloudObject cloudObject = CloudObjects.asCloudObject(coder, /*sdkComponents=*/ null); Coder<?> fromCloudObject = CloudObjects.coderFromCloudObject(cloudObject); assertEquals(coder.getClass(), fromCloudObject.getClass()); assertEquals(coder, fromCloudObject); } + + @Test + public void toAndFromCloudObjectWithSdkComponents() throws Exception { + SdkComponents sdkComponents = SdkComponents.create(); + CloudObject cloudObject = CloudObjects.asCloudObject(coder, sdkComponents); + Coder<?> fromCloudObject = CloudObjects.coderFromCloudObject(cloudObject); + + assertEquals(coder.getClass(), fromCloudObject.getClass()); + assertEquals(coder, fromCloudObject); + + checkPipelineProtoCoderIds(coder, cloudObject, sdkComponents); + } + + private static void checkPipelineProtoCoderIds( + Coder<?> coder, CloudObject cloudObject, SdkComponents sdkComponents) throws Exception { + if (ModelCoderRegistrar.isKnownCoder(coder)) { + assertFalse(cloudObject.containsKey(PropertyNames.PIPELINE_PROTO_CODER_ID)); + } else { + assertTrue(cloudObject.containsKey(PropertyNames.PIPELINE_PROTO_CODER_ID)); + assertEquals( + sdkComponents.registerCoder(coder), + cloudObject.get(PropertyNames.PIPELINE_PROTO_CODER_ID)); + } + List<? extends Coder<?>> coderArguments = coder.getCoderArguments(); + Object cloudComponentsObject = cloudObject.get(PropertyNames.COMPONENT_ENCODINGS); + assertTrue(cloudComponentsObject instanceof List); + List<CloudObject> cloudComponents = (List<CloudObject>) cloudComponentsObject; + assertEquals(coderArguments.size(), cloudComponents.size()); + for (int i = 0; i < coderArguments.size(); i++) { + checkPipelineProtoCoderIds(coderArguments.get(i), cloudComponents.get(i), sdkComponents); + } + } } private static class Record implements Serializable {} diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/RunnerHarnessCoderCloudObjectTranslatorRegistrar.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/RunnerHarnessCoderCloudObjectTranslatorRegistrar.java index 2b2bfd5..de242d1 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/RunnerHarnessCoderCloudObjectTranslatorRegistrar.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/RunnerHarnessCoderCloudObjectTranslatorRegistrar.java @@ -23,6 +23,7 @@ import java.util.ArrayList; import java.util.Collections; import java.util.List; import java.util.Map; +import org.apache.beam.runners.core.construction.SdkComponents; import org.apache.beam.runners.dataflow.internal.IsmFormat.IsmRecordCoder; import org.apache.beam.runners.dataflow.util.CloudObject; import org.apache.beam.runners.dataflow.util.CloudObjectTranslator; @@ -69,7 +70,7 @@ public class RunnerHarnessCoderCloudObjectTranslatorRegistrar implements CloudObjectTranslator<IsmRecordCoder<?>> { @Override - public CloudObject toCloudObject(IsmRecordCoder<?> target) { + public CloudObject toCloudObject(IsmRecordCoder<?> target, SdkComponents sdkComponents) { throw new UnsupportedOperationException(); } @@ -99,7 +100,7 @@ public class RunnerHarnessCoderCloudObjectTranslatorRegistrar InstanceBuilder.ofType(coderClass).fromFactoryMethod("of").build(); return new CloudObjectTranslator<T>() { @Override - public CloudObject toCloudObject(T target) { + public CloudObject toCloudObject(T target, SdkComponents sdkComponents) { throw new UnsupportedOperationException(); } diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/graph/LengthPrefixUnknownCoders.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/graph/LengthPrefixUnknownCoders.java index 091ab79..398037e 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/graph/LengthPrefixUnknownCoders.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/graph/LengthPrefixUnknownCoders.java @@ -229,7 +229,8 @@ public class LengthPrefixUnknownCoders { // Handle well known coders. if (LENGTH_PREFIX_CODER_TYPE.equals(coderType)) { if (replaceWithByteArrayCoder) { - return CloudObjects.asCloudObject(LENGTH_PREFIXED_BYTE_ARRAY_CODER); + return CloudObjects.asCloudObject( + LENGTH_PREFIXED_BYTE_ARRAY_CODER, /*sdkComponents=*/ null); } return codec; } else if (WELL_KNOWN_CODER_TYPES.contains(coderType)) { @@ -251,7 +252,7 @@ public class LengthPrefixUnknownCoders { // Wrap unknown coders with length prefix coder. if (replaceWithByteArrayCoder) { - return CloudObjects.asCloudObject(LENGTH_PREFIXED_BYTE_ARRAY_CODER); + return CloudObjects.asCloudObject(LENGTH_PREFIXED_BYTE_ARRAY_CODER, /*sdkComponents=*/ null); } else { Map<String, Object> prefixedCodec = new HashMap<>(); prefixedCodec.put(PropertyNames.OBJECT_TYPE_NAME, LENGTH_PREFIX_CODER_TYPE); diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/util/TimerOrElement.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/util/TimerOrElement.java index b0dbb26..9db549e 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/util/TimerOrElement.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/util/TimerOrElement.java @@ -26,6 +26,7 @@ import com.google.common.collect.ImmutableMap; import java.util.Collections; import java.util.List; import java.util.Map; +import org.apache.beam.runners.core.construction.SdkComponents; import org.apache.beam.runners.dataflow.util.CloudObject; import org.apache.beam.runners.dataflow.util.CloudObjectTranslator; import org.apache.beam.runners.dataflow.util.CloudObjects; @@ -70,7 +71,7 @@ public class TimerOrElement { private static class TimerOrElementCloudObjectTranslator implements CloudObjectTranslator<TimerOrElementCoder> { @Override - public CloudObject toCloudObject(TimerOrElementCoder target) { + public CloudObject toCloudObject(TimerOrElementCoder target, SdkComponents sdkComponents) { throw new IllegalArgumentException("Should never be called"); } diff --git a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/AvroByteReaderFactoryTest.java b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/AvroByteReaderFactoryTest.java index 68761fc..6c16ae6 100644 --- a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/AvroByteReaderFactoryTest.java +++ b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/AvroByteReaderFactoryTest.java @@ -73,7 +73,8 @@ public class AvroByteReaderFactoryTest { Coder<?> coder = WindowedValue.getFullCoder(BigEndianIntegerCoder.of(), GlobalWindow.Coder.INSTANCE); NativeReader<?> reader = - runTestCreateAvroReader(pathToAvroFile, null, null, CloudObjects.asCloudObject(coder)); + runTestCreateAvroReader( + pathToAvroFile, null, null, CloudObjects.asCloudObject(coder, /*sdkComponents=*/ null)); Assert.assertThat(reader, new IsInstanceOf(AvroByteReader.class)); AvroByteReader avroReader = (AvroByteReader) reader; @@ -88,7 +89,8 @@ public class AvroByteReaderFactoryTest { Coder<?> coder = WindowedValue.getFullCoder(BigEndianIntegerCoder.of(), GlobalWindow.Coder.INSTANCE); NativeReader<?> reader = - runTestCreateAvroReader(pathToAvroFile, 200L, 500L, CloudObjects.asCloudObject(coder)); + runTestCreateAvroReader( + pathToAvroFile, 200L, 500L, CloudObjects.asCloudObject(coder, /*sdkComponents=*/ null)); Assert.assertThat(reader, new IsInstanceOf(AvroByteReader.class)); AvroByteReader avroReader = (AvroByteReader) reader; diff --git a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/ConcatReaderFactoryTest.java b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/ConcatReaderFactoryTest.java index a12947c..772500f 100644 --- a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/ConcatReaderFactoryTest.java +++ b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/ConcatReaderFactoryTest.java @@ -58,7 +58,8 @@ public class ConcatReaderFactoryTest { inMemorySourceDictionary.put(PropertyNames.SOURCE_SPEC, inMemorySourceSpec); - CloudObject textSourceEncoding = CloudObjects.asCloudObject(StringUtf8Coder.of()); + CloudObject textSourceEncoding = + CloudObjects.asCloudObject(StringUtf8Coder.of(), /*sdkComponents=*/ null); inMemorySourceDictionary.put(PropertyNames.ENCODING, textSourceEncoding); sourcesList.add(inMemorySourceDictionary); diff --git a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/InMemoryReaderFactoryTest.java b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/InMemoryReaderFactoryTest.java index 9d36ac0..cdb3b2b 100644 --- a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/InMemoryReaderFactoryTest.java +++ b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/InMemoryReaderFactoryTest.java @@ -56,7 +56,7 @@ public class InMemoryReaderFactoryTest { Source cloudSource = new Source(); cloudSource.setSpec(spec); - cloudSource.setCodec(CloudObjects.asCloudObject(coder)); + cloudSource.setCodec(CloudObjects.asCloudObject(coder, /*sdkComponents=*/ null)); return cloudSource; } diff --git a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/IntrinsicMapTaskExecutorFactoryTest.java b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/IntrinsicMapTaskExecutorFactoryTest.java index 0cfe3f4..972c19a 100644 --- a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/IntrinsicMapTaskExecutorFactoryTest.java +++ b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/IntrinsicMapTaskExecutorFactoryTest.java @@ -120,7 +120,8 @@ public class IntrinsicMapTaskExecutorFactoryTest { .andThen(new MapTaskToNetworkFunction()); private static final CloudObject windowedStringCoder = - CloudObjects.asCloudObject(WindowedValue.getValueOnlyCoder(StringUtf8Coder.of())); + CloudObjects.asCloudObject( + WindowedValue.getValueOnlyCoder(StringUtf8Coder.of()), /*sdkComponents=*/ null); private IntrinsicMapTaskExecutorFactory mapTaskExecutorFactory; private PipelineOptions options; @@ -560,13 +561,15 @@ public class IntrinsicMapTaskExecutorFactoryTest { CloudObjects.asCloudObject( FullWindowedValueCoder.of( KvCoder.of(StringUtf8Coder.of(), BigEndianIntegerCoder.of()), - IntervalWindowCoder.of()))); + IntervalWindowCoder.of()), + /*sdkComponents=*/ null)); InstructionOutput output = new InstructionOutput(); output.setName("pgbk_output_name"); output.setCodec( CloudObjects.asCloudObject( - KvCoder.of(StringUtf8Coder.of(), IterableCoder.of(BigEndianIntegerCoder.of())))); + KvCoder.of(StringUtf8Coder.of(), IterableCoder.of(BigEndianIntegerCoder.of())), + /*sdkComponents=*/ null)); output.setOriginalName("originalName"); output.setSystemName("systemName"); @@ -691,7 +694,7 @@ public class IntrinsicMapTaskExecutorFactoryTest { InstructionOutput output = new InstructionOutput(); output.setName("flatten_output_name"); - output.setCodec(CloudObjects.asCloudObject(StringUtf8Coder.of())); + output.setCodec(CloudObjects.asCloudObject(StringUtf8Coder.of(), /*sdkComponents=*/ null)); output.setOriginalName("originalName"); output.setSystemName("systemName"); diff --git a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/IsmSideInputReaderTest.java b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/IsmSideInputReaderTest.java index a07e970..3edf08e 100644 --- a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/IsmSideInputReaderTest.java +++ b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/IsmSideInputReaderTest.java @@ -1495,7 +1495,7 @@ public class IsmSideInputReaderTest { private <T> void verifyList(List<T> expected, List<T> actual) { assertEquals(expected.size(), actual.size()); - List<Integer> iterationOrder = new ArrayList<>(); + List<Integer> iterationOrder = new ArrayList<Integer>(); Random random = new Random(1892389023490L); for (int i = 0; i < expected.size(); ++i) { iterationOrder.add(i); @@ -1770,7 +1770,8 @@ public class IsmSideInputReaderTest { private <K, V> Source newIsmSource(IsmRecordCoder<WindowedValue<V>> coder, String tmpFilePath) { Source source = new Source(); source.setCodec( - CloudObjects.asCloudObject(WindowedValue.getFullCoder(coder, GLOBAL_WINDOW_CODER))); + CloudObjects.asCloudObject( + WindowedValue.getFullCoder(coder, GLOBAL_WINDOW_CODER), /*sdkComponents=*/ null)); source.setSpec(new HashMap<String, Object>()); source.getSpec().put(PropertyNames.OBJECT_TYPE_NAME, "IsmSource"); source.getSpec().put(WorkerPropertyNames.FILENAME, tmpFilePath); diff --git a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/ReaderFactoryTest.java b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/ReaderFactoryTest.java index 80d85a8..34bc386 100644 --- a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/ReaderFactoryTest.java +++ b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/ReaderFactoryTest.java @@ -121,7 +121,8 @@ public class ReaderFactoryTest { Source cloudSource = new Source(); cloudSource.setSpec(spec); - cloudSource.setCodec(CloudObjects.asCloudObject(BigEndianIntegerCoder.of())); + cloudSource.setCodec( + CloudObjects.asCloudObject(BigEndianIntegerCoder.of(), /*sdkComponents=*/ null)); PipelineOptions options = PipelineOptionsFactory.create(); ReaderRegistry registry = @@ -141,7 +142,7 @@ public class ReaderFactoryTest { CloudObject spec = CloudObject.forClassName("UnknownSource"); Source cloudSource = new Source(); cloudSource.setSpec(spec); - cloudSource.setCodec(CloudObjects.asCloudObject(StringUtf8Coder.of())); + cloudSource.setCodec(CloudObjects.asCloudObject(StringUtf8Coder.of(), /*sdkComponents=*/ null)); try { PipelineOptions options = PipelineOptionsFactory.create(); ReaderRegistry.defaultRegistry() diff --git a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/ShuffleReaderFactoryTest.java b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/ShuffleReaderFactoryTest.java index 45022d2..c12cd9e 100644 --- a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/ShuffleReaderFactoryTest.java +++ b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/ShuffleReaderFactoryTest.java @@ -87,7 +87,7 @@ public class ShuffleReaderFactoryTest { shuffleReaderConfig, start, end, - CloudObjects.asCloudObject(coder), + CloudObjects.asCloudObject(coder, /*sdkComponents=*/ null), BatchModeExecutionContext.forTesting(PipelineOptionsFactory.create(), "testStage"), UngroupedShuffleReader.class, "UngroupedShuffleSource"); @@ -114,7 +114,8 @@ public class ShuffleReaderFactoryTest { end, CloudObjects.asCloudObject( FullWindowedValueCoder.of( - KvCoder.of(keyCoder, IterableCoder.of(valueCoder)), IntervalWindowCoder.of())), + KvCoder.of(keyCoder, IterableCoder.of(valueCoder)), IntervalWindowCoder.of()), + /*sdkComponents=*/ null), context, GroupingShuffleReader.class, "GroupingShuffleSource"); @@ -142,7 +143,8 @@ public class ShuffleReaderFactoryTest { CloudObjects.asCloudObject( FullWindowedValueCoder.of( KvCoder.of(keyCoder, windowedValueCoder.getValueCoder()), - IntervalWindowCoder.of())), + IntervalWindowCoder.of()), + /*sdkComponents=*/ null), BatchModeExecutionContext.forTesting(PipelineOptionsFactory.create(), "testStage"), PartitioningShuffleReader.class, "PartitioningShuffleSource"); diff --git a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/SinkRegistryTest.java b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/SinkRegistryTest.java index 099dae4..89c1a69 100644 --- a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/SinkRegistryTest.java +++ b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/SinkRegistryTest.java @@ -59,7 +59,7 @@ public class SinkRegistryTest { com.google.api.services.dataflow.model.Sink cloudSink = new com.google.api.services.dataflow.model.Sink(); cloudSink.setSpec(spec); - cloudSink.setCodec(CloudObjects.asCloudObject(StringUtf8Coder.of())); + cloudSink.setCodec(CloudObjects.asCloudObject(StringUtf8Coder.of(), /*sdkComponents=*/ null)); try { SinkRegistry.defaultRegistry() .create( diff --git a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorkerTest.java b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorkerTest.java index b87ee45..d69e304 100644 --- a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorkerTest.java +++ b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorkerTest.java @@ -230,7 +230,8 @@ public class StreamingDataflowWorkerTest { CloudObject timerCloudObject = CloudObject.forClassName( "com.google.cloud.dataflow.sdk.util.TimerOrElement$TimerOrElementCoder"); - List<CloudObject> component = Collections.singletonList(CloudObjects.asCloudObject(coder)); + List<CloudObject> component = + Collections.singletonList(CloudObjects.asCloudObject(coder, /*sdkComponents=*/ null)); Structs.addList(timerCloudObject, PropertyNames.COMPONENT_ENCODINGS, component); CloudObject encodedCoder = CloudObject.forClassName("kind:windowed_value"); @@ -238,7 +239,9 @@ public class StreamingDataflowWorkerTest { Structs.addList( encodedCoder, PropertyNames.COMPONENT_ENCODINGS, - ImmutableList.of(timerCloudObject, CloudObjects.asCloudObject(IntervalWindowCoder.of()))); + ImmutableList.of( + timerCloudObject, + CloudObjects.asCloudObject(IntervalWindowCoder.of(), /*sdkComponents=*/ null))); return new ParallelInstruction() .setSystemName(DEFAULT_SOURCE_SYSTEM_NAME) @@ -269,7 +272,8 @@ public class StreamingDataflowWorkerTest { .setSpec(CloudObject.forClass(UngroupedWindmillReader.class)) .setCodec( CloudObjects.asCloudObject( - WindowedValue.getFullCoder(coder, IntervalWindow.getCoder()))))) + WindowedValue.getFullCoder(coder, IntervalWindow.getCoder()), + /*sdkComponents=*/ null)))) .setOutputs( Arrays.asList( new InstructionOutput() @@ -278,7 +282,8 @@ public class StreamingDataflowWorkerTest { .setSystemName(DEFAULT_OUTPUT_SYSTEM_NAME) .setCodec( CloudObjects.asCloudObject( - WindowedValue.getFullCoder(coder, IntervalWindow.getCoder()))))); + WindowedValue.getFullCoder(coder, IntervalWindow.getCoder()), + /*sdkComponents=*/ null)))); } private ParallelInstruction makeDoFnInstruction( @@ -321,7 +326,8 @@ public class StreamingDataflowWorkerTest { .setCodec( CloudObjects.asCloudObject( WindowedValue.getFullCoder( - outputCoder, windowingStrategy.getWindowFn().windowCoder()))))); + outputCoder, windowingStrategy.getWindowFn().windowCoder()), + /*sdkComponents=*/ null)))); } private ParallelInstruction makeDoFnInstruction( @@ -356,7 +362,8 @@ public class StreamingDataflowWorkerTest { .setSpec(spec) .setCodec( CloudObjects.asCloudObject( - WindowedValue.getFullCoder(coder, windowCoder))))); + WindowedValue.getFullCoder(coder, windowCoder), + /*sdkComponents=*/ null)))); } private ParallelInstruction makeSinkInstruction( @@ -1088,7 +1095,8 @@ public class StreamingDataflowWorkerTest { .setCodec( CloudObjects.asCloudObject( WindowedValue.getFullCoder( - StringUtf8Coder.of(), IntervalWindow.getCoder()))))); + StringUtf8Coder.of(), IntervalWindow.getCoder()), + /*sdkComponents=*/ null)))); List<ParallelInstruction> instructions = Arrays.asList( @@ -1190,7 +1198,10 @@ public class StreamingDataflowWorkerTest { .withTimestampCombiner(TimestampCombiner.EARLIEST), sdkComponents) .toByteArray())); - addObject(spec, WorkerPropertyNames.INPUT_CODER, CloudObjects.asCloudObject(windowedKvCoder)); + addObject( + spec, + WorkerPropertyNames.INPUT_CODER, + CloudObjects.asCloudObject(windowedKvCoder, /*sdkComponents=*/ null)); ParallelInstruction mergeWindowsInstruction = new ParallelInstruction() @@ -1208,7 +1219,9 @@ public class StreamingDataflowWorkerTest { .setOriginalName(DEFAULT_OUTPUT_ORIGINAL_NAME) .setSystemName(DEFAULT_OUTPUT_SYSTEM_NAME) .setName("output") - .setCodec(CloudObjects.asCloudObject(windowedGroupedCoder)))); + .setCodec( + CloudObjects.asCloudObject( + windowedGroupedCoder, /*sdkComponents=*/ null)))); List<ParallelInstruction> instructions = Arrays.asList( @@ -1492,7 +1505,10 @@ public class StreamingDataflowWorkerTest { .withAllowedLateness(Duration.standardMinutes(60)), sdkComponents) .toByteArray())); - addObject(spec, WorkerPropertyNames.INPUT_CODER, CloudObjects.asCloudObject(windowedKvCoder)); + addObject( + spec, + WorkerPropertyNames.INPUT_CODER, + CloudObjects.asCloudObject(windowedKvCoder, /*sdkComponents=*/ null)); ParallelInstruction mergeWindowsInstruction = new ParallelInstruction() @@ -1510,7 +1526,9 @@ public class StreamingDataflowWorkerTest { .setOriginalName(DEFAULT_OUTPUT_ORIGINAL_NAME) .setSystemName(DEFAULT_OUTPUT_SYSTEM_NAME) .setName("output") - .setCodec(CloudObjects.asCloudObject(windowedGroupedCoder)))); + .setCodec( + CloudObjects.asCloudObject( + windowedGroupedCoder, /*sdkComponents=*/ null)))); List<ParallelInstruction> instructions = Arrays.asList( @@ -1650,7 +1668,8 @@ public class StreamingDataflowWorkerTest { WindowedValue.getFullCoder( ValueWithRecordId.ValueWithRecordIdCoder.of( KvCoder.of(VarIntCoder.of(), VarIntCoder.of())), - GlobalWindow.Coder.INSTANCE)); + GlobalWindow.Coder.INSTANCE), + /*sdkComponents=*/ null); return Arrays.asList( new ParallelInstruction() @@ -2074,7 +2093,8 @@ public class StreamingDataflowWorkerTest { WindowedValue.getFullCoder( ValueWithRecordId.ValueWithRecordIdCoder.of( KvCoder.of(VarIntCoder.of(), VarIntCoder.of())), - GlobalWindow.Coder.INSTANCE)); + GlobalWindow.Coder.INSTANCE), + /*sdkComponents=*/ null); TestCountingSource counter = new TestCountingSource(3).withThrowOnFirstSnapshot(true); List<ParallelInstruction> instructions = diff --git a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingPCollectionViewWriterDoFnFactoryTest.java b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingPCollectionViewWriterDoFnFactoryTest.java index 9871ce0..f920a23 100644 --- a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingPCollectionViewWriterDoFnFactoryTest.java +++ b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingPCollectionViewWriterDoFnFactoryTest.java @@ -49,7 +49,8 @@ public class StreamingPCollectionViewWriterDoFnFactoryTest { CloudObject coder = CloudObjects.asCloudObject( - WindowedValue.getFullCoder(BigEndianIntegerCoder.of(), GlobalWindow.Coder.INSTANCE)); + WindowedValue.getFullCoder(BigEndianIntegerCoder.of(), GlobalWindow.Coder.INSTANCE), + /*sdkComponents=*/ null); ParDoFn parDoFn = new StreamingPCollectionViewWriterDoFnFactory() .create( diff --git a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/graph/LengthPrefixUnknownCodersTest.java b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/graph/LengthPrefixUnknownCodersTest.java index befc876..661e394 100644 --- a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/graph/LengthPrefixUnknownCodersTest.java +++ b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/graph/LengthPrefixUnknownCodersTest.java @@ -105,9 +105,10 @@ public class LengthPrefixUnknownCodersTest { @Test public void testLengthPrefixUnknownCoders() throws Exception { Map<String, Object> lengthPrefixedCoderCloudObject = - forCodec(CloudObjects.asCloudObject(windowedValueCoder), false); + forCodec(CloudObjects.asCloudObject(windowedValueCoder, /*sdkComponents=*/ null), false); assertEquals( - CloudObjects.asCloudObject(prefixedWindowedValueCoder), lengthPrefixedCoderCloudObject); + CloudObjects.asCloudObject(prefixedWindowedValueCoder, /*sdkComponents=*/ null), + lengthPrefixedCoderCloudObject); } /** Test bypassing unknown coders that are already wrapped with {@code LengthPrefixCoder} */ @@ -119,7 +120,7 @@ public class LengthPrefixUnknownCodersTest { GlobalWindow.Coder.INSTANCE); Map<String, Object> lengthPrefixedCoderCloudObject = - forCodec(CloudObjects.asCloudObject(windowedValueCoder), false); + forCodec(CloudObjects.asCloudObject(windowedValueCoder, /*sdkComponents=*/ null), false); Coder<WindowedValue<KV<String, Integer>>> expectedCoder = WindowedValue.getFullCoder( @@ -127,7 +128,9 @@ public class LengthPrefixUnknownCodersTest { LengthPrefixCoder.of(StringUtf8Coder.of()), LengthPrefixCoder.of(VarIntCoder.of())), GlobalWindow.Coder.INSTANCE); - assertEquals(CloudObjects.asCloudObject(expectedCoder), lengthPrefixedCoderCloudObject); + assertEquals( + CloudObjects.asCloudObject(expectedCoder, /*sdkComponents=*/ null), + lengthPrefixedCoderCloudObject); } /** Test replacing unknown coders with {@code LengthPrefixCoder<ByteArray>} */ @@ -139,71 +142,81 @@ public class LengthPrefixUnknownCodersTest { GlobalWindow.Coder.INSTANCE); Map<String, Object> lengthPrefixedCoderCloudObject = - forCodec(CloudObjects.asCloudObject(windowedValueCoder), true); + forCodec(CloudObjects.asCloudObject(windowedValueCoder, /*sdkComponents=*/ null), true); assertEquals( - CloudObjects.asCloudObject(prefixedAndReplacedWindowedValueCoder), + CloudObjects.asCloudObject(prefixedAndReplacedWindowedValueCoder, /*sdkComponents=*/ null), lengthPrefixedCoderCloudObject); } @Test public void testLengthPrefixInstructionOutputCoder() throws Exception { InstructionOutput output = new InstructionOutput(); - output.setCodec(CloudObjects.asCloudObject(windowedValueCoder)); + output.setCodec(CloudObjects.asCloudObject(windowedValueCoder, /*sdkComponents=*/ null)); output.setFactory(new JacksonFactory()); InstructionOutput prefixedOutput = forInstructionOutput(output, false); - assertEquals(CloudObjects.asCloudObject(prefixedWindowedValueCoder), prefixedOutput.getCodec()); + assertEquals( + CloudObjects.asCloudObject(prefixedWindowedValueCoder, /*sdkComponents=*/ null), + prefixedOutput.getCodec()); // Should not mutate the instruction. - assertEquals(output.getCodec(), CloudObjects.asCloudObject(windowedValueCoder)); + assertEquals( + output.getCodec(), CloudObjects.asCloudObject(windowedValueCoder, /*sdkComponents=*/ null)); } @Test public void testLengthPrefixReadInstructionCoder() throws Exception { ReadInstruction readInstruction = new ReadInstruction(); readInstruction.setSource( - new Source().setCodec(CloudObjects.asCloudObject(windowedValueCoder))); + new Source() + .setCodec(CloudObjects.asCloudObject(windowedValueCoder, /*sdkComponents=*/ null))); instruction.setRead(readInstruction); ParallelInstruction prefixedInstruction = forParallelInstruction(instruction, false); assertEquals( - CloudObjects.asCloudObject(prefixedWindowedValueCoder), + CloudObjects.asCloudObject(prefixedWindowedValueCoder, /*sdkComponents=*/ null), prefixedInstruction.getRead().getSource().getCodec()); // Should not mutate the instruction. assertEquals( - readInstruction.getSource().getCodec(), CloudObjects.asCloudObject(windowedValueCoder)); + readInstruction.getSource().getCodec(), + CloudObjects.asCloudObject(windowedValueCoder, /*sdkComponents=*/ null)); } @Test public void testLengthPrefixWriteInstructionCoder() throws Exception { WriteInstruction writeInstruction = new WriteInstruction(); - writeInstruction.setSink(new Sink().setCodec(CloudObjects.asCloudObject(windowedValueCoder))); + writeInstruction.setSink( + new Sink() + .setCodec(CloudObjects.asCloudObject(windowedValueCoder, /*sdkComponents=*/ null))); instruction.setWrite(writeInstruction); ParallelInstruction prefixedInstruction = forParallelInstruction(instruction, false); assertEquals( - CloudObjects.asCloudObject(prefixedWindowedValueCoder), + CloudObjects.asCloudObject(prefixedWindowedValueCoder, /*sdkComponents=*/ null), prefixedInstruction.getWrite().getSink().getCodec()); // Should not mutate the instruction. assertEquals( - CloudObjects.asCloudObject(windowedValueCoder), writeInstruction.getSink().getCodec()); + CloudObjects.asCloudObject(windowedValueCoder, /*sdkComponents=*/ null), + writeInstruction.getSink().getCodec()); } @Test public void testLengthPrefixParDoInstructionCoder() throws Exception { ParDoInstruction parDo = new ParDoInstruction(); CloudObject spec = CloudObject.forClassName(MERGE_BUCKETS_DO_FN); - spec.put(WorkerPropertyNames.INPUT_CODER, CloudObjects.asCloudObject(windowedValueCoder)); + spec.put( + WorkerPropertyNames.INPUT_CODER, + CloudObjects.asCloudObject(windowedValueCoder, /*sdkComponents=*/ null)); parDo.setUserFn(spec); instruction.setParDo(parDo); ParallelInstruction prefixedInstruction = forParallelInstruction(instruction, false); assertEquals( - CloudObjects.asCloudObject(prefixedWindowedValueCoder), + CloudObjects.asCloudObject(prefixedWindowedValueCoder, /*sdkComponents=*/ null), prefixedInstruction.getParDo().getUserFn().get(WorkerPropertyNames.INPUT_CODER)); // Should not mutate the instruction. assertEquals( - CloudObjects.asCloudObject(windowedValueCoder), + CloudObjects.asCloudObject(windowedValueCoder, /*sdkComponents=*/ null), parDo.getUserFn().get(WorkerPropertyNames.INPUT_CODER)); } @@ -256,7 +269,7 @@ public class LengthPrefixUnknownCodersTest { network.addNode(grpcPortNode); network.addEdge(grpcPortNode, instructionOutputNode, DefaultEdge.create()); assertEquals( - CloudObjects.asCloudObject(prefixedWindowedValueCoder), + CloudObjects.asCloudObject(prefixedWindowedValueCoder, /*sdkComponents=*/ null), ((InstructionOutputNode) forInstructionOutputNode(network).apply(instructionOutputNode)) .getInstructionOutput() .getCodec()); @@ -269,7 +282,7 @@ public class LengthPrefixUnknownCodersTest { network.addNode(grpcPortNode); network.addEdge(instructionOutputNode, grpcPortNode, DefaultEdge.create()); assertEquals( - CloudObjects.asCloudObject(prefixedWindowedValueCoder), + CloudObjects.asCloudObject(prefixedWindowedValueCoder, /*sdkComponents=*/ null), ((InstructionOutputNode) forInstructionOutputNode(network).apply(instructionOutputNode)) .getInstructionOutput() .getCodec()); @@ -283,7 +296,7 @@ public class LengthPrefixUnknownCodersTest { network.addNode(readNode); network.addEdge(readNode, instructionOutputNode, DefaultEdge.create()); assertEquals( - CloudObjects.asCloudObject(windowedValueCoder), + CloudObjects.asCloudObject(windowedValueCoder, /*sdkComponents=*/ null), ((InstructionOutputNode) forInstructionOutputNode(network).apply(instructionOutputNode)) .getInstructionOutput() .getCodec()); @@ -317,7 +330,8 @@ public class LengthPrefixUnknownCodersTest { SideInputInfo sideInputInfo = new SideInputInfo().setSources(new ArrayList<>()); sideInputInfo.setFactory(new JacksonFactory()); for (Coder<?> coder : coders) { - Source source = new Source().setCodec(CloudObjects.asCloudObject(coder)); + Source source = + new Source().setCodec(CloudObjects.asCloudObject(coder, /*sdkComponents=*/ null)); source.setFactory(new JacksonFactory()); sideInputInfo.getSources().add(source); } @@ -340,7 +354,7 @@ public class LengthPrefixUnknownCodersTest { new ReadInstruction() .setSource( new Source() - .setCodec(CloudObjects.asCloudObject(coder)) + .setCodec(CloudObjects.asCloudObject(coder, /*sdkComponents=*/ null)) .setSpec(CloudObject.forClassName(readClassName)))); parallelInstruction.setFactory(new JacksonFactory()); @@ -349,7 +363,9 @@ public class LengthPrefixUnknownCodersTest { private static InstructionOutputNode createInstructionOutputNode(String name, Coder<?> coder) { InstructionOutput instructionOutput = - new InstructionOutput().setName(name).setCodec(CloudObjects.asCloudObject(coder)); + new InstructionOutput() + .setName(name) + .setCodec(CloudObjects.asCloudObject(coder, /*sdkComponents=*/ null)); instructionOutput.setFactory(new JacksonFactory()); return InstructionOutputNode.create(instructionOutput); } diff --git a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/util/CloudSourceUtilsTest.java b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/util/CloudSourceUtilsTest.java index 9814bb3..43baf71 100644 --- a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/util/CloudSourceUtilsTest.java +++ b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/util/CloudSourceUtilsTest.java @@ -62,7 +62,7 @@ public class CloudSourceUtilsTest { source.getBaseSpecs().add(grandparent); source.getBaseSpecs().add(parent); source.setSpec(child); - source.setCodec(CloudObjects.asCloudObject(StringUtf8Coder.of())); + source.setCodec(CloudObjects.asCloudObject(StringUtf8Coder.of(), /*sdkComponents=*/ null)); Source flat = CloudSourceUtils.flattenBaseSpecs(source); assertNull(flat.getBaseSpecs()); diff --git a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/util/TimerOrElementTest.java b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/util/TimerOrElementTest.java index a7387ca..b4ae74a 100644 --- a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/util/TimerOrElementTest.java +++ b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/util/TimerOrElementTest.java @@ -55,7 +55,8 @@ public class TimerOrElementTest { "com.google.cloud.dataflow.sdk.util.TimerOrElement$TimerOrElementCoder"); List<CloudObject> component = Collections.singletonList( - CloudObjects.asCloudObject(KvCoder.of(VarLongCoder.of(), ByteArrayCoder.of()))); + CloudObjects.asCloudObject( + KvCoder.of(VarLongCoder.of(), ByteArrayCoder.of()), /*sdkComponents=*/ null)); Structs.addList(cloudObject, PropertyNames.COMPONENT_ENCODINGS, component); Coder<?> decoded = CloudObjects.coderFromCloudObject(cloudObject);