[ 
https://issues.apache.org/jira/browse/BEAM-6067?focusedWorklogId=166257&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-166257
 ]

ASF GitHub Bot logged work on BEAM-6067:
----------------------------------------

                Author: ASF GitHub Bot
            Created on: 15/Nov/18 03:25
            Start Date: 15/Nov/18 03:25
    Worklog Time Spent: 10m 
      Work Description: lukecwik closed pull request #7045: [BEAM-6067] Specify pipeline_coder_id property in non-Beam-standard CloudObject coders.
URL: https://github.com/apache/beam/pull/7045
 
 
   

This pull request was merged from a forked repository. Because GitHub
does not display the original diff of a fork's pull request once it has
been merged, the diff is reproduced below for the sake of provenance:

diff --git 
a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ModelCoderRegistrar.java
 
b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ModelCoderRegistrar.java
index e07a478774f..cbb22af0ce7 100644
--- 
a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ModelCoderRegistrar.java
+++ 
b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ModelCoderRegistrar.java
@@ -86,6 +86,10 @@
         Coder.class.getSimpleName());
   }
 
+  public static boolean isKnownCoder(Coder<?> coder) {
+    return BEAM_MODEL_CODER_URNS.containsKey(coder.getClass());
+  }
+
   @Override
   public Map<Class<? extends Coder>, String> getCoderURNs() {
     return BEAM_MODEL_CODER_URNS;
diff --git 
a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java
 
b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java
index f5190df7298..a99bd306758 100644
--- 
a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java
+++ 
b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java
@@ -568,7 +568,7 @@ private StepTranslator(Translator translator, Step step) {
 
     @Override
     public void addEncodingInput(Coder<?> coder) {
-      CloudObject encoding = CloudObjects.asCloudObject(coder);
+      CloudObject encoding = translateCoder(coder, translator);
       addObject(getProperties(), PropertyNames.ENCODING, encoding);
     }
 
@@ -668,7 +668,7 @@ private void addOutput(String name, PValue value, Coder<?> 
valueCoder) {
       if (valueCoder != null) {
         // Verify that encoding can be decoded, in order to catch serialization
         // failures as early as possible.
-        CloudObject encoding = CloudObjects.asCloudObject(valueCoder);
+        CloudObject encoding = translateCoder(valueCoder, translator);
         addObject(outputInfo, PropertyNames.ENCODING, encoding);
         translator.outputCoders.put(value, valueCoder);
       }
@@ -1016,7 +1016,7 @@ public void translate(
 
             stepContext.addInput(
                 PropertyNames.RESTRICTION_CODER,
-                CloudObjects.asCloudObject(transform.getRestrictionCoder()));
+                translateCoder(transform.getRestrictionCoder(), context));
           }
         });
   }
@@ -1098,4 +1098,8 @@ private static void translateOutputs(
       stepContext.addOutput(tag.getId(), (PCollection<?>) 
taggedOutput.getValue());
     }
   }
+
+  private static CloudObject translateCoder(Coder<?> coder, TranslationContext 
context) {
+    return CloudObjects.asCloudObject(coder, context.isFnApi() ? 
context.getSdkComponents() : null);
+  }
 }
diff --git 
a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/AvroCoderCloudObjectTranslator.java
 
b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/AvroCoderCloudObjectTranslator.java
index 9a4047d2569..a4ce53313a9 100644
--- 
a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/AvroCoderCloudObjectTranslator.java
+++ 
b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/AvroCoderCloudObjectTranslator.java
@@ -18,6 +18,7 @@
 package org.apache.beam.runners.dataflow.util;
 
 import org.apache.avro.Schema;
+import org.apache.beam.runners.core.construction.SdkComponents;
 import org.apache.beam.sdk.coders.AvroCoder;
 
 /** A {@link CloudObjectTranslator} for {@link AvroCoder}. */
@@ -26,7 +27,7 @@
   private static final String SCHEMA_FIELD = "schema";
 
   @Override
-  public CloudObject toCloudObject(AvroCoder target) {
+  public CloudObject toCloudObject(AvroCoder target, SdkComponents 
sdkComponents) {
     CloudObject base = CloudObject.forClass(AvroCoder.class);
     Structs.addString(base, SCHEMA_FIELD, target.getSchema().toString());
     Structs.addString(base, TYPE_FIELD, target.getType().getName());
diff --git 
a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/CloudObjectTranslator.java
 
b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/CloudObjectTranslator.java
index aa4af84233b..3638eaf6867 100644
--- 
a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/CloudObjectTranslator.java
+++ 
b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/CloudObjectTranslator.java
@@ -17,13 +17,15 @@
  */
 package org.apache.beam.runners.dataflow.util;
 
+import org.apache.beam.runners.core.construction.SdkComponents;
+
 /**
  * A translator that takes an object and creates a {@link CloudObject} which 
can be converted back
  * to the original object.
  */
 public interface CloudObjectTranslator<T> {
   /** Converts the provided object into an equivalent {@link CloudObject}. */
-  CloudObject toCloudObject(T target);
+  CloudObject toCloudObject(T target, SdkComponents sdkComponents);
 
   /** Converts back into the original object from a provided {@link 
CloudObject}. */
   T fromCloudObject(CloudObject cloudObject);
diff --git 
a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/CloudObjectTranslators.java
 
b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/CloudObjectTranslators.java
index 1d1966d0d70..b6f738876e7 100644
--- 
a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/CloudObjectTranslators.java
+++ 
b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/CloudObjectTranslators.java
@@ -24,6 +24,7 @@
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
+import org.apache.beam.runners.core.construction.SdkComponents;
 import org.apache.beam.sdk.coders.ByteArrayCoder;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.CustomCoder;
@@ -50,11 +51,12 @@
 class CloudObjectTranslators {
   private CloudObjectTranslators() {}
 
-  private static CloudObject addComponents(CloudObject base, List<? extends 
Coder<?>> components) {
+  private static CloudObject addComponents(
+      CloudObject base, List<? extends Coder<?>> components, SdkComponents 
sdkComponents) {
     if (!components.isEmpty()) {
       List<CloudObject> cloudComponents = new ArrayList<>(components.size());
       for (Coder component : components) {
-        cloudComponents.add(CloudObjects.asCloudObject(component));
+        cloudComponents.add(CloudObjects.asCloudObject(component, 
sdkComponents));
       }
       Structs.addList(base, PropertyNames.COMPONENT_ENCODINGS, 
cloudComponents);
     }
@@ -79,11 +81,13 @@ private static CloudObject addComponents(CloudObject base, 
List<? extends Coder<
   public static CloudObjectTranslator<KvCoder> pair() {
     return new CloudObjectTranslator<KvCoder>() {
       @Override
-      public CloudObject toCloudObject(KvCoder target) {
+      public CloudObject toCloudObject(KvCoder target, SdkComponents 
sdkComponents) {
         CloudObject result = 
CloudObject.forClassName(CloudObjectKinds.KIND_PAIR);
         Structs.addBoolean(result, PropertyNames.IS_PAIR_LIKE, true);
         return addComponents(
-            result, ImmutableList.<Coder<?>>of(target.getKeyCoder(), 
target.getValueCoder()));
+            result,
+            ImmutableList.<Coder<?>>of(target.getKeyCoder(), 
target.getValueCoder()),
+            sdkComponents);
       }
 
       @Override
@@ -112,10 +116,11 @@ public String cloudObjectClassName() {
   public static CloudObjectTranslator<IterableCoder> stream() {
     return new CloudObjectTranslator<IterableCoder>() {
       @Override
-      public CloudObject toCloudObject(IterableCoder target) {
+      public CloudObject toCloudObject(IterableCoder target, SdkComponents 
sdkComponents) {
         CloudObject result = 
CloudObject.forClassName(CloudObjectKinds.KIND_STREAM);
         Structs.addBoolean(result, PropertyNames.IS_STREAM_LIKE, true);
-        return addComponents(result, 
Collections.<Coder<?>>singletonList(target.getElemCoder()));
+        return addComponents(
+            result, 
Collections.<Coder<?>>singletonList(target.getElemCoder()), sdkComponents);
       }
 
       @Override
@@ -144,10 +149,11 @@ public String cloudObjectClassName() {
   static CloudObjectTranslator<LengthPrefixCoder> lengthPrefix() {
     return new CloudObjectTranslator<LengthPrefixCoder>() {
       @Override
-      public CloudObject toCloudObject(LengthPrefixCoder target) {
+      public CloudObject toCloudObject(LengthPrefixCoder target, SdkComponents 
sdkComponents) {
         return addComponents(
             CloudObject.forClassName(CloudObjectKinds.KIND_LENGTH_PREFIX),
-            Collections.<Coder<?>>singletonList(target.getValueCoder()));
+            Collections.<Coder<?>>singletonList(target.getValueCoder()),
+            sdkComponents);
       }
 
       @Override
@@ -176,9 +182,11 @@ public String cloudObjectClassName() {
   static CloudObjectTranslator<GlobalWindow.Coder> globalWindow() {
     return new CloudObjectTranslator<GlobalWindow.Coder>() {
       @Override
-      public CloudObject toCloudObject(GlobalWindow.Coder target) {
+      public CloudObject toCloudObject(GlobalWindow.Coder target, 
SdkComponents sdkComponents) {
         return addComponents(
-            CloudObject.forClassName(CloudObjectKinds.KIND_GLOBAL_WINDOW), 
Collections.emptyList());
+            CloudObject.forClassName(CloudObjectKinds.KIND_GLOBAL_WINDOW),
+            Collections.emptyList(),
+            sdkComponents);
       }
 
       @Override
@@ -205,10 +213,11 @@ public String cloudObjectClassName() {
   static CloudObjectTranslator<IntervalWindowCoder> intervalWindow() {
     return new CloudObjectTranslator<IntervalWindowCoder>() {
       @Override
-      public CloudObject toCloudObject(IntervalWindowCoder target) {
+      public CloudObject toCloudObject(IntervalWindowCoder target, 
SdkComponents sdkComponents) {
         return addComponents(
             CloudObject.forClassName(CloudObjectKinds.KIND_INTERVAL_WINDOW),
-            Collections.emptyList());
+            Collections.emptyList(),
+            sdkComponents);
       }
 
       @Override
@@ -235,11 +244,13 @@ public String cloudObjectClassName() {
   static CloudObjectTranslator<FullWindowedValueCoder> windowedValue() {
     return new CloudObjectTranslator<FullWindowedValueCoder>() {
       @Override
-      public CloudObject toCloudObject(FullWindowedValueCoder target) {
+      public CloudObject toCloudObject(FullWindowedValueCoder target, 
SdkComponents sdkComponents) {
         CloudObject result = 
CloudObject.forClassName(CloudObjectKinds.KIND_WINDOWED_VALUE);
         Structs.addBoolean(result, PropertyNames.IS_WRAPPER, true);
         return addComponents(
-            result, ImmutableList.<Coder<?>>of(target.getValueCoder(), 
target.getWindowCoder()));
+            result,
+            ImmutableList.<Coder<?>>of(target.getValueCoder(), 
target.getWindowCoder()),
+            sdkComponents);
       }
 
       @Override
@@ -270,9 +281,11 @@ public String cloudObjectClassName() {
   static CloudObjectTranslator<ByteArrayCoder> bytes() {
     return new CloudObjectTranslator<ByteArrayCoder>() {
       @Override
-      public CloudObject toCloudObject(ByteArrayCoder target) {
+      public CloudObject toCloudObject(ByteArrayCoder target, SdkComponents 
sdkComponents) {
         return addComponents(
-            CloudObject.forClassName(CloudObjectKinds.KIND_BYTES), 
Collections.emptyList());
+            CloudObject.forClassName(CloudObjectKinds.KIND_BYTES),
+            Collections.emptyList(),
+            sdkComponents);
       }
 
       @Override
@@ -299,8 +312,9 @@ public String cloudObjectClassName() {
   static CloudObjectTranslator<VarLongCoder> varInt() {
     return new CloudObjectTranslator<VarLongCoder>() {
       @Override
-      public CloudObject toCloudObject(VarLongCoder target) {
-        return addComponents(CloudObject.forClass(target.getClass()), 
Collections.emptyList());
+      public CloudObject toCloudObject(VarLongCoder target, SdkComponents 
sdkComponents) {
+        return addComponents(
+            CloudObject.forClass(target.getClass()), Collections.emptyList(), 
sdkComponents);
       }
 
       @Override
@@ -326,7 +340,7 @@ public String cloudObjectClassName() {
   public static CloudObjectTranslator<Coder> javaSerialized() {
     return new CloudObjectTranslator<Coder>() {
       @Override
-      public CloudObject toCloudObject(Coder target) {
+      public CloudObject toCloudObject(Coder target, SdkComponents 
sdkComponents) {
         // CustomCoder is used as the "marker" for a java-serialized coder
         CloudObject cloudObject = CloudObject.forClass(CustomCoder.class);
         Structs.addString(cloudObject, TYPE_FIELD, 
target.getClass().getName());
@@ -363,7 +377,7 @@ public String cloudObjectClassName() {
     InstanceBuilder.ofType(coderClass).fromFactoryMethod("of").build();
     return new CloudObjectTranslator<T>() {
       @Override
-      public CloudObject toCloudObject(T target) {
+      public CloudObject toCloudObject(T target, SdkComponents sdkComponents) {
         return CloudObject.forClass(coderClass);
       }
 
@@ -388,9 +402,10 @@ public String cloudObjectClassName() {
       final Class<? extends IterableLikeCoder> clazz) {
     return new CloudObjectTranslator<IterableLikeCoder>() {
       @Override
-      public CloudObject toCloudObject(IterableLikeCoder target) {
+      public CloudObject toCloudObject(IterableLikeCoder target, SdkComponents 
sdkComponents) {
         CloudObject base = CloudObject.forClass(clazz);
-        return addComponents(base, 
Collections.<Coder<?>>singletonList(target.getElemCoder()));
+        return addComponents(
+            base, Collections.<Coder<?>>singletonList(target.getElemCoder()), 
sdkComponents);
       }
 
       @Override
@@ -422,10 +437,12 @@ public String cloudObjectClassName() {
   public static CloudObjectTranslator<MapCoder> map() {
     return new CloudObjectTranslator<MapCoder>() {
       @Override
-      public CloudObject toCloudObject(MapCoder target) {
+      public CloudObject toCloudObject(MapCoder target, SdkComponents 
sdkComponents) {
         CloudObject base = CloudObject.forClass(MapCoder.class);
         return addComponents(
-            base, ImmutableList.<Coder<?>>of(target.getKeyCoder(), 
target.getValueCoder()));
+            base,
+            ImmutableList.<Coder<?>>of(target.getKeyCoder(), 
target.getValueCoder()),
+            sdkComponents);
       }
 
       @Override
@@ -454,9 +471,10 @@ public String cloudObjectClassName() {
   public static CloudObjectTranslator<NullableCoder> nullable() {
     return new CloudObjectTranslator<NullableCoder>() {
       @Override
-      public CloudObject toCloudObject(NullableCoder target) {
+      public CloudObject toCloudObject(NullableCoder target, SdkComponents 
sdkComponents) {
         CloudObject base = CloudObject.forClass(NullableCoder.class);
-        return addComponents(base, 
Collections.<Coder<?>>singletonList(target.getValueCoder()));
+        return addComponents(
+            base, Collections.<Coder<?>>singletonList(target.getValueCoder()), 
sdkComponents);
       }
 
       @Override
@@ -485,8 +503,9 @@ public String cloudObjectClassName() {
   public static CloudObjectTranslator<UnionCoder> union() {
     return new CloudObjectTranslator<UnionCoder>() {
       @Override
-      public CloudObject toCloudObject(UnionCoder target) {
-        return addComponents(CloudObject.forClass(UnionCoder.class), 
target.getElementCoders());
+      public CloudObject toCloudObject(UnionCoder target, SdkComponents 
sdkComponents) {
+        return addComponents(
+            CloudObject.forClass(UnionCoder.class), target.getElementCoders(), 
sdkComponents);
       }
 
       @Override
@@ -510,11 +529,12 @@ public String cloudObjectClassName() {
   public static CloudObjectTranslator<CoGbkResultCoder> coGroupByKeyResult() {
     return new CloudObjectTranslator<CoGbkResultCoder>() {
       @Override
-      public CloudObject toCloudObject(CoGbkResultCoder target) {
+      public CloudObject toCloudObject(CoGbkResultCoder target, SdkComponents 
sdkComponents) {
         CloudObject base = CloudObject.forClass(CoGbkResultCoder.class);
         Structs.addObject(
             base, PropertyNames.CO_GBK_RESULT_SCHEMA, 
toCloudObject(target.getSchema()));
-        return addComponents(base, 
Collections.singletonList(target.getUnionCoder()));
+        return addComponents(
+            base, Collections.singletonList(target.getUnionCoder()), 
sdkComponents);
       }
 
       private CloudObject toCloudObject(CoGbkResultSchema schema) {
diff --git 
a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/CloudObjects.java
 
b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/CloudObjects.java
index 4b026dff74e..a96d3dd0888 100644
--- 
a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/CloudObjects.java
+++ 
b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/CloudObjects.java
@@ -23,6 +23,9 @@
 import com.google.common.collect.ImmutableMap;
 import java.util.Map;
 import java.util.ServiceLoader;
+import javax.annotation.Nullable;
+import org.apache.beam.runners.core.construction.ModelCoderRegistrar;
+import org.apache.beam.runners.core.construction.SdkComponents;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.CustomCoder;
 
@@ -58,11 +61,12 @@ private CloudObjects() {}
   }
 
   /** Convert the provided {@link Coder} into a {@link CloudObject}. */
-  public static CloudObject asCloudObject(Coder<?> coder) {
+  public static CloudObject asCloudObject(Coder<?> coder, @Nullable 
SdkComponents sdkComponents) {
     CloudObjectTranslator<Coder> translator =
         (CloudObjectTranslator<Coder>) CODER_TRANSLATORS.get(coder.getClass());
+    CloudObject encoding;
     if (translator != null) {
-      return translator.toCloudObject(coder);
+      encoding = translator.toCloudObject(coder, sdkComponents);
     } else {
       CloudObjectTranslator customCoderTranslator = 
CODER_TRANSLATORS.get(CustomCoder.class);
       checkNotNull(
@@ -71,8 +75,17 @@ public static CloudObject asCloudObject(Coder<?> coder) {
           CloudObjectTranslator.class.getSimpleName(),
           CustomCoder.class.getSimpleName(),
           DefaultCoderCloudObjectTranslatorRegistrar.class.getSimpleName());
-      return customCoderTranslator.toCloudObject(coder);
+      encoding = customCoderTranslator.toCloudObject(coder, sdkComponents);
     }
+    if (sdkComponents != null && !ModelCoderRegistrar.isKnownCoder(coder)) {
+      try {
+        String coderId = sdkComponents.registerCoder(coder);
+        Structs.addString(encoding, PropertyNames.PIPELINE_PROTO_CODER_ID, 
coderId);
+      } catch (Exception e) {
+        throw new RuntimeException("Unable to register coder " + coder, e);
+      }
+    }
+    return encoding;
   }
 
   public static Coder<?> coderFromCloudObject(CloudObject cloudObject) {
diff --git 
a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/PropertyNames.java
 
b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/PropertyNames.java
index bd510c88319..e644e0f78e6 100644
--- 
a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/PropertyNames.java
+++ 
b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/PropertyNames.java
@@ -64,4 +64,5 @@
   public static final String DISPLAY_DATA = "display_data";
   public static final String RESTRICTION_CODER = "restriction_coder";
   public static final String IMPULSE_ELEMENT = "impulse_element";
+  public static final String PIPELINE_PROTO_CODER_ID = 
"pipeline_proto_coder_id";
 }
diff --git 
a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/SerializableCoderCloudObjectTranslator.java
 
b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/SerializableCoderCloudObjectTranslator.java
index 8cac5858edb..b1be7a3b746 100644
--- 
a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/SerializableCoderCloudObjectTranslator.java
+++ 
b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/SerializableCoderCloudObjectTranslator.java
@@ -20,6 +20,7 @@
 import static com.google.common.base.Preconditions.checkArgument;
 
 import java.io.Serializable;
+import org.apache.beam.runners.core.construction.SdkComponents;
 import org.apache.beam.sdk.coders.SerializableCoder;
 
 /** A {@link CloudObjectTranslator} for {@link SerializableCoder}. */
@@ -27,7 +28,7 @@
   private static final String TYPE_FIELD = "type";
 
   @Override
-  public CloudObject toCloudObject(SerializableCoder target) {
+  public CloudObject toCloudObject(SerializableCoder target, SdkComponents 
sdkComponents) {
     CloudObject base = CloudObject.forClass(SerializableCoder.class);
     Structs.addString(base, TYPE_FIELD, target.getRecordType().getName());
     return base;
diff --git 
a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/util/CloudObjectsTest.java
 
b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/util/CloudObjectsTest.java
index 4d75ecd0c71..b7e8cc89337 100644
--- 
a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/util/CloudObjectsTest.java
+++ 
b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/util/CloudObjectsTest.java
@@ -20,7 +20,9 @@
 import static org.hamcrest.Matchers.emptyIterable;
 import static org.hamcrest.Matchers.hasItem;
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertThat;
+import static org.junit.Assert.assertTrue;
 
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableList.Builder;
@@ -32,6 +34,8 @@
 import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
+import org.apache.beam.runners.core.construction.ModelCoderRegistrar;
+import org.apache.beam.runners.core.construction.SdkComponents;
 import org.apache.beam.sdk.coders.AvroCoder;
 import org.apache.beam.sdk.coders.ByteArrayCoder;
 import org.apache.beam.sdk.coders.Coder;
@@ -149,12 +153,44 @@ public void defaultCodersIncludesCustomCoder() {
 
     @Test
     public void toAndFromCloudObject() throws Exception {
-      CloudObject cloudObject = CloudObjects.asCloudObject(coder);
+      CloudObject cloudObject = CloudObjects.asCloudObject(coder, 
/*sdkComponents=*/ null);
       Coder<?> fromCloudObject = 
CloudObjects.coderFromCloudObject(cloudObject);
 
       assertEquals(coder.getClass(), fromCloudObject.getClass());
       assertEquals(coder, fromCloudObject);
     }
+
+    @Test
+    public void toAndFromCloudObjectWithSdkComponents() throws Exception {
+      SdkComponents sdkComponents = SdkComponents.create();
+      CloudObject cloudObject = CloudObjects.asCloudObject(coder, 
sdkComponents);
+      Coder<?> fromCloudObject = 
CloudObjects.coderFromCloudObject(cloudObject);
+
+      assertEquals(coder.getClass(), fromCloudObject.getClass());
+      assertEquals(coder, fromCloudObject);
+
+      checkPipelineProtoCoderIds(coder, cloudObject, sdkComponents);
+    }
+
+    private static void checkPipelineProtoCoderIds(
+        Coder<?> coder, CloudObject cloudObject, SdkComponents sdkComponents) 
throws Exception {
+      if (ModelCoderRegistrar.isKnownCoder(coder)) {
+        
assertFalse(cloudObject.containsKey(PropertyNames.PIPELINE_PROTO_CODER_ID));
+      } else {
+        
assertTrue(cloudObject.containsKey(PropertyNames.PIPELINE_PROTO_CODER_ID));
+        assertEquals(
+            sdkComponents.registerCoder(coder),
+            cloudObject.get(PropertyNames.PIPELINE_PROTO_CODER_ID));
+      }
+      List<? extends Coder<?>> coderArguments = coder.getCoderArguments();
+      Object cloudComponentsObject = 
cloudObject.get(PropertyNames.COMPONENT_ENCODINGS);
+      assertTrue(cloudComponentsObject instanceof List);
+      List<CloudObject> cloudComponents = (List<CloudObject>) 
cloudComponentsObject;
+      assertEquals(coderArguments.size(), cloudComponents.size());
+      for (int i = 0; i < coderArguments.size(); i++) {
+        checkPipelineProtoCoderIds(coderArguments.get(i), 
cloudComponents.get(i), sdkComponents);
+      }
+    }
   }
 
   private static class Record implements Serializable {}
diff --git 
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/RunnerHarnessCoderCloudObjectTranslatorRegistrar.java
 
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/RunnerHarnessCoderCloudObjectTranslatorRegistrar.java
index 2b2bfd5c7b4..de242d14236 100644
--- 
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/RunnerHarnessCoderCloudObjectTranslatorRegistrar.java
+++ 
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/RunnerHarnessCoderCloudObjectTranslatorRegistrar.java
@@ -23,6 +23,7 @@
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
+import org.apache.beam.runners.core.construction.SdkComponents;
 import org.apache.beam.runners.dataflow.internal.IsmFormat.IsmRecordCoder;
 import org.apache.beam.runners.dataflow.util.CloudObject;
 import org.apache.beam.runners.dataflow.util.CloudObjectTranslator;
@@ -69,7 +70,7 @@
       implements CloudObjectTranslator<IsmRecordCoder<?>> {
 
     @Override
-    public CloudObject toCloudObject(IsmRecordCoder<?> target) {
+    public CloudObject toCloudObject(IsmRecordCoder<?> target, SdkComponents 
sdkComponents) {
       throw new UnsupportedOperationException();
     }
 
@@ -99,7 +100,7 @@ public String cloudObjectClassName() {
     InstanceBuilder.ofType(coderClass).fromFactoryMethod("of").build();
     return new CloudObjectTranslator<T>() {
       @Override
-      public CloudObject toCloudObject(T target) {
+      public CloudObject toCloudObject(T target, SdkComponents sdkComponents) {
         throw new UnsupportedOperationException();
       }
 
diff --git 
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/graph/LengthPrefixUnknownCoders.java
 
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/graph/LengthPrefixUnknownCoders.java
index 091ab79f291..398037e3373 100644
--- 
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/graph/LengthPrefixUnknownCoders.java
+++ 
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/graph/LengthPrefixUnknownCoders.java
@@ -229,7 +229,8 @@ public Node typedApply(ParallelInstructionNode input) {
     // Handle well known coders.
     if (LENGTH_PREFIX_CODER_TYPE.equals(coderType)) {
       if (replaceWithByteArrayCoder) {
-        return CloudObjects.asCloudObject(LENGTH_PREFIXED_BYTE_ARRAY_CODER);
+        return CloudObjects.asCloudObject(
+            LENGTH_PREFIXED_BYTE_ARRAY_CODER, /*sdkComponents=*/ null);
       }
       return codec;
     } else if (WELL_KNOWN_CODER_TYPES.contains(coderType)) {
@@ -251,7 +252,7 @@ public Node typedApply(ParallelInstructionNode input) {
 
     // Wrap unknown coders with length prefix coder.
     if (replaceWithByteArrayCoder) {
-      return CloudObjects.asCloudObject(LENGTH_PREFIXED_BYTE_ARRAY_CODER);
+      return CloudObjects.asCloudObject(LENGTH_PREFIXED_BYTE_ARRAY_CODER, 
/*sdkComponents=*/ null);
     } else {
       Map<String, Object> prefixedCodec = new HashMap<>();
       prefixedCodec.put(PropertyNames.OBJECT_TYPE_NAME, 
LENGTH_PREFIX_CODER_TYPE);
diff --git 
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/util/TimerOrElement.java
 
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/util/TimerOrElement.java
index b0dbb26aa12..9db549eef3f 100644
--- 
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/util/TimerOrElement.java
+++ 
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/util/TimerOrElement.java
@@ -26,6 +26,7 @@
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
+import org.apache.beam.runners.core.construction.SdkComponents;
 import org.apache.beam.runners.dataflow.util.CloudObject;
 import org.apache.beam.runners.dataflow.util.CloudObjectTranslator;
 import org.apache.beam.runners.dataflow.util.CloudObjects;
@@ -70,7 +71,7 @@ private TimerOrElementCoder(Coder<ElemT> elemCoder) {
   private static class TimerOrElementCloudObjectTranslator
       implements CloudObjectTranslator<TimerOrElementCoder> {
     @Override
-    public CloudObject toCloudObject(TimerOrElementCoder target) {
+    public CloudObject toCloudObject(TimerOrElementCoder target, SdkComponents 
sdkComponents) {
       throw new IllegalArgumentException("Should never be called");
     }
 
diff --git 
a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/AvroByteReaderFactoryTest.java
 
b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/AvroByteReaderFactoryTest.java
index 68761fc7811..6c16ae614e7 100644
--- 
a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/AvroByteReaderFactoryTest.java
+++ 
b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/AvroByteReaderFactoryTest.java
@@ -73,7 +73,8 @@ public void testCreatePlainAvroByteReader() throws Exception {
     Coder<?> coder =
         WindowedValue.getFullCoder(BigEndianIntegerCoder.of(), 
GlobalWindow.Coder.INSTANCE);
     NativeReader<?> reader =
-        runTestCreateAvroReader(pathToAvroFile, null, null, 
CloudObjects.asCloudObject(coder));
+        runTestCreateAvroReader(
+            pathToAvroFile, null, null, CloudObjects.asCloudObject(coder, 
/*sdkComponents=*/ null));
 
     Assert.assertThat(reader, new IsInstanceOf(AvroByteReader.class));
     AvroByteReader avroReader = (AvroByteReader) reader;
@@ -88,7 +89,8 @@ public void testCreateRichAvroByteReader() throws Exception {
     Coder<?> coder =
         WindowedValue.getFullCoder(BigEndianIntegerCoder.of(), 
GlobalWindow.Coder.INSTANCE);
     NativeReader<?> reader =
-        runTestCreateAvroReader(pathToAvroFile, 200L, 500L, 
CloudObjects.asCloudObject(coder));
+        runTestCreateAvroReader(
+            pathToAvroFile, 200L, 500L, CloudObjects.asCloudObject(coder, 
/*sdkComponents=*/ null));
 
     Assert.assertThat(reader, new IsInstanceOf(AvroByteReader.class));
     AvroByteReader avroReader = (AvroByteReader) reader;
diff --git 
a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/ConcatReaderFactoryTest.java
 
b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/ConcatReaderFactoryTest.java
index a12947cff9f..772500fede3 100644
--- 
a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/ConcatReaderFactoryTest.java
+++ 
b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/ConcatReaderFactoryTest.java
@@ -58,7 +58,8 @@ Source createSourcesWithInMemorySources(List<List<String>> 
allData) {
 
       inMemorySourceDictionary.put(PropertyNames.SOURCE_SPEC, 
inMemorySourceSpec);
 
-      CloudObject textSourceEncoding = 
CloudObjects.asCloudObject(StringUtf8Coder.of());
+      CloudObject textSourceEncoding =
+          CloudObjects.asCloudObject(StringUtf8Coder.of(), /*sdkComponents=*/ 
null);
       inMemorySourceDictionary.put(PropertyNames.ENCODING, textSourceEncoding);
 
       sourcesList.add(inMemorySourceDictionary);
diff --git 
a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/InMemoryReaderFactoryTest.java
 
b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/InMemoryReaderFactoryTest.java
index 9d36ac003bc..cdb3b2b432c 100644
--- 
a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/InMemoryReaderFactoryTest.java
+++ 
b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/InMemoryReaderFactoryTest.java
@@ -56,7 +56,7 @@
 
     Source cloudSource = new Source();
     cloudSource.setSpec(spec);
-    cloudSource.setCodec(CloudObjects.asCloudObject(coder));
+    cloudSource.setCodec(CloudObjects.asCloudObject(coder, /*sdkComponents=*/ 
null));
 
     return cloudSource;
   }
diff --git 
a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/IntrinsicMapTaskExecutorFactoryTest.java
 
b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/IntrinsicMapTaskExecutorFactoryTest.java
index 0cfe3f44977..972c19a0112 100644
--- 
a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/IntrinsicMapTaskExecutorFactoryTest.java
+++ 
b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/IntrinsicMapTaskExecutorFactoryTest.java
@@ -120,7 +120,8 @@
           .andThen(new MapTaskToNetworkFunction());
 
   private static final CloudObject windowedStringCoder =
-      
CloudObjects.asCloudObject(WindowedValue.getValueOnlyCoder(StringUtf8Coder.of()));
+      CloudObjects.asCloudObject(
+          WindowedValue.getValueOnlyCoder(StringUtf8Coder.of()), 
/*sdkComponents=*/ null);
 
   private IntrinsicMapTaskExecutorFactory mapTaskExecutorFactory;
   private PipelineOptions options;
@@ -560,13 +561,15 @@ static ParallelInstruction 
createPartialGroupByKeyInstruction(
         CloudObjects.asCloudObject(
             FullWindowedValueCoder.of(
                 KvCoder.of(StringUtf8Coder.of(), BigEndianIntegerCoder.of()),
-                IntervalWindowCoder.of())));
+                IntervalWindowCoder.of()),
+            /*sdkComponents=*/ null));
 
     InstructionOutput output = new InstructionOutput();
     output.setName("pgbk_output_name");
     output.setCodec(
         CloudObjects.asCloudObject(
-            KvCoder.of(StringUtf8Coder.of(), 
IterableCoder.of(BigEndianIntegerCoder.of()))));
+            KvCoder.of(StringUtf8Coder.of(), 
IterableCoder.of(BigEndianIntegerCoder.of())),
+            /*sdkComponents=*/ null));
     output.setOriginalName("originalName");
     output.setSystemName("systemName");
 
@@ -691,7 +694,7 @@ static ParallelInstruction createFlattenInstruction(
 
     InstructionOutput output = new InstructionOutput();
     output.setName("flatten_output_name");
-    output.setCodec(CloudObjects.asCloudObject(StringUtf8Coder.of()));
+    output.setCodec(CloudObjects.asCloudObject(StringUtf8Coder.of(), 
/*sdkComponents=*/ null));
     output.setOriginalName("originalName");
     output.setSystemName("systemName");
 
diff --git 
a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/IsmSideInputReaderTest.java
 
b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/IsmSideInputReaderTest.java
index a07e97037ce..3edf08e8d48 100644
--- 
a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/IsmSideInputReaderTest.java
+++ 
b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/IsmSideInputReaderTest.java
@@ -1495,7 +1495,7 @@ public int compare(Iterable<T> o1, Iterable<T> o2) {
   private <T> void verifyList(List<T> expected, List<T> actual) {
     assertEquals(expected.size(), actual.size());
 
-    List<Integer> iterationOrder = new ArrayList<>();
+    List<Integer> iterationOrder = new ArrayList<Integer>();
     Random random = new Random(1892389023490L);
     for (int i = 0; i < expected.size(); ++i) {
       iterationOrder.add(i);
@@ -1770,7 +1770,8 @@ private static RandomAccessData 
encodeKeyPortion(IsmRecordCoder<?> coder, IsmRec
   private <K, V> Source newIsmSource(IsmRecordCoder<WindowedValue<V>> coder, 
String tmpFilePath) {
     Source source = new Source();
     source.setCodec(
-        CloudObjects.asCloudObject(WindowedValue.getFullCoder(coder, 
GLOBAL_WINDOW_CODER)));
+        CloudObjects.asCloudObject(
+            WindowedValue.getFullCoder(coder, GLOBAL_WINDOW_CODER), 
/*sdkComponents=*/ null));
     source.setSpec(new HashMap<String, Object>());
     source.getSpec().put(PropertyNames.OBJECT_TYPE_NAME, "IsmSource");
     source.getSpec().put(WorkerPropertyNames.FILENAME, tmpFilePath);
diff --git 
a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/ReaderFactoryTest.java
 
b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/ReaderFactoryTest.java
index 80d85a8e635..34bc3860b4d 100644
--- 
a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/ReaderFactoryTest.java
+++ 
b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/ReaderFactoryTest.java
@@ -121,7 +121,8 @@ public void testCreateReader() throws Exception {
 
     Source cloudSource = new Source();
     cloudSource.setSpec(spec);
-    
cloudSource.setCodec(CloudObjects.asCloudObject(BigEndianIntegerCoder.of()));
+    cloudSource.setCodec(
+        CloudObjects.asCloudObject(BigEndianIntegerCoder.of(), 
/*sdkComponents=*/ null));
 
     PipelineOptions options = PipelineOptionsFactory.create();
     ReaderRegistry registry =
@@ -141,7 +142,7 @@ public void testCreateUnknownReader() throws Exception {
     CloudObject spec = CloudObject.forClassName("UnknownSource");
     Source cloudSource = new Source();
     cloudSource.setSpec(spec);
-    cloudSource.setCodec(CloudObjects.asCloudObject(StringUtf8Coder.of()));
+    cloudSource.setCodec(CloudObjects.asCloudObject(StringUtf8Coder.of(), 
/*sdkComponents=*/ null));
     try {
       PipelineOptions options = PipelineOptionsFactory.create();
       ReaderRegistry.defaultRegistry()
diff --git 
a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/ShuffleReaderFactoryTest.java
 
b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/ShuffleReaderFactoryTest.java
index 45022d219f3..c12cd9e1b21 100644
--- 
a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/ShuffleReaderFactoryTest.java
+++ 
b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/ShuffleReaderFactoryTest.java
@@ -87,7 +87,7 @@ void runTestCreateUngroupedShuffleReader(
             shuffleReaderConfig,
             start,
             end,
-            CloudObjects.asCloudObject(coder),
+            CloudObjects.asCloudObject(coder, /*sdkComponents=*/ null),
             
BatchModeExecutionContext.forTesting(PipelineOptionsFactory.create(), 
"testStage"),
             UngroupedShuffleReader.class,
             "UngroupedShuffleSource");
@@ -114,7 +114,8 @@ void runTestCreateGroupingShuffleReader(
             end,
             CloudObjects.asCloudObject(
                 FullWindowedValueCoder.of(
-                    KvCoder.of(keyCoder, IterableCoder.of(valueCoder)), 
IntervalWindowCoder.of())),
+                    KvCoder.of(keyCoder, IterableCoder.of(valueCoder)), 
IntervalWindowCoder.of()),
+                /*sdkComponents=*/ null),
             context,
             GroupingShuffleReader.class,
             "GroupingShuffleSource");
@@ -142,7 +143,8 @@ void runTestCreatePartitioningShuffleReader(
             CloudObjects.asCloudObject(
                 FullWindowedValueCoder.of(
                     KvCoder.of(keyCoder, windowedValueCoder.getValueCoder()),
-                    IntervalWindowCoder.of())),
+                    IntervalWindowCoder.of()),
+                /*sdkComponents=*/ null),
             
BatchModeExecutionContext.forTesting(PipelineOptionsFactory.create(), 
"testStage"),
             PartitioningShuffleReader.class,
             "PartitioningShuffleSource");
diff --git 
a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/SinkRegistryTest.java
 
b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/SinkRegistryTest.java
index 099dae42785..89c1a699519 100644
--- 
a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/SinkRegistryTest.java
+++ 
b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/SinkRegistryTest.java
@@ -59,7 +59,7 @@ public void testCreateUnknownSink() throws Exception {
     com.google.api.services.dataflow.model.Sink cloudSink =
         new com.google.api.services.dataflow.model.Sink();
     cloudSink.setSpec(spec);
-    cloudSink.setCodec(CloudObjects.asCloudObject(StringUtf8Coder.of()));
+    cloudSink.setCodec(CloudObjects.asCloudObject(StringUtf8Coder.of(), 
/*sdkComponents=*/ null));
     try {
       SinkRegistry.defaultRegistry()
           .create(
diff --git 
a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorkerTest.java
 
b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorkerTest.java
index b87ee45243c..d69e304a0bf 100644
--- 
a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorkerTest.java
+++ 
b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorkerTest.java
@@ -230,7 +230,8 @@ private ParallelInstruction 
makeWindowingSourceInstruction(Coder<?> coder) {
     CloudObject timerCloudObject =
         CloudObject.forClassName(
             
"com.google.cloud.dataflow.sdk.util.TimerOrElement$TimerOrElementCoder");
-    List<CloudObject> component = 
Collections.singletonList(CloudObjects.asCloudObject(coder));
+    List<CloudObject> component =
+        Collections.singletonList(CloudObjects.asCloudObject(coder, 
/*sdkComponents=*/ null));
     Structs.addList(timerCloudObject, PropertyNames.COMPONENT_ENCODINGS, 
component);
 
     CloudObject encodedCoder = CloudObject.forClassName("kind:windowed_value");
@@ -238,7 +239,9 @@ private ParallelInstruction 
makeWindowingSourceInstruction(Coder<?> coder) {
     Structs.addList(
         encodedCoder,
         PropertyNames.COMPONENT_ENCODINGS,
-        ImmutableList.of(timerCloudObject, 
CloudObjects.asCloudObject(IntervalWindowCoder.of())));
+        ImmutableList.of(
+            timerCloudObject,
+            CloudObjects.asCloudObject(IntervalWindowCoder.of(), 
/*sdkComponents=*/ null)));
 
     return new ParallelInstruction()
         .setSystemName(DEFAULT_SOURCE_SYSTEM_NAME)
@@ -269,7 +272,8 @@ private ParallelInstruction makeSourceInstruction(Coder<?> 
coder) {
                         
.setSpec(CloudObject.forClass(UngroupedWindmillReader.class))
                         .setCodec(
                             CloudObjects.asCloudObject(
-                                WindowedValue.getFullCoder(coder, 
IntervalWindow.getCoder())))))
+                                WindowedValue.getFullCoder(coder, 
IntervalWindow.getCoder()),
+                                /*sdkComponents=*/ null))))
         .setOutputs(
             Arrays.asList(
                 new InstructionOutput()
@@ -278,7 +282,8 @@ private ParallelInstruction makeSourceInstruction(Coder<?> 
coder) {
                     .setSystemName(DEFAULT_OUTPUT_SYSTEM_NAME)
                     .setCodec(
                         CloudObjects.asCloudObject(
-                            WindowedValue.getFullCoder(coder, 
IntervalWindow.getCoder())))));
+                            WindowedValue.getFullCoder(coder, 
IntervalWindow.getCoder()),
+                            /*sdkComponents=*/ null))));
   }
 
   private ParallelInstruction makeDoFnInstruction(
@@ -321,7 +326,8 @@ private ParallelInstruction makeDoFnInstruction(
                     .setCodec(
                         CloudObjects.asCloudObject(
                             WindowedValue.getFullCoder(
-                                outputCoder, 
windowingStrategy.getWindowFn().windowCoder())))));
+                                outputCoder, 
windowingStrategy.getWindowFn().windowCoder()),
+                            /*sdkComponents=*/ null))));
   }
 
   private ParallelInstruction makeDoFnInstruction(
@@ -356,7 +362,8 @@ private ParallelInstruction makeSinkInstruction(
                         .setSpec(spec)
                         .setCodec(
                             CloudObjects.asCloudObject(
-                                WindowedValue.getFullCoder(coder, 
windowCoder)))));
+                                WindowedValue.getFullCoder(coder, windowCoder),
+                                /*sdkComponents=*/ null))));
   }
 
   private ParallelInstruction makeSinkInstruction(
@@ -1088,7 +1095,8 @@ public void testAssignWindows() throws Exception {
                         .setCodec(
                             CloudObjects.asCloudObject(
                                 WindowedValue.getFullCoder(
-                                    StringUtf8Coder.of(), 
IntervalWindow.getCoder())))));
+                                    StringUtf8Coder.of(), 
IntervalWindow.getCoder()),
+                                /*sdkComponents=*/ null))));
 
     List<ParallelInstruction> instructions =
         Arrays.asList(
@@ -1190,7 +1198,10 @@ public void testMergeWindows() throws Exception {
                         .withTimestampCombiner(TimestampCombiner.EARLIEST),
                     sdkComponents)
                 .toByteArray()));
-    addObject(spec, WorkerPropertyNames.INPUT_CODER, 
CloudObjects.asCloudObject(windowedKvCoder));
+    addObject(
+        spec,
+        WorkerPropertyNames.INPUT_CODER,
+        CloudObjects.asCloudObject(windowedKvCoder, /*sdkComponents=*/ null));
 
     ParallelInstruction mergeWindowsInstruction =
         new ParallelInstruction()
@@ -1208,7 +1219,9 @@ public void testMergeWindows() throws Exception {
                         .setOriginalName(DEFAULT_OUTPUT_ORIGINAL_NAME)
                         .setSystemName(DEFAULT_OUTPUT_SYSTEM_NAME)
                         .setName("output")
-                        
.setCodec(CloudObjects.asCloudObject(windowedGroupedCoder))));
+                        .setCodec(
+                            CloudObjects.asCloudObject(
+                                windowedGroupedCoder, /*sdkComponents=*/ 
null))));
 
     List<ParallelInstruction> instructions =
         Arrays.asList(
@@ -1492,7 +1505,10 @@ private void runMergeSessionsActions(List<Action> 
actions) throws Exception {
                         .withAllowedLateness(Duration.standardMinutes(60)),
                     sdkComponents)
                 .toByteArray()));
-    addObject(spec, WorkerPropertyNames.INPUT_CODER, 
CloudObjects.asCloudObject(windowedKvCoder));
+    addObject(
+        spec,
+        WorkerPropertyNames.INPUT_CODER,
+        CloudObjects.asCloudObject(windowedKvCoder, /*sdkComponents=*/ null));
 
     ParallelInstruction mergeWindowsInstruction =
         new ParallelInstruction()
@@ -1510,7 +1526,9 @@ private void runMergeSessionsActions(List<Action> 
actions) throws Exception {
                         .setOriginalName(DEFAULT_OUTPUT_ORIGINAL_NAME)
                         .setSystemName(DEFAULT_OUTPUT_SYSTEM_NAME)
                         .setName("output")
-                        
.setCodec(CloudObjects.asCloudObject(windowedGroupedCoder))));
+                        .setCodec(
+                            CloudObjects.asCloudObject(
+                                windowedGroupedCoder, /*sdkComponents=*/ 
null))));
 
     List<ParallelInstruction> instructions =
         Arrays.asList(
@@ -1650,7 +1668,8 @@ public void processElement(ProcessContext c) {
             WindowedValue.getFullCoder(
                 ValueWithRecordId.ValueWithRecordIdCoder.of(
                     KvCoder.of(VarIntCoder.of(), VarIntCoder.of())),
-                GlobalWindow.Coder.INSTANCE));
+                GlobalWindow.Coder.INSTANCE),
+            /*sdkComponents=*/ null);
 
     return Arrays.asList(
         new ParallelInstruction()
@@ -2074,7 +2093,8 @@ public void testExceptionInvalidatesCache() throws 
Exception {
               WindowedValue.getFullCoder(
                   ValueWithRecordId.ValueWithRecordIdCoder.of(
                       KvCoder.of(VarIntCoder.of(), VarIntCoder.of())),
-                  GlobalWindow.Coder.INSTANCE));
+                  GlobalWindow.Coder.INSTANCE),
+              /*sdkComponents=*/ null);
 
       TestCountingSource counter = new 
TestCountingSource(3).withThrowOnFirstSnapshot(true);
       List<ParallelInstruction> instructions =
diff --git 
a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingPCollectionViewWriterDoFnFactoryTest.java
 
b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingPCollectionViewWriterDoFnFactoryTest.java
index 9871ce06ebc..f920a235153 100644
--- 
a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingPCollectionViewWriterDoFnFactoryTest.java
+++ 
b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingPCollectionViewWriterDoFnFactoryTest.java
@@ -49,7 +49,8 @@ public void testConstruction() throws Exception {
 
     CloudObject coder =
         CloudObjects.asCloudObject(
-            WindowedValue.getFullCoder(BigEndianIntegerCoder.of(), 
GlobalWindow.Coder.INSTANCE));
+            WindowedValue.getFullCoder(BigEndianIntegerCoder.of(), 
GlobalWindow.Coder.INSTANCE),
+            /*sdkComponents=*/ null);
     ParDoFn parDoFn =
         new StreamingPCollectionViewWriterDoFnFactory()
             .create(
diff --git 
a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/graph/LengthPrefixUnknownCodersTest.java
 
b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/graph/LengthPrefixUnknownCodersTest.java
index befc876e296..661e3941f9b 100644
--- 
a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/graph/LengthPrefixUnknownCodersTest.java
+++ 
b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/graph/LengthPrefixUnknownCodersTest.java
@@ -105,9 +105,10 @@ public void setup() {
   @Test
   public void testLengthPrefixUnknownCoders() throws Exception {
     Map<String, Object> lengthPrefixedCoderCloudObject =
-        forCodec(CloudObjects.asCloudObject(windowedValueCoder), false);
+        forCodec(CloudObjects.asCloudObject(windowedValueCoder, 
/*sdkComponents=*/ null), false);
     assertEquals(
-        CloudObjects.asCloudObject(prefixedWindowedValueCoder), 
lengthPrefixedCoderCloudObject);
+        CloudObjects.asCloudObject(prefixedWindowedValueCoder, 
/*sdkComponents=*/ null),
+        lengthPrefixedCoderCloudObject);
   }
 
   /** Test bypassing unknown coders that are already wrapped with {@code 
LengthPrefixCoder} */
@@ -119,7 +120,7 @@ public void testLengthPrefixForLengthPrefixCoder() throws 
Exception {
             GlobalWindow.Coder.INSTANCE);
 
     Map<String, Object> lengthPrefixedCoderCloudObject =
-        forCodec(CloudObjects.asCloudObject(windowedValueCoder), false);
+        forCodec(CloudObjects.asCloudObject(windowedValueCoder, 
/*sdkComponents=*/ null), false);
 
     Coder<WindowedValue<KV<String, Integer>>> expectedCoder =
         WindowedValue.getFullCoder(
@@ -127,7 +128,9 @@ public void testLengthPrefixForLengthPrefixCoder() throws 
Exception {
                 LengthPrefixCoder.of(StringUtf8Coder.of()), 
LengthPrefixCoder.of(VarIntCoder.of())),
             GlobalWindow.Coder.INSTANCE);
 
-    assertEquals(CloudObjects.asCloudObject(expectedCoder), 
lengthPrefixedCoderCloudObject);
+    assertEquals(
+        CloudObjects.asCloudObject(expectedCoder, /*sdkComponents=*/ null),
+        lengthPrefixedCoderCloudObject);
   }
 
   /** Test replacing unknown coders with {@code LengthPrefixCoder<ByteArray>} 
*/
@@ -139,71 +142,81 @@ public void testLengthPrefixAndReplaceUnknownCoder() 
throws Exception {
             GlobalWindow.Coder.INSTANCE);
 
     Map<String, Object> lengthPrefixedCoderCloudObject =
-        forCodec(CloudObjects.asCloudObject(windowedValueCoder), true);
+        forCodec(CloudObjects.asCloudObject(windowedValueCoder, 
/*sdkComponents=*/ null), true);
 
     assertEquals(
-        CloudObjects.asCloudObject(prefixedAndReplacedWindowedValueCoder),
+        CloudObjects.asCloudObject(prefixedAndReplacedWindowedValueCoder, 
/*sdkComponents=*/ null),
         lengthPrefixedCoderCloudObject);
   }
 
   @Test
   public void testLengthPrefixInstructionOutputCoder() throws Exception {
     InstructionOutput output = new InstructionOutput();
-    output.setCodec(CloudObjects.asCloudObject(windowedValueCoder));
+    output.setCodec(CloudObjects.asCloudObject(windowedValueCoder, 
/*sdkComponents=*/ null));
     output.setFactory(new JacksonFactory());
 
     InstructionOutput prefixedOutput = forInstructionOutput(output, false);
-    assertEquals(CloudObjects.asCloudObject(prefixedWindowedValueCoder), 
prefixedOutput.getCodec());
+    assertEquals(
+        CloudObjects.asCloudObject(prefixedWindowedValueCoder, 
/*sdkComponents=*/ null),
+        prefixedOutput.getCodec());
     // Should not mutate the instruction.
-    assertEquals(output.getCodec(), 
CloudObjects.asCloudObject(windowedValueCoder));
+    assertEquals(
+        output.getCodec(), CloudObjects.asCloudObject(windowedValueCoder, 
/*sdkComponents=*/ null));
   }
 
   @Test
   public void testLengthPrefixReadInstructionCoder() throws Exception {
     ReadInstruction readInstruction = new ReadInstruction();
     readInstruction.setSource(
-        new Source().setCodec(CloudObjects.asCloudObject(windowedValueCoder)));
+        new Source()
+            .setCodec(CloudObjects.asCloudObject(windowedValueCoder, 
/*sdkComponents=*/ null)));
     instruction.setRead(readInstruction);
 
     ParallelInstruction prefixedInstruction = 
forParallelInstruction(instruction, false);
     assertEquals(
-        CloudObjects.asCloudObject(prefixedWindowedValueCoder),
+        CloudObjects.asCloudObject(prefixedWindowedValueCoder, 
/*sdkComponents=*/ null),
         prefixedInstruction.getRead().getSource().getCodec());
     // Should not mutate the instruction.
     assertEquals(
-        readInstruction.getSource().getCodec(), 
CloudObjects.asCloudObject(windowedValueCoder));
+        readInstruction.getSource().getCodec(),
+        CloudObjects.asCloudObject(windowedValueCoder, /*sdkComponents=*/ 
null));
   }
 
   @Test
   public void testLengthPrefixWriteInstructionCoder() throws Exception {
     WriteInstruction writeInstruction = new WriteInstruction();
-    writeInstruction.setSink(new 
Sink().setCodec(CloudObjects.asCloudObject(windowedValueCoder)));
+    writeInstruction.setSink(
+        new Sink()
+            .setCodec(CloudObjects.asCloudObject(windowedValueCoder, 
/*sdkComponents=*/ null)));
     instruction.setWrite(writeInstruction);
 
     ParallelInstruction prefixedInstruction = 
forParallelInstruction(instruction, false);
     assertEquals(
-        CloudObjects.asCloudObject(prefixedWindowedValueCoder),
+        CloudObjects.asCloudObject(prefixedWindowedValueCoder, 
/*sdkComponents=*/ null),
         prefixedInstruction.getWrite().getSink().getCodec());
     // Should not mutate the instruction.
     assertEquals(
-        CloudObjects.asCloudObject(windowedValueCoder), 
writeInstruction.getSink().getCodec());
+        CloudObjects.asCloudObject(windowedValueCoder, /*sdkComponents=*/ 
null),
+        writeInstruction.getSink().getCodec());
   }
 
   @Test
   public void testLengthPrefixParDoInstructionCoder() throws Exception {
     ParDoInstruction parDo = new ParDoInstruction();
     CloudObject spec = CloudObject.forClassName(MERGE_BUCKETS_DO_FN);
-    spec.put(WorkerPropertyNames.INPUT_CODER, 
CloudObjects.asCloudObject(windowedValueCoder));
+    spec.put(
+        WorkerPropertyNames.INPUT_CODER,
+        CloudObjects.asCloudObject(windowedValueCoder, /*sdkComponents=*/ 
null));
     parDo.setUserFn(spec);
     instruction.setParDo(parDo);
 
     ParallelInstruction prefixedInstruction = 
forParallelInstruction(instruction, false);
     assertEquals(
-        CloudObjects.asCloudObject(prefixedWindowedValueCoder),
+        CloudObjects.asCloudObject(prefixedWindowedValueCoder, 
/*sdkComponents=*/ null),
         
prefixedInstruction.getParDo().getUserFn().get(WorkerPropertyNames.INPUT_CODER));
     // Should not mutate the instruction.
     assertEquals(
-        CloudObjects.asCloudObject(windowedValueCoder),
+        CloudObjects.asCloudObject(windowedValueCoder, /*sdkComponents=*/ 
null),
         parDo.getUserFn().get(WorkerPropertyNames.INPUT_CODER));
   }
 
@@ -256,7 +269,7 @@ public void 
testLengthPrefixForInstructionOutputNodeWithGrpcNodeSuccessor() {
     network.addNode(grpcPortNode);
     network.addEdge(grpcPortNode, instructionOutputNode, DefaultEdge.create());
     assertEquals(
-        CloudObjects.asCloudObject(prefixedWindowedValueCoder),
+        CloudObjects.asCloudObject(prefixedWindowedValueCoder, 
/*sdkComponents=*/ null),
         ((InstructionOutputNode) 
forInstructionOutputNode(network).apply(instructionOutputNode))
             .getInstructionOutput()
             .getCodec());
@@ -269,7 +282,7 @@ public void 
testLengthPrefixForInstructionOutputNodeWithGrpcNodePredecessor() {
     network.addNode(grpcPortNode);
     network.addEdge(instructionOutputNode, grpcPortNode, DefaultEdge.create());
     assertEquals(
-        CloudObjects.asCloudObject(prefixedWindowedValueCoder),
+        CloudObjects.asCloudObject(prefixedWindowedValueCoder, 
/*sdkComponents=*/ null),
         ((InstructionOutputNode) 
forInstructionOutputNode(network).apply(instructionOutputNode))
             .getInstructionOutput()
             .getCodec());
@@ -283,7 +296,7 @@ public void 
testLengthPrefixForInstructionOutputNodeWithNonGrpcNodeNeighbor() {
     network.addNode(readNode);
     network.addEdge(readNode, instructionOutputNode, DefaultEdge.create());
     assertEquals(
-        CloudObjects.asCloudObject(windowedValueCoder),
+        CloudObjects.asCloudObject(windowedValueCoder, /*sdkComponents=*/ 
null),
         ((InstructionOutputNode) 
forInstructionOutputNode(network).apply(instructionOutputNode))
             .getInstructionOutput()
             .getCodec());
@@ -317,7 +330,8 @@ private static SideInputInfo 
createSideInputInfosWithCoders(Coder<?>... coders)
     SideInputInfo sideInputInfo = new SideInputInfo().setSources(new 
ArrayList<>());
     sideInputInfo.setFactory(new JacksonFactory());
     for (Coder<?> coder : coders) {
-      Source source = new Source().setCodec(CloudObjects.asCloudObject(coder));
+      Source source =
+          new Source().setCodec(CloudObjects.asCloudObject(coder, 
/*sdkComponents=*/ null));
       source.setFactory(new JacksonFactory());
       sideInputInfo.getSources().add(source);
     }
@@ -340,7 +354,7 @@ private static ParallelInstructionNode createReadNode(
                 new ReadInstruction()
                     .setSource(
                         new Source()
-                            .setCodec(CloudObjects.asCloudObject(coder))
+                            .setCodec(CloudObjects.asCloudObject(coder, 
/*sdkComponents=*/ null))
                             
.setSpec(CloudObject.forClassName(readClassName))));
 
     parallelInstruction.setFactory(new JacksonFactory());
@@ -349,7 +363,9 @@ private static ParallelInstructionNode createReadNode(
 
   private static InstructionOutputNode createInstructionOutputNode(String 
name, Coder<?> coder) {
     InstructionOutput instructionOutput =
-        new 
InstructionOutput().setName(name).setCodec(CloudObjects.asCloudObject(coder));
+        new InstructionOutput()
+            .setName(name)
+            .setCodec(CloudObjects.asCloudObject(coder, /*sdkComponents=*/ 
null));
     instructionOutput.setFactory(new JacksonFactory());
     return InstructionOutputNode.create(instructionOutput);
   }
diff --git 
a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/util/CloudSourceUtilsTest.java
 
b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/util/CloudSourceUtilsTest.java
index 9814bb30329..43baf71b954 100644
--- 
a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/util/CloudSourceUtilsTest.java
+++ 
b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/util/CloudSourceUtilsTest.java
@@ -62,7 +62,7 @@ public void testFlattenBaseSpecs() throws Exception {
     source.getBaseSpecs().add(grandparent);
     source.getBaseSpecs().add(parent);
     source.setSpec(child);
-    source.setCodec(CloudObjects.asCloudObject(StringUtf8Coder.of()));
+    source.setCodec(CloudObjects.asCloudObject(StringUtf8Coder.of(), 
/*sdkComponents=*/ null));
 
     Source flat = CloudSourceUtils.flattenBaseSpecs(source);
     assertNull(flat.getBaseSpecs());
diff --git 
a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/util/TimerOrElementTest.java
 
b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/util/TimerOrElementTest.java
index a7387ca6b10..b4ae74ab3d3 100644
--- 
a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/util/TimerOrElementTest.java
+++ 
b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/util/TimerOrElementTest.java
@@ -55,7 +55,8 @@ public void testCoderCanBeDecodedFromCloudObject() {
             "com.google.cloud.dataflow.sdk.util.TimerOrElement$TimerOrElementCoder");
     List<CloudObject> component =
         Collections.singletonList(
-            CloudObjects.asCloudObject(KvCoder.of(VarLongCoder.of(), ByteArrayCoder.of())));
+            CloudObjects.asCloudObject(
+                KvCoder.of(VarLongCoder.of(), ByteArrayCoder.of()), /*sdkComponents=*/ null));
     Structs.addList(cloudObject, PropertyNames.COMPONENT_ENCODINGS, component);
 
     Coder<?> decoded = CloudObjects.coderFromCloudObject(cloudObject);


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


Issue Time Tracking
-------------------

            Worklog Id:     (was: 166257)
            Time Spent: 40m  (was: 0.5h)
    Remaining Estimate: 167h 20m  (was: 167.5h)

> Dataflow runner should include portable pipeline coder id in CloudObject 
> coder representation
> ---------------------------------------------------------------------------------------------
>
>                 Key: BEAM-6067
>                 URL: https://issues.apache.org/jira/browse/BEAM-6067
>             Project: Beam
>          Issue Type: Improvement
>          Components: beam-model
>            Reporter: Craig Chambers
>            Assignee: Craig Chambers
>            Priority: Major
>   Original Estimate: 168h
>          Time Spent: 40m
>  Remaining Estimate: 167h 20m
>
> When translating a BeamJava Coder into the DataflowRunner's CloudObject 
> property map, include a property that specifies the id in the Beam model 
> Pipeline coders map corresponding to that Coder.  This will allow the 
> DataflowRunner to reference the corresponding Beam coder in the FnAPI 
> processing bundle.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to