robertwb commented on code in PR #28210:
URL: https://github.com/apache/beam/pull/28210#discussion_r1327810238


##########
runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/CreateTranslation.java:
##########
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.core.construction;
+
+import com.google.auto.service.AutoService;
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+import org.apache.beam.model.pipeline.v1.RunnerApi.FunctionSpec;
+import 
org.apache.beam.runners.core.construction.PTransformTranslation.TransformPayloadTranslator;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.runners.AppliedPTransform;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.schemas.Schema.FieldType;
+import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.transforms.Create.Values;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.values.Row;
+import org.checkerframework.checker.nullness.qual.Nullable;
+
+@SuppressWarnings({
+  "rawtypes", // TODO(https://github.com/apache/beam/issues/20447)
+  "nullness"
+})
+public class CreateTranslation implements 
TransformPayloadTranslator<Create.Values<?>> {
+
+  Schema createConfigSchema =
+      Schema.builder()
+          .addArrayField("values", FieldType.BYTES)
+          .addByteArrayField("serialized_coder")
+          .build();
+
+  @Override
+  public String getUrn() {
+    return PTransformTranslation.CREATE_TRANSFORM_URN;
+  }
+
+  @Override
+  public @Nullable FunctionSpec translate(
+      AppliedPTransform<?, ?, Values<?>> application, SdkComponents 
components) throws IOException {
+    // Currently just returns an empty payload.
+    // We can implement an actual payload of runners start using this 
transform.
+    return 
FunctionSpec.newBuilder().setUrn(getUrn(application.getTransform())).build();
+  }
+
+  private byte[] toByteArray(Object object) {
+    try (ByteArrayOutputStream bos = new ByteArrayOutputStream();
+        ObjectOutputStream out = new ObjectOutputStream(bos)) {
+      out.writeObject(object);
+      return bos.toByteArray();
+    } catch (Exception e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  private Object fromByteArray(byte[] bytes) {
+    try (ByteArrayInputStream bis = new ByteArrayInputStream(bytes);
+        ObjectInputStream in = new ObjectInputStream(bis)) {
+      return in.readObject();
+    } catch (Exception e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  @Override
+  public @Nullable Row toConfigRow(Values<?> transform) {
+    List<byte[]> encodedElements = new ArrayList<>();
+    transform
+        .getElements()
+        .forEach(
+            object -> {
+              encodedElements.add(toByteArray(object));
+            });
+
+    byte[] serializedCoder =
+        transform.getCoder() != null ? toByteArray(transform.getCoder()) : new 
byte[] {};
+    return Row.withSchema(createConfigSchema)
+        .withFieldValue("values", encodedElements)
+        .withFieldValue("serialized_coder", serializedCoder)
+        .build();
+  }
+
+  @Override
+  public Create.@Nullable Values<?> fromConfigRow(Row configRow) {

Review Comment:
   Stray `@Nullable` (I don't think this should be nullable.)



##########
runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/CreateTranslation.java:
##########
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.core.construction;
+
+import com.google.auto.service.AutoService;
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+import org.apache.beam.model.pipeline.v1.RunnerApi.FunctionSpec;
+import 
org.apache.beam.runners.core.construction.PTransformTranslation.TransformPayloadTranslator;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.runners.AppliedPTransform;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.schemas.Schema.FieldType;
+import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.transforms.Create.Values;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.values.Row;
+import org.checkerframework.checker.nullness.qual.Nullable;
+
+@SuppressWarnings({
+  "rawtypes", // TODO(https://github.com/apache/beam/issues/20447)

Review Comment:
   Can we avoid suppressing `rawtypes` (or at least make it more local if it 
can't be avoided)?



##########
runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PipelineTranslation.java:
##########
@@ -102,11 +121,370 @@ public void visitPrimitiveTransform(Node node) {
       // TODO(JIRA-5649): Don't even emit these transforms in the generated 
protos.
       res = elideDeprecatedViews(res);
     }
+
+    ExternalTranslationOptions externalTranslationOptions =
+        pipeline.getOptions().as(ExternalTranslationOptions.class);
+    List<String> urnsToOverride = 
externalTranslationOptions.getTransformsToOverride();
+    if (urnsToOverride.size() > 0) {
+      // We use PTransformPayloadTranslators and the Transform Service to 
re-generate the pipeline
+      // proto components for the updated transform and update the pipeline 
proto.
+      Map<String, PTransform> transforms = 
res.getComponents().getTransformsMap();
+      List<String> alreadyCheckedURns = new ArrayList<>();
+      for (Entry<String, PTransform> entry : transforms.entrySet()) {
+        String urn = entry.getValue().getSpec().getUrn();
+        if (!alreadyCheckedURns.contains(urn) && urnsToOverride.contains(urn)) 
{
+          alreadyCheckedURns.add(urn);

Review Comment:
   Won't this only override one transform of each type in the graph? (Seems 
worth a test too.)



##########
runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PipelineTranslation.java:
##########
@@ -102,11 +121,370 @@ public void visitPrimitiveTransform(Node node) {
       // TODO(JIRA-5649): Don't even emit these transforms in the generated 
protos.
       res = elideDeprecatedViews(res);
     }
+
+    ExternalTranslationOptions externalTranslationOptions =
+        pipeline.getOptions().as(ExternalTranslationOptions.class);
+    List<String> urnsToOverride = 
externalTranslationOptions.getTransformsToOverride();
+    if (urnsToOverride.size() > 0) {
+      // We use PTransformPayloadTranslators and the Transform Service to 
re-generate the pipeline
+      // proto components for the updated transform and update the pipeline 
proto.
+      Map<String, PTransform> transforms = 
res.getComponents().getTransformsMap();
+      List<String> alreadyCheckedURns = new ArrayList<>();
+      for (Entry<String, PTransform> entry : transforms.entrySet()) {
+        String urn = entry.getValue().getSpec().getUrn();
+        if (!alreadyCheckedURns.contains(urn) && urnsToOverride.contains(urn)) 
{
+          alreadyCheckedURns.add(urn);
+          // All transforms in the pipeline with the given urns have to be 
overridden.
+          List<
+                  AppliedPTransform<
+                      PInput,
+                      POutput,
+                      org.apache.beam.sdk.transforms.PTransform<? super 
PInput, POutput>>>
+              appliedPTransforms =
+                  findAppliedPTransforms(
+                      urn, pipeline, 
KnownTransformPayloadTranslator.KNOWN_PAYLOAD_TRANSLATORS);
+          for (AppliedPTransform<
+                  PInput,
+                  POutput,
+                  org.apache.beam.sdk.transforms.PTransform<? super PInput, 
POutput>>
+              appliedPTransform : appliedPTransforms) {
+            TransformPayloadTranslator<
+                    org.apache.beam.sdk.transforms.PTransform<? super PInput, 
POutput>>
+                payloadTranslator =
+                    
KnownTransformPayloadTranslator.KNOWN_PAYLOAD_TRANSLATORS.get(
+                        appliedPTransform.getTransform().getClass());
+            try {
+              // Override the transform using the transform service.
+              res =
+                  updateTransformViaTransformService(
+                      urn, appliedPTransform, payloadTranslator, pipeline, 
res);
+            } catch (IOException e) {
+              throw new RuntimeException(e);
+            }
+          }
+        }
+      }
+    }
+
     // Validate that translation didn't produce an invalid pipeline.
     PipelineValidator.validate(res);
     return res;
   }
 
+  private static int findAvailablePort() throws IOException {
+    ServerSocket s = new ServerSocket(0);
+    try {
+      return s.getLocalPort();
+    } finally {
+      s.close();
+      try {
+        // Some systems don't free the port for future use immediately.
+        Thread.sleep(100);
+      } catch (InterruptedException exn) {
+        // ignore
+      }
+    }
+  }
+
+  // Override the given transform to the version available in a new transform 
service.
+  private static <
+          InputT extends PInput,
+          OutputT extends POutput,
+          TransformT extends org.apache.beam.sdk.transforms.PTransform<InputT, 
OutputT>>
+      RunnerApi.Pipeline updateTransformViaTransformService(
+          String urn,
+          AppliedPTransform<
+                  PInput,
+                  POutput,
+                  org.apache.beam.sdk.transforms.PTransform<? super PInput, 
POutput>>
+              appliedPTransform,
+          TransformPayloadTranslator<
+                  org.apache.beam.sdk.transforms.PTransform<? super PInput, 
POutput>>
+              originalPayloadTranslator,
+          Pipeline pipeline,
+          RunnerApi.Pipeline runnerAPIpipeline)
+          throws IOException {
+    ExternalTranslationOptions externalTranslationOptions =
+        pipeline.getOptions().as(ExternalTranslationOptions.class);
+
+    // Config row to re-construct the transform within the transform service.
+    Row configRow = 
originalPayloadTranslator.toConfigRow(appliedPTransform.getTransform());
+    ByteStringOutputStream outputStream = new ByteStringOutputStream();
+    try {
+      RowCoder.of(configRow.getSchema()).encode(configRow, outputStream);
+    } catch (IOException e) {
+      throw new RuntimeException(e);
+    }
+
+    // Java expansion serivice able to identify and expand transforms that 
includes the construction
+    // config provided here.
+    ExternalTransforms.ExternalConfigurationPayload payload =
+        ExternalTransforms.ExternalConfigurationPayload.newBuilder()
+            .setSchema(SchemaTranslation.schemaToProto(configRow.getSchema(), 
true))
+            .setPayload(outputStream.toByteString())
+            .build();
+
+    String serviceAddress = null;
+    TransformServiceLauncher service = null;
+    try {
+      if (externalTranslationOptions.getTransformServiceAddress() != null) {
+        serviceAddress = 
externalTranslationOptions.getTransformServiceAddress();
+      } else if (externalTranslationOptions.getTransformServiceBeamVersion() 
!= null) {
+        String projectName = UUID.randomUUID().toString();
+        service = TransformServiceLauncher.forProject(projectName, 
findAvailablePort());
+        
service.setBeamVersion(externalTranslationOptions.getTransformServiceBeamVersion());
+
+        // Starting the transform service.
+        service.start();
+        // Waiting the service to be ready.
+        service.waitTillUp(15000);
+      } else {
+        throw new IllegalArgumentException(
+            "Either option TransformServiceAddress or option 
TransformServiceBeamVersion should be provided to override a transform using 
the transform service");
+      }
+
+      if (serviceAddress == null) {
+        throw new IllegalArgumentException(
+            "Cannot override the transform "
+                + urn
+                + " since a valid transform service address could not be 
determined");
+      }
+
+      // Creating an ExternalTransform and expanding it using the transform 
service.
+      // Input will be the same input provided to the transform bing 
overridden.
+      ExpandableTransform<InputT, OutputT> externalTransform =
+          (ExpandableTransform<InputT, OutputT>)
+              External.of(urn, payload.toByteArray(), serviceAddress);
+
+      PCollectionTuple input = PCollectionTuple.empty(pipeline);
+      for (TupleTag<?> tag : (Set<TupleTag<?>>) 
appliedPTransform.getInputs().keySet()) {
+        PCollection<?> pc = appliedPTransform.getInputs().get(tag);
+        if (pc == null) {
+          throw new IllegalArgumentException(
+              "Input of transform " + appliedPTransform + " with tag " + tag + 
" was null.");
+        }
+        input = input.and(tag, (PCollection) pc);
+      }
+      POutput output = externalTransform.expand((InputT) input);
+
+      // Outputs of the transform being overridden.
+      Map<TupleTag<?>, PCollection<?>> originalOutputs = 
appliedPTransform.getOutputs();
+
+      // After expansion some transforms might still refer to the output of 
the already overridden
+      // transform as their input.
+      // Such inputs have to be overridden to use the output of the new 
upgraded transform.
+      Map<String, String> inputReplacements = new HashMap<>();
+
+      // Will contain the outputs of the upgraded transform.
+      Map<TupleTag<?>, PCollection<?>> newOutputs = new HashMap<>();
+
+      if (output instanceof PCollectionTuple) {
+        newOutputs.putAll(((PCollectionTuple) output).getAll());
+        for (Map.Entry<TupleTag<?>, PCollection<?>> entry : 
newOutputs.entrySet()) {
+          if (entry == null) {
+            throw new IllegalArgumentException(
+                "Found unexpected null entry when iterating the outputs of 
expanded "
+                    + "ExpandableTransform "
+                    + externalTransform);
+          }
+          if (!appliedPTransform.getOutputs().containsKey(entry.getKey())) {
+            throw new RuntimeException(
+                "Could not find the tag " + entry.getKey() + " in the original 
set of outputs");
+          }
+          PCollection<?> originalOutputPc = 
originalOutputs.get(entry.getKey());
+          if (originalOutputPc == null) {
+            throw new IllegalArgumentException(
+                "Original output of transform "
+                    + appliedPTransform
+                    + " with tag "
+                    + entry.getKey()
+                    + " was null");
+          }
+          inputReplacements.put(originalOutputPc.getName(), 
entry.getValue().getName());
+        }
+      } else if (output instanceof PCollection) {
+        newOutputs.put(new TupleTag<>("temp_main_tag"), (PCollection) output);
+        inputReplacements.put(
+            
originalOutputs.get(originalOutputs.keySet().iterator().next()).getName(),
+            ((PCollection) output).getName());
+      } else {
+        throw new RuntimeException("Unexpected output type");
+      }
+
+      // We create a new AppliedPTransform to represent the upgraded transform 
and register it in an
+      // SdkComponents object.
+      AppliedPTransform<?, ?, ?> updatedAppliedPTransform =
+          AppliedPTransform.of(
+              appliedPTransform.getFullName() + "_external",
+              appliedPTransform.getInputs(),
+              newOutputs,
+              externalTransform,
+              externalTransform.getResourceHints(),
+              appliedPTransform.getPipeline());
+      SdkComponents updatedComponents =
+          SdkComponents.create(
+              runnerAPIpipeline.getComponents(), 
runnerAPIpipeline.getRequirementsList());
+      String updatedTransformId =
+          updatedComponents.registerPTransform(updatedAppliedPTransform, 
Collections.emptyList());
+      RunnerApi.Components updatedRunnerApiComponents = 
updatedComponents.toComponents();
+
+      // Recording input updates to the transforms to refer to the upgraded 
transform instead of the
+      // old one.
+      // Also recording the newly generated id of the old (overridden) 
transform in the
+      // updatedRunnerApiComponents.
+      Map<String, Map<String, String>> transformInputUpdates = new HashMap<>();
+      List<String> oldTransformIds = new ArrayList<>();
+      updatedRunnerApiComponents
+          .getTransformsMap()
+          .forEach(
+              (transformId, transform) -> {
+                // Mapping from existing key to new value.
+                Map<String, String> updatedInputMap = new HashMap<>();
+                for (Map.Entry<String, String> entry : 
transform.getInputsMap().entrySet()) {
+                  if (inputReplacements.containsKey(entry.getValue())) {
+                    updatedInputMap.put(entry.getKey(), 
inputReplacements.get(entry.getValue()));
+                  }
+                }
+                for (Map.Entry<String, String> entry : 
transform.getOutputsMap().entrySet()) {
+                  if (inputReplacements.containsKey(entry.getValue())
+                      && urn.equals(transform.getSpec().getUrn())) {
+                    oldTransformIds.add(transformId);
+                  }
+                }
+                if (updatedInputMap.size() > 0) {
+                  transformInputUpdates.put(transformId, updatedInputMap);
+                }
+              });
+      // There should be only one recorded old (upgraded) transform.
+      if (oldTransformIds.size() != 1) {
+        throw new IOException(
+            "Expected exactly one transform to be updated by "
+                + oldTransformIds.size()
+                + " were updated.");
+      }
+      String oldTransformId = oldTransformIds.get(0);
+
+      // Updated list of root transforms (in case a root was upgraded).
+      List<String> updaterRootTransformIds = new ArrayList<>();
+      
updaterRootTransformIds.addAll(runnerAPIpipeline.getRootTransformIdsList());
+      if (updaterRootTransformIds.contains(oldTransformId)) {
+        updaterRootTransformIds.remove(oldTransformId);
+        updaterRootTransformIds.add(updatedTransformId);

Review Comment:
   I suppose the order does not matter here?



##########
runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PTransformTranslation.java:
##########
@@ -386,9 +388,9 @@ public RunnerApi.PTransform translate(
    * Translates a set of registered transforms whose content only differs 
based by differences in
    * their {@link FunctionSpec}s and URNs.
    */
-  private static class KnownTransformPayloadTranslator<T extends PTransform<?, 
?>>
+  public static class KnownTransformPayloadTranslator<T extends PTransform<?, 
?>>

Review Comment:
   Why do these have to be public?



##########
runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java:
##########
@@ -2572,6 +2572,11 @@ public String getUrn(PTransform transform) {
       return "dataflow_stub:" + transform.getClass().getName();
     }
 
+    @Override
+    public String getUrn() {

Review Comment:
   Hmm... should this instead be protected (such that it won't accidentally get 
called)? (The *only* caller should be the `getUrn(Transform)` variant, right?) 
   
   Is the default implementation raising an error insufficient? 



##########
runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/CreateTranslation.java:
##########
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.core.construction;
+
+import com.google.auto.service.AutoService;
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+import org.apache.beam.model.pipeline.v1.RunnerApi.FunctionSpec;
+import 
org.apache.beam.runners.core.construction.PTransformTranslation.TransformPayloadTranslator;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.runners.AppliedPTransform;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.schemas.Schema.FieldType;
+import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.transforms.Create.Values;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.values.Row;
+import org.checkerframework.checker.nullness.qual.Nullable;
+
+@SuppressWarnings({
+  "rawtypes", // TODO(https://github.com/apache/beam/issues/20447)
+  "nullness"
+})
+public class CreateTranslation implements 
TransformPayloadTranslator<Create.Values<?>> {
+
+  Schema createConfigSchema =
+      Schema.builder()
+          .addArrayField("values", FieldType.BYTES)
+          .addByteArrayField("serialized_coder")
+          .build();
+
+  @Override
+  public String getUrn() {
+    return PTransformTranslation.CREATE_TRANSFORM_URN;
+  }
+
+  @Override
+  public @Nullable FunctionSpec translate(
+      AppliedPTransform<?, ?, Values<?>> application, SdkComponents 
components) throws IOException {
+    // Currently just returns an empty payload.
+    // We can implement an actual payload of runners start using this 
transform.
+    return 
FunctionSpec.newBuilder().setUrn(getUrn(application.getTransform())).build();
+  }
+
+  private byte[] toByteArray(Object object) {
+    try (ByteArrayOutputStream bos = new ByteArrayOutputStream();
+        ObjectOutputStream out = new ObjectOutputStream(bos)) {
+      out.writeObject(object);
+      return bos.toByteArray();
+    } catch (Exception e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  private Object fromByteArray(byte[] bytes) {
+    try (ByteArrayInputStream bis = new ByteArrayInputStream(bytes);
+        ObjectInputStream in = new ObjectInputStream(bis)) {
+      return in.readObject();
+    } catch (Exception e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  @Override
+  public @Nullable Row toConfigRow(Values<?> transform) {
+    List<byte[]> encodedElements = new ArrayList<>();
+    transform
+        .getElements()
+        .forEach(
+            object -> {
+              encodedElements.add(toByteArray(object));
+            });

Review Comment:
   If you're going to be using the new functional API, maybe use streams + 
collect to list? 



##########
runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/CreateTranslation.java:
##########
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.core.construction;
+
+import com.google.auto.service.AutoService;
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+import org.apache.beam.model.pipeline.v1.RunnerApi.FunctionSpec;
+import 
org.apache.beam.runners.core.construction.PTransformTranslation.TransformPayloadTranslator;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.runners.AppliedPTransform;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.schemas.Schema.FieldType;
+import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.transforms.Create.Values;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.values.Row;
+import org.checkerframework.checker.nullness.qual.Nullable;
+
+@SuppressWarnings({
+  "rawtypes", // TODO(https://github.com/apache/beam/issues/20447)
+  "nullness"
+})
+public class CreateTranslation implements 
TransformPayloadTranslator<Create.Values<?>> {
+
+  Schema createConfigSchema =
+      Schema.builder()
+          .addArrayField("values", FieldType.BYTES)
+          .addByteArrayField("serialized_coder")
+          .build();
+
+  @Override
+  public String getUrn() {
+    return PTransformTranslation.CREATE_TRANSFORM_URN;
+  }
+
+  @Override
+  public @Nullable FunctionSpec translate(
+      AppliedPTransform<?, ?, Values<?>> application, SdkComponents 
components) throws IOException {
+    // Currently just returns an empty payload.
+    // We can implement an actual payload of runners start using this 
transform.
+    return 
FunctionSpec.newBuilder().setUrn(getUrn(application.getTransform())).build();
+  }
+
+  private byte[] toByteArray(Object object) {
+    try (ByteArrayOutputStream bos = new ByteArrayOutputStream();
+        ObjectOutputStream out = new ObjectOutputStream(bos)) {
+      out.writeObject(object);
+      return bos.toByteArray();
+    } catch (Exception e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  private Object fromByteArray(byte[] bytes) {
+    try (ByteArrayInputStream bis = new ByteArrayInputStream(bytes);
+        ObjectInputStream in = new ObjectInputStream(bis)) {
+      return in.readObject();
+    } catch (Exception e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  @Override
+  public @Nullable Row toConfigRow(Values<?> transform) {
+    List<byte[]> encodedElements = new ArrayList<>();
+    transform
+        .getElements()
+        .forEach(
+            object -> {
+              encodedElements.add(toByteArray(object));
+            });
+
+    byte[] serializedCoder =
+        transform.getCoder() != null ? toByteArray(transform.getCoder()) : new 
byte[] {};
+    return Row.withSchema(createConfigSchema)
+        .withFieldValue("values", encodedElements)
+        .withFieldValue("serialized_coder", serializedCoder)
+        .build();
+  }
+
+  @Override
+  public Create.@Nullable Values<?> fromConfigRow(Row configRow) {
+    Values transform =
+        Create.of(
+            configRow.getArray("values").stream()
+                .map(bytesValue -> fromByteArray((byte[]) bytesValue))
+                .collect(Collectors.toList()));
+    byte[] serializedCoder = configRow.getBytes("serialized_coder");

Review Comment:
   Would coder ever be null? Preferably we simply require it. 



##########
runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/CreateTranslation.java:
##########
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.core.construction;
+
+import com.google.auto.service.AutoService;
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+import org.apache.beam.model.pipeline.v1.RunnerApi.FunctionSpec;
+import 
org.apache.beam.runners.core.construction.PTransformTranslation.TransformPayloadTranslator;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.runners.AppliedPTransform;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.schemas.Schema.FieldType;
+import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.transforms.Create.Values;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.values.Row;
+import org.checkerframework.checker.nullness.qual.Nullable;
+
+@SuppressWarnings({
+  "rawtypes", // TODO(https://github.com/apache/beam/issues/20447)
+  "nullness"
+})
+public class CreateTranslation implements 
TransformPayloadTranslator<Create.Values<?>> {
+
+  Schema createConfigSchema =
+      Schema.builder()
+          .addArrayField("values", FieldType.BYTES)
+          .addByteArrayField("serialized_coder")
+          .build();
+
+  @Override
+  public String getUrn() {
+    return PTransformTranslation.CREATE_TRANSFORM_URN;
+  }
+
+  @Override
+  public @Nullable FunctionSpec translate(
+      AppliedPTransform<?, ?, Values<?>> application, SdkComponents 
components) throws IOException {
+    // Currently just returns an empty payload.
+    // We can implement an actual payload of runners start using this 
transform.
+    return 
FunctionSpec.newBuilder().setUrn(getUrn(application.getTransform())).build();
+  }
+
+  private byte[] toByteArray(Object object) {
+    try (ByteArrayOutputStream bos = new ByteArrayOutputStream();
+        ObjectOutputStream out = new ObjectOutputStream(bos)) {
+      out.writeObject(object);
+      return bos.toByteArray();
+    } catch (Exception e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  private Object fromByteArray(byte[] bytes) {
+    try (ByteArrayInputStream bis = new ByteArrayInputStream(bytes);
+        ObjectInputStream in = new ObjectInputStream(bis)) {
+      return in.readObject();
+    } catch (Exception e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  @Override
+  public @Nullable Row toConfigRow(Values<?> transform) {

Review Comment:
   Shouldn't be nullable. 



##########
runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/CreateTranslation.java:
##########
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.core.construction;
+
+import com.google.auto.service.AutoService;
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+import org.apache.beam.model.pipeline.v1.RunnerApi.FunctionSpec;
+import 
org.apache.beam.runners.core.construction.PTransformTranslation.TransformPayloadTranslator;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.runners.AppliedPTransform;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.schemas.Schema.FieldType;
+import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.transforms.Create.Values;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.values.Row;
+import org.checkerframework.checker.nullness.qual.Nullable;
+
+@SuppressWarnings({
+  "rawtypes", // TODO(https://github.com/apache/beam/issues/20447)
+  "nullness"
+})
+public class CreateTranslation implements 
TransformPayloadTranslator<Create.Values<?>> {
+
+  Schema createConfigSchema =
+      Schema.builder()
+          .addArrayField("values", FieldType.BYTES)
+          .addByteArrayField("serialized_coder")
+          .build();
+
+  @Override
+  public String getUrn() {
+    return PTransformTranslation.CREATE_TRANSFORM_URN;
+  }
+
+  @Override
+  public @Nullable FunctionSpec translate(
+      AppliedPTransform<?, ?, Values<?>> application, SdkComponents 
components) throws IOException {
+    // Currently just returns an empty payload.
+    // We can implement an actual payload of runners start using this 
transform.
+    return 
FunctionSpec.newBuilder().setUrn(getUrn(application.getTransform())).build();
+  }
+
+  private byte[] toByteArray(Object object) {
+    try (ByteArrayOutputStream bos = new ByteArrayOutputStream();
+        ObjectOutputStream out = new ObjectOutputStream(bos)) {
+      out.writeObject(object);
+      return bos.toByteArray();
+    } catch (Exception e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  private Object fromByteArray(byte[] bytes) {
+    try (ByteArrayInputStream bis = new ByteArrayInputStream(bytes);
+        ObjectInputStream in = new ObjectInputStream(bis)) {
+      return in.readObject();
+    } catch (Exception e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  @Override
+  public @Nullable Row toConfigRow(Values<?> transform) {
+    List<byte[]> encodedElements = new ArrayList<>();
+    transform
+        .getElements()
+        .forEach(
+            object -> {
+              encodedElements.add(toByteArray(object));
+            });
+
+    byte[] serializedCoder =
+        transform.getCoder() != null ? toByteArray(transform.getCoder()) : new 
byte[] {};
+    return Row.withSchema(createConfigSchema)
+        .withFieldValue("values", encodedElements)
+        .withFieldValue("serialized_coder", serializedCoder)
+        .build();
+  }
+
+  @Override
+  public Create.@Nullable Values<?> fromConfigRow(Row configRow) {
+    Values transform =
+        Create.of(

Review Comment:
   Is it legal to call Create.of(List<byte[]>) and then set the Coder to 
something other than a coder of byte[]? 



##########
runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/CreateTranslation.java:
##########
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.core.construction;
+
+import com.google.auto.service.AutoService;
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+import org.apache.beam.model.pipeline.v1.RunnerApi.FunctionSpec;
+import 
org.apache.beam.runners.core.construction.PTransformTranslation.TransformPayloadTranslator;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.runners.AppliedPTransform;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.schemas.Schema.FieldType;
+import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.transforms.Create.Values;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.values.Row;
+import org.checkerframework.checker.nullness.qual.Nullable;
+
+@SuppressWarnings({
+  "rawtypes", // TODO(https://github.com/apache/beam/issues/20447)
+  "nullness"
+})
+public class CreateTranslation implements 
TransformPayloadTranslator<Create.Values<?>> {
+
+  Schema createConfigSchema =
+      Schema.builder()
+          .addArrayField("values", FieldType.BYTES)
+          .addByteArrayField("serialized_coder")
+          .build();
+
+  @Override
+  public String getUrn() {
+    return PTransformTranslation.CREATE_TRANSFORM_URN;
+  }
+
+  @Override
+  public @Nullable FunctionSpec translate(
+      AppliedPTransform<?, ?, Values<?>> application, SdkComponents 
components) throws IOException {
+    // Currently just returns an empty payload.
+    // We can implement an actual payload of runners start using this 
transform.
+    return 
FunctionSpec.newBuilder().setUrn(getUrn(application.getTransform())).build();
+  }
+
+  private byte[] toByteArray(Object object) {
+    try (ByteArrayOutputStream bos = new ByteArrayOutputStream();
+        ObjectOutputStream out = new ObjectOutputStream(bos)) {
+      out.writeObject(object);
+      return bos.toByteArray();
+    } catch (Exception e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  private Object fromByteArray(byte[] bytes) {
+    try (ByteArrayInputStream bis = new ByteArrayInputStream(bytes);
+        ObjectInputStream in = new ObjectInputStream(bis)) {
+      return in.readObject();
+    } catch (Exception e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  @Override
+  public @Nullable Row toConfigRow(Values<?> transform) {
+    List<byte[]> encodedElements = new ArrayList<>();
+    transform
+        .getElements()
+        .forEach(
+            object -> {
+              encodedElements.add(toByteArray(object));
+            });
+
+    byte[] serializedCoder =
+        transform.getCoder() != null ? toByteArray(transform.getCoder()) : new 
byte[] {};
+    return Row.withSchema(createConfigSchema)
+        .withFieldValue("values", encodedElements)
+        .withFieldValue("serialized_coder", serializedCoder)
+        .build();
+  }
+
+  @Override
+  public Create.@Nullable Values<?> fromConfigRow(Row configRow) {
+    Values transform =
+        Create.of(
+            configRow.getArray("values").stream()
+                .map(bytesValue -> fromByteArray((byte[]) bytesValue))
+                .collect(Collectors.toList()));
+    byte[] serializedCoder = configRow.getBytes("serialized_coder");
+    if (serializedCoder.length > 0) {
+      Coder coder = (Coder) fromByteArray(serializedCoder);
+      transform = transform.withCoder(coder);
+    }
+    return transform;
+  }
+
+  @AutoService(TransformPayloadTranslatorRegistrar.class)
+  public static class Registrar implements TransformPayloadTranslatorRegistrar 
{
+    @Override
+    public Map<? extends Class<? extends PTransform>, ? extends 
TransformPayloadTranslator>
+        getTransformPayloadTranslators() {
+      return Collections.singletonMap(Create.Values.class, new 
CreateTranslation());
+    }
+  }
+}

Review Comment:
   Not for this PR, but we should consider whether we can cut the boilerplate 
down to (much!) less than 100 lines...



##########
runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PTransformTranslation.java:
##########
@@ -508,14 +510,56 @@ static RunnerApi.PTransform.Builder 
translateAppliedPTransform(
    *
    * <p>When going to a protocol buffer message, the translator produces a 
payload corresponding to
    * the Java representation while registering components that payload 
references.
+   *
+   * <p>Also, provides methods for generating a Row-based constructor config 
for the transform that
+   * can be later used to re-construct the transform.
    */
   public interface TransformPayloadTranslator<T extends PTransform<?, ?>> {
-    String getUrn(T transform);
 
+    /**
+     * Provides a unique URN for transforms represented by this {@code 
TransformPayloadTranslator}.
+     */
+    String getUrn();
+
+    /**
+     * Same as {@link #getUrn()} but the returned URN may depend on the 
transform provided.
+     *
+     * <p>Only override this if the same {@code TransformPayloadTranslator} 
used for multiple
+     * transforms. Otherwise, use {@link #getUrn()}.
+     */
+    default String getUrn(T transform) {
+      return getUrn();
+    }
+
+    /**
+     * Translates the given transform represented by the provided {@code 
AppliedPTransform} to a
+     * {@code FunctionSpec} with a URN and a payload.
+     */
     @Nullable

Review Comment:
   Document what a null return value means. 



##########
runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/CreateTranslation.java:
##########
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.core.construction;
+
+import com.google.auto.service.AutoService;
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+import org.apache.beam.model.pipeline.v1.RunnerApi.FunctionSpec;
+import 
org.apache.beam.runners.core.construction.PTransformTranslation.TransformPayloadTranslator;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.runners.AppliedPTransform;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.schemas.Schema.FieldType;
+import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.transforms.Create.Values;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.values.Row;
+import org.checkerframework.checker.nullness.qual.Nullable;
+
+@SuppressWarnings({
+  "rawtypes", // TODO(https://github.com/apache/beam/issues/20447)
+  "nullness"
+})
+public class CreateTranslation implements 
TransformPayloadTranslator<Create.Values<?>> {
+
+  Schema createConfigSchema =
+      Schema.builder()
+          .addArrayField("values", FieldType.BYTES)
+          .addByteArrayField("serialized_coder")
+          .build();
+
+  @Override
+  public String getUrn() {
+    return PTransformTranslation.CREATE_TRANSFORM_URN;
+  }
+
+  @Override
+  public @Nullable FunctionSpec translate(
+      AppliedPTransform<?, ?, Values<?>> application, SdkComponents 
components) throws IOException {
+    // Currently just returns an empty payload.
+    // We can implement an actual payload of runners start using this 
transform.
+    return 
FunctionSpec.newBuilder().setUrn(getUrn(application.getTransform())).build();
+  }
+
+  private byte[] toByteArray(Object object) {
+    try (ByteArrayOutputStream bos = new ByteArrayOutputStream();
+        ObjectOutputStream out = new ObjectOutputStream(bos)) {
+      out.writeObject(object);
+      return bos.toByteArray();
+    } catch (Exception e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  private Object fromByteArray(byte[] bytes) {
+    try (ByteArrayInputStream bis = new ByteArrayInputStream(bytes);
+        ObjectInputStream in = new ObjectInputStream(bis)) {
+      return in.readObject();
+    } catch (Exception e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  @Override
+  public @Nullable Row toConfigRow(Values<?> transform) {
+    List<byte[]> encodedElements = new ArrayList<>();
+    transform
+        .getElements()
+        .forEach(
+            object -> {
+              encodedElements.add(toByteArray(object));
+            });
+
+    byte[] serializedCoder =

Review Comment:
   It'd be preferable to use the portable rather than java-serialized 
representation of this Coder. 



##########
runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PipelineTranslation.java:
##########
@@ -102,11 +121,370 @@ public void visitPrimitiveTransform(Node node) {
       // TODO(JIRA-5649): Don't even emit these transforms in the generated 
protos.
       res = elideDeprecatedViews(res);
     }
+
+    ExternalTranslationOptions externalTranslationOptions =

Review Comment:
   At the very least, let's refactor this all into its own method, even if we 
still call it from `toProto()`. Maybe even put all this overriding 
functionality in its own helper class and just invoke it here. 



##########
runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PipelineTranslation.java:
##########
@@ -102,11 +121,370 @@ public void visitPrimitiveTransform(Node node) {
       // TODO(JIRA-5649): Don't even emit these transforms in the generated 
protos.
       res = elideDeprecatedViews(res);
     }
+
+    ExternalTranslationOptions externalTranslationOptions =
+        pipeline.getOptions().as(ExternalTranslationOptions.class);
+    List<String> urnsToOverride = 
externalTranslationOptions.getTransformsToOverride();
+    if (urnsToOverride.size() > 0) {
+      // We use PTransformPayloadTranslators and the Transform Service to 
re-generate the pipeline
+      // proto components for the updated transform and update the pipeline 
proto.
+      Map<String, PTransform> transforms = 
res.getComponents().getTransformsMap();
+      List<String> alreadyCheckedURns = new ArrayList<>();
+      for (Entry<String, PTransform> entry : transforms.entrySet()) {
+        String urn = entry.getValue().getSpec().getUrn();
+        if (!alreadyCheckedURns.contains(urn) && urnsToOverride.contains(urn)) 
{
+          alreadyCheckedURns.add(urn);
+          // All transforms in the pipeline with the given urns have to be 
overridden.
+          List<
+                  AppliedPTransform<
+                      PInput,
+                      POutput,
+                      org.apache.beam.sdk.transforms.PTransform<? super 
PInput, POutput>>>
+              appliedPTransforms =
+                  findAppliedPTransforms(
+                      urn, pipeline, 
KnownTransformPayloadTranslator.KNOWN_PAYLOAD_TRANSLATORS);
+          for (AppliedPTransform<
+                  PInput,
+                  POutput,
+                  org.apache.beam.sdk.transforms.PTransform<? super PInput, 
POutput>>
+              appliedPTransform : appliedPTransforms) {
+            TransformPayloadTranslator<
+                    org.apache.beam.sdk.transforms.PTransform<? super PInput, 
POutput>>
+                payloadTranslator =
+                    
KnownTransformPayloadTranslator.KNOWN_PAYLOAD_TRANSLATORS.get(
+                        appliedPTransform.getTransform().getClass());
+            try {
+              // Override the transform using the transform service.
+              res =
+                  updateTransformViaTransformService(
+                      urn, appliedPTransform, payloadTranslator, pipeline, 
res);
+            } catch (IOException e) {
+              throw new RuntimeException(e);
+            }
+          }
+        }
+      }
+    }
+
     // Validate that translation didn't produce an invalid pipeline.
     PipelineValidator.validate(res);
     return res;
   }
 
+  private static int findAvailablePort() throws IOException {
+    ServerSocket s = new ServerSocket(0);
+    try {
+      return s.getLocalPort();
+    } finally {
+      s.close();
+      try {
+        // Some systems don't free the port for future use immediately.
+        Thread.sleep(100);
+      } catch (InterruptedException exn) {
+        // ignore
+      }
+    }
+  }
+
+  // Override the given transform to the version available in a new transform 
service.
+  private static <
+          InputT extends PInput,
+          OutputT extends POutput,
+          TransformT extends org.apache.beam.sdk.transforms.PTransform<InputT, 
OutputT>>
+      RunnerApi.Pipeline updateTransformViaTransformService(
+          String urn,
+          AppliedPTransform<
+                  PInput,
+                  POutput,
+                  org.apache.beam.sdk.transforms.PTransform<? super PInput, 
POutput>>
+              appliedPTransform,
+          TransformPayloadTranslator<
+                  org.apache.beam.sdk.transforms.PTransform<? super PInput, 
POutput>>
+              originalPayloadTranslator,
+          Pipeline pipeline,
+          RunnerApi.Pipeline runnerAPIpipeline)
+          throws IOException {
+    ExternalTranslationOptions externalTranslationOptions =
+        pipeline.getOptions().as(ExternalTranslationOptions.class);
+
+    // Config row to re-construct the transform within the transform service.
+    Row configRow = 
originalPayloadTranslator.toConfigRow(appliedPTransform.getTransform());
+    ByteStringOutputStream outputStream = new ByteStringOutputStream();
+    try {
+      RowCoder.of(configRow.getSchema()).encode(configRow, outputStream);
+    } catch (IOException e) {
+      throw new RuntimeException(e);
+    }
+
+    // Java expansion serivice able to identify and expand transforms that 
includes the construction
+    // config provided here.
+    ExternalTransforms.ExternalConfigurationPayload payload =
+        ExternalTransforms.ExternalConfigurationPayload.newBuilder()
+            .setSchema(SchemaTranslation.schemaToProto(configRow.getSchema(), 
true))
+            .setPayload(outputStream.toByteString())
+            .build();
+
+    String serviceAddress = null;

Review Comment:
   Start and connect to the service once and pass it in as an option rather 
than doing so for every transform individually. 



##########
sdks/java/expansion-service/src/main/java/org/apache/beam/sdk/expansion/service/ExpansionService.java:
##########
@@ -178,6 +184,65 @@ public List<String> getDependencies(
         }
       }
 
+      List<String> deprecatedTransformURNs = 
ImmutableList.of(READ_TRANSFORM_URN);
+      for (TransformPayloadTranslatorRegistrar registrar :
+          ServiceLoader.load(TransformPayloadTranslatorRegistrar.class)) {
+        for (Map.Entry<? extends Class<? extends PTransform>, ? extends 
TransformPayloadTranslator>
+            entry : registrar.getTransformPayloadTranslators().entrySet()) {
+          @Initialized TransformPayloadTranslator translator = 
entry.getValue();
+          if (translator == null) {
+            continue;
+          }
+
+          String urn = null;

Review Comment:
   I'm still seeing it.



##########
runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PipelineTranslation.java:
##########
@@ -102,11 +121,370 @@ public void visitPrimitiveTransform(Node node) {
       // TODO(JIRA-5649): Don't even emit these transforms in the generated 
protos.
       res = elideDeprecatedViews(res);
     }
+
+    ExternalTranslationOptions externalTranslationOptions =
+        pipeline.getOptions().as(ExternalTranslationOptions.class);
+    List<String> urnsToOverride = 
externalTranslationOptions.getTransformsToOverride();
+    if (urnsToOverride.size() > 0) {
+      // We use PTransformPayloadTranslators and the Transform Service to 
re-generate the pipeline
+      // proto components for the updated transform and update the pipeline 
proto.
+      Map<String, PTransform> transforms = 
res.getComponents().getTransformsMap();
+      List<String> alreadyCheckedURns = new ArrayList<>();
+      for (Entry<String, PTransform> entry : transforms.entrySet()) {
+        String urn = entry.getValue().getSpec().getUrn();
+        if (!alreadyCheckedURns.contains(urn) && urnsToOverride.contains(urn)) 
{
+          alreadyCheckedURns.add(urn);
+          // All transforms in the pipeline with the given urns have to be 
overridden.
+          List<
+                  AppliedPTransform<
+                      PInput,
+                      POutput,
+                      org.apache.beam.sdk.transforms.PTransform<? super 
PInput, POutput>>>
+              appliedPTransforms =
+                  findAppliedPTransforms(
+                      urn, pipeline, 
KnownTransformPayloadTranslator.KNOWN_PAYLOAD_TRANSLATORS);
+          for (AppliedPTransform<
+                  PInput,
+                  POutput,
+                  org.apache.beam.sdk.transforms.PTransform<? super PInput, 
POutput>>
+              appliedPTransform : appliedPTransforms) {
+            TransformPayloadTranslator<
+                    org.apache.beam.sdk.transforms.PTransform<? super PInput, 
POutput>>
+                payloadTranslator =
+                    
KnownTransformPayloadTranslator.KNOWN_PAYLOAD_TRANSLATORS.get(
+                        appliedPTransform.getTransform().getClass());
+            try {
+              // Override the transform using the transform service.
+              res =
+                  updateTransformViaTransformService(
+                      urn, appliedPTransform, payloadTranslator, pipeline, 
res);
+            } catch (IOException e) {
+              throw new RuntimeException(e);
+            }
+          }
+        }
+      }
+    }
+
     // Validate that translation didn't produce an invalid pipeline.
     PipelineValidator.validate(res);
     return res;
   }
 
+  private static int findAvailablePort() throws IOException {
+    ServerSocket s = new ServerSocket(0);
+    try {
+      return s.getLocalPort();
+    } finally {
+      s.close();
+      try {
+        // Some systems don't free the port for future use immediately.
+        Thread.sleep(100);
+      } catch (InterruptedException exn) {
+        // ignore
+      }
+    }
+  }
+
+  // Override the given transform to the version available in a new transform 
service.
+  private static <
+          InputT extends PInput,
+          OutputT extends POutput,
+          TransformT extends org.apache.beam.sdk.transforms.PTransform<InputT, 
OutputT>>
+      RunnerApi.Pipeline updateTransformViaTransformService(
+          String urn,
+          AppliedPTransform<
+                  PInput,
+                  POutput,
+                  org.apache.beam.sdk.transforms.PTransform<? super PInput, 
POutput>>
+              appliedPTransform,
+          TransformPayloadTranslator<
+                  org.apache.beam.sdk.transforms.PTransform<? super PInput, 
POutput>>
+              originalPayloadTranslator,
+          Pipeline pipeline,
+          RunnerApi.Pipeline runnerAPIpipeline)
+          throws IOException {
+    ExternalTranslationOptions externalTranslationOptions =
+        pipeline.getOptions().as(ExternalTranslationOptions.class);
+
+    // Config row to re-construct the transform within the transform service.
+    Row configRow = 
originalPayloadTranslator.toConfigRow(appliedPTransform.getTransform());
+    ByteStringOutputStream outputStream = new ByteStringOutputStream();
+    try {
+      RowCoder.of(configRow.getSchema()).encode(configRow, outputStream);
+    } catch (IOException e) {
+      throw new RuntimeException(e);
+    }
+
+    // Java expansion serivice able to identify and expand transforms that 
includes the construction
+    // config provided here.
+    ExternalTransforms.ExternalConfigurationPayload payload =
+        ExternalTransforms.ExternalConfigurationPayload.newBuilder()
+            .setSchema(SchemaTranslation.schemaToProto(configRow.getSchema(), 
true))
+            .setPayload(outputStream.toByteString())
+            .build();
+
+    String serviceAddress = null;
+    TransformServiceLauncher service = null;
+    try {
+      if (externalTranslationOptions.getTransformServiceAddress() != null) {
+        serviceAddress = 
externalTranslationOptions.getTransformServiceAddress();
+      } else if (externalTranslationOptions.getTransformServiceBeamVersion() 
!= null) {
+        String projectName = UUID.randomUUID().toString();
+        service = TransformServiceLauncher.forProject(projectName, 
findAvailablePort());
+        
service.setBeamVersion(externalTranslationOptions.getTransformServiceBeamVersion());
+
+        // Starting the transform service.
+        service.start();
+        // Waiting the service to be ready.
+        service.waitTillUp(15000);
+      } else {
+        throw new IllegalArgumentException(
+            "Either option TransformServiceAddress or option 
TransformServiceBeamVersion should be provided to override a transform using 
the transform service");
+      }
+
+      if (serviceAddress == null) {
+        throw new IllegalArgumentException(
+            "Cannot override the transform "
+                + urn
+                + " since a valid transform service address could not be 
determined");
+      }
+
+      // Creating an ExternalTransform and expanding it using the transform 
service.
+      // Input will be the same input provided to the transform bing 
overridden.
+      ExpandableTransform<InputT, OutputT> externalTransform =
+          (ExpandableTransform<InputT, OutputT>)
+              External.of(urn, payload.toByteArray(), serviceAddress);
+
+      PCollectionTuple input = PCollectionTuple.empty(pipeline);
+      for (TupleTag<?> tag : (Set<TupleTag<?>>) 
appliedPTransform.getInputs().keySet()) {
+        PCollection<?> pc = appliedPTransform.getInputs().get(tag);
+        if (pc == null) {
+          throw new IllegalArgumentException(
+              "Input of transform " + appliedPTransform + " with tag " + tag + 
" was null.");
+        }
+        input = input.and(tag, (PCollection) pc);
+      }
+      POutput output = externalTransform.expand((InputT) input);
+
+      // Outputs of the transform being overridden.
+      Map<TupleTag<?>, PCollection<?>> originalOutputs = 
appliedPTransform.getOutputs();
+
+      // After expansion some transforms might still refer to the output of 
the already overridden
+      // transform as their input.
+      // Such inputs have to be overridden to use the output of the new 
upgraded transform.
+      Map<String, String> inputReplacements = new HashMap<>();
+
+      // Will contain the outputs of the upgraded transform.
+      Map<TupleTag<?>, PCollection<?>> newOutputs = new HashMap<>();
+
+      if (output instanceof PCollectionTuple) {
+        newOutputs.putAll(((PCollectionTuple) output).getAll());
+        for (Map.Entry<TupleTag<?>, PCollection<?>> entry : 
newOutputs.entrySet()) {
+          if (entry == null) {
+            throw new IllegalArgumentException(
+                "Found unexpected null entry when iterating the outputs of 
expanded "
+                    + "ExpandableTransform "
+                    + externalTransform);
+          }
+          if (!appliedPTransform.getOutputs().containsKey(entry.getKey())) {
+            throw new RuntimeException(
+                "Could not find the tag " + entry.getKey() + " in the original 
set of outputs");
+          }
+          PCollection<?> originalOutputPc = 
originalOutputs.get(entry.getKey());
+          if (originalOutputPc == null) {
+            throw new IllegalArgumentException(
+                "Original output of transform "
+                    + appliedPTransform
+                    + " with tag "
+                    + entry.getKey()
+                    + " was null");
+          }
+          inputReplacements.put(originalOutputPc.getName(), 
entry.getValue().getName());
+        }
+      } else if (output instanceof PCollection) {
+        newOutputs.put(new TupleTag<>("temp_main_tag"), (PCollection) output);
+        inputReplacements.put(
+            
originalOutputs.get(originalOutputs.keySet().iterator().next()).getName(),
+            ((PCollection) output).getName());
+      } else {
+        throw new RuntimeException("Unexpected output type");
+      }
+
+      // We create a new AppliedPTransform to represent the upgraded transform 
and register it in an
+      // SdkComponents object.
+      AppliedPTransform<?, ?, ?> updatedAppliedPTransform =
+          AppliedPTransform.of(
+              appliedPTransform.getFullName() + "_external",
+              appliedPTransform.getInputs(),
+              newOutputs,
+              externalTransform,
+              externalTransform.getResourceHints(),
+              appliedPTransform.getPipeline());
+      SdkComponents updatedComponents =
+          SdkComponents.create(
+              runnerAPIpipeline.getComponents(), 
runnerAPIpipeline.getRequirementsList());
+      String updatedTransformId =
+          updatedComponents.registerPTransform(updatedAppliedPTransform, 
Collections.emptyList());

Review Comment:
   `Collections.emptyList()` doesn't seem correct if this is a composite 
transform. 



##########
runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PipelineTranslation.java:
##########
@@ -102,11 +121,370 @@ public void visitPrimitiveTransform(Node node) {
       // TODO(JIRA-5649): Don't even emit these transforms in the generated 
protos.
       res = elideDeprecatedViews(res);
     }
+
+    ExternalTranslationOptions externalTranslationOptions =
+        pipeline.getOptions().as(ExternalTranslationOptions.class);
+    List<String> urnsToOverride = 
externalTranslationOptions.getTransformsToOverride();
+    if (urnsToOverride.size() > 0) {
+      // We use PTransformPayloadTranslators and the Transform Service to 
re-generate the pipeline
+      // proto components for the updated transform and update the pipeline 
proto.
+      Map<String, PTransform> transforms = 
res.getComponents().getTransformsMap();
+      List<String> alreadyCheckedURns = new ArrayList<>();
+      for (Entry<String, PTransform> entry : transforms.entrySet()) {
+        String urn = entry.getValue().getSpec().getUrn();
+        if (!alreadyCheckedURns.contains(urn) && urnsToOverride.contains(urn)) 
{
+          alreadyCheckedURns.add(urn);
+          // All transforms in the pipeline with the given urns have to be 
overridden.
+          List<
+                  AppliedPTransform<
+                      PInput,
+                      POutput,
+                      org.apache.beam.sdk.transforms.PTransform<? super 
PInput, POutput>>>
+              appliedPTransforms =
+                  findAppliedPTransforms(
+                      urn, pipeline, 
KnownTransformPayloadTranslator.KNOWN_PAYLOAD_TRANSLATORS);
+          for (AppliedPTransform<
+                  PInput,
+                  POutput,
+                  org.apache.beam.sdk.transforms.PTransform<? super PInput, 
POutput>>
+              appliedPTransform : appliedPTransforms) {
+            TransformPayloadTranslator<
+                    org.apache.beam.sdk.transforms.PTransform<? super PInput, 
POutput>>
+                payloadTranslator =
+                    
KnownTransformPayloadTranslator.KNOWN_PAYLOAD_TRANSLATORS.get(
+                        appliedPTransform.getTransform().getClass());
+            try {
+              // Override the transform using the transform service.
+              res =
+                  updateTransformViaTransformService(
+                      urn, appliedPTransform, payloadTranslator, pipeline, 
res);
+            } catch (IOException e) {
+              throw new RuntimeException(e);
+            }
+          }
+        }
+      }
+    }
+
     // Validate that translation didn't produce an invalid pipeline.
     PipelineValidator.validate(res);
     return res;
   }
 
+  private static int findAvailablePort() throws IOException {
+    ServerSocket s = new ServerSocket(0);
+    try {
+      return s.getLocalPort();
+    } finally {
+      s.close();
+      try {
+        // Some systems don't free the port for future use immediately.
+        Thread.sleep(100);
+      } catch (InterruptedException exn) {
+        // ignore
+      }
+    }
+  }
+
+  // Override the given transform to the version available in a new transform 
service.
+  private static <
+          InputT extends PInput,
+          OutputT extends POutput,
+          TransformT extends org.apache.beam.sdk.transforms.PTransform<InputT, 
OutputT>>
+      RunnerApi.Pipeline updateTransformViaTransformService(
+          String urn,
+          AppliedPTransform<
+                  PInput,
+                  POutput,
+                  org.apache.beam.sdk.transforms.PTransform<? super PInput, 
POutput>>
+              appliedPTransform,
+          TransformPayloadTranslator<
+                  org.apache.beam.sdk.transforms.PTransform<? super PInput, 
POutput>>
+              originalPayloadTranslator,
+          Pipeline pipeline,
+          RunnerApi.Pipeline runnerAPIpipeline)
+          throws IOException {
+    ExternalTranslationOptions externalTranslationOptions =
+        pipeline.getOptions().as(ExternalTranslationOptions.class);
+
+    // Config row to re-construct the transform within the transform service.
+    Row configRow = 
originalPayloadTranslator.toConfigRow(appliedPTransform.getTransform());
+    ByteStringOutputStream outputStream = new ByteStringOutputStream();
+    try {
+      RowCoder.of(configRow.getSchema()).encode(configRow, outputStream);
+    } catch (IOException e) {
+      throw new RuntimeException(e);
+    }
+
+    // Java expansion serivice able to identify and expand transforms that 
includes the construction
+    // config provided here.
+    ExternalTransforms.ExternalConfigurationPayload payload =
+        ExternalTransforms.ExternalConfigurationPayload.newBuilder()
+            .setSchema(SchemaTranslation.schemaToProto(configRow.getSchema(), 
true))
+            .setPayload(outputStream.toByteString())
+            .build();
+
+    String serviceAddress = null;
+    TransformServiceLauncher service = null;
+    try {
+      if (externalTranslationOptions.getTransformServiceAddress() != null) {
+        serviceAddress = 
externalTranslationOptions.getTransformServiceAddress();
+      } else if (externalTranslationOptions.getTransformServiceBeamVersion() 
!= null) {
+        String projectName = UUID.randomUUID().toString();
+        service = TransformServiceLauncher.forProject(projectName, 
findAvailablePort());
+        
service.setBeamVersion(externalTranslationOptions.getTransformServiceBeamVersion());
+
+        // Starting the transform service.
+        service.start();
+        // Waiting the service to be ready.
+        service.waitTillUp(15000);
+      } else {
+        throw new IllegalArgumentException(
+            "Either option TransformServiceAddress or option 
TransformServiceBeamVersion should be provided to override a transform using 
the transform service");
+      }
+
+      if (serviceAddress == null) {
+        throw new IllegalArgumentException(
+            "Cannot override the transform "
+                + urn
+                + " since a valid transform service address could not be 
determined");
+      }
+
+      // Creating an ExternalTransform and expanding it using the transform 
service.
+      // Input will be the same input provided to the transform bing 
overridden.
+      ExpandableTransform<InputT, OutputT> externalTransform =
+          (ExpandableTransform<InputT, OutputT>)
+              External.of(urn, payload.toByteArray(), serviceAddress);
+
+      PCollectionTuple input = PCollectionTuple.empty(pipeline);
+      for (TupleTag<?> tag : (Set<TupleTag<?>>) 
appliedPTransform.getInputs().keySet()) {
+        PCollection<?> pc = appliedPTransform.getInputs().get(tag);
+        if (pc == null) {
+          throw new IllegalArgumentException(
+              "Input of transform " + appliedPTransform + " with tag " + tag + 
" was null.");
+        }
+        input = input.and(tag, (PCollection) pc);
+      }
+      POutput output = externalTransform.expand((InputT) input);
+
+      // Outputs of the transform being overridden.
+      Map<TupleTag<?>, PCollection<?>> originalOutputs = 
appliedPTransform.getOutputs();
+
+      // After expansion some transforms might still refer to the output of 
the already overridden
+      // transform as their input.
+      // Such inputs have to be overridden to use the output of the new 
upgraded transform.
+      Map<String, String> inputReplacements = new HashMap<>();
+
+      // Will contain the outputs of the upgraded transform.
+      Map<TupleTag<?>, PCollection<?>> newOutputs = new HashMap<>();
+
+      if (output instanceof PCollectionTuple) {
+        newOutputs.putAll(((PCollectionTuple) output).getAll());
+        for (Map.Entry<TupleTag<?>, PCollection<?>> entry : 
newOutputs.entrySet()) {
+          if (entry == null) {
+            throw new IllegalArgumentException(
+                "Found unexpected null entry when iterating the outputs of 
expanded "
+                    + "ExpandableTransform "
+                    + externalTransform);
+          }
+          if (!appliedPTransform.getOutputs().containsKey(entry.getKey())) {
+            throw new RuntimeException(
+                "Could not find the tag " + entry.getKey() + " in the original 
set of outputs");
+          }
+          PCollection<?> originalOutputPc = 
originalOutputs.get(entry.getKey());
+          if (originalOutputPc == null) {
+            throw new IllegalArgumentException(
+                "Original output of transform "
+                    + appliedPTransform
+                    + " with tag "
+                    + entry.getKey()
+                    + " was null");
+          }
+          inputReplacements.put(originalOutputPc.getName(), 
entry.getValue().getName());
+        }
+      } else if (output instanceof PCollection) {
+        newOutputs.put(new TupleTag<>("temp_main_tag"), (PCollection) output);
+        inputReplacements.put(
+            
originalOutputs.get(originalOutputs.keySet().iterator().next()).getName(),
+            ((PCollection) output).getName());
+      } else {
+        throw new RuntimeException("Unexpected output type");
+      }
+
+      // We create a new AppliedPTransform to represent the upgraded transform 
and register it in an
+      // SdkComponents object.
+      AppliedPTransform<?, ?, ?> updatedAppliedPTransform =
+          AppliedPTransform.of(
+              appliedPTransform.getFullName() + "_external",
+              appliedPTransform.getInputs(),
+              newOutputs,
+              externalTransform,
+              externalTransform.getResourceHints(),
+              appliedPTransform.getPipeline());
+      SdkComponents updatedComponents =
+          SdkComponents.create(
+              runnerAPIpipeline.getComponents(), 
runnerAPIpipeline.getRequirementsList());
+      String updatedTransformId =
+          updatedComponents.registerPTransform(updatedAppliedPTransform, 
Collections.emptyList());
+      RunnerApi.Components updatedRunnerApiComponents = 
updatedComponents.toComponents();
+
+      // Recording input updates to the transforms to refer to the upgraded 
transform instead of the
+      // old one.
+      // Also recording the newly generated id of the old (overridden) 
transform in the
+      // updatedRunnerApiComponents.
+      Map<String, Map<String, String>> transformInputUpdates = new HashMap<>();

Review Comment:
   Could you document what `transformInputUpdates` contains? The bare `String` 
type parameters aren't very descriptive.



##########
runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PipelineTranslation.java:
##########
@@ -102,11 +121,370 @@ public void visitPrimitiveTransform(Node node) {
       // TODO(JIRA-5649): Don't even emit these transforms in the generated 
protos.
       res = elideDeprecatedViews(res);
     }
+
+    ExternalTranslationOptions externalTranslationOptions =
+        pipeline.getOptions().as(ExternalTranslationOptions.class);
+    List<String> urnsToOverride = 
externalTranslationOptions.getTransformsToOverride();
+    if (urnsToOverride.size() > 0) {
+      // We use PTransformPayloadTranslators and the Transform Service to 
re-generate the pipeline
+      // proto components for the updated transform and update the pipeline 
proto.
+      Map<String, PTransform> transforms = 
res.getComponents().getTransformsMap();
+      List<String> alreadyCheckedURns = new ArrayList<>();
+      for (Entry<String, PTransform> entry : transforms.entrySet()) {
+        String urn = entry.getValue().getSpec().getUrn();
+        if (!alreadyCheckedURns.contains(urn) && urnsToOverride.contains(urn)) 
{
+          alreadyCheckedURns.add(urn);
+          // All transforms in the pipeline with the given urns have to be 
overridden.
+          List<
+                  AppliedPTransform<
+                      PInput,
+                      POutput,
+                      org.apache.beam.sdk.transforms.PTransform<? super 
PInput, POutput>>>
+              appliedPTransforms =
+                  findAppliedPTransforms(
+                      urn, pipeline, 
KnownTransformPayloadTranslator.KNOWN_PAYLOAD_TRANSLATORS);
+          for (AppliedPTransform<
+                  PInput,
+                  POutput,
+                  org.apache.beam.sdk.transforms.PTransform<? super PInput, 
POutput>>
+              appliedPTransform : appliedPTransforms) {
+            TransformPayloadTranslator<

Review Comment:
   Rather than looking this up again, I still think it'd be cleaner to attach the Row 
to the PTransform itself during translation (at the same time you're attaching 
the URN and doing the translation, so you already have a PayloadTranslator in 
hand), and then just walk over the graph looking for URNs that you need to swap 
out. 
   
   You can stick the config row (and its schema) in 
https://github.com/apache/beam/blob/master/model/pipeline/src/main/proto/org/apache/beam/model/pipeline/v1/beam_runner_api.proto#L207



##########
runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PipelineTranslation.java:
##########
@@ -102,11 +121,370 @@ public void visitPrimitiveTransform(Node node) {
       // TODO(JIRA-5649): Don't even emit these transforms in the generated 
protos.
       res = elideDeprecatedViews(res);
     }
+
+    ExternalTranslationOptions externalTranslationOptions =
+        pipeline.getOptions().as(ExternalTranslationOptions.class);
+    List<String> urnsToOverride = 
externalTranslationOptions.getTransformsToOverride();
+    if (urnsToOverride.size() > 0) {
+      // We use PTransformPayloadTranslators and the Transform Service to 
re-generate the pipeline
+      // proto components for the updated transform and update the pipeline 
proto.
+      Map<String, PTransform> transforms = 
res.getComponents().getTransformsMap();
+      List<String> alreadyCheckedURns = new ArrayList<>();
+      for (Entry<String, PTransform> entry : transforms.entrySet()) {
+        String urn = entry.getValue().getSpec().getUrn();
+        if (!alreadyCheckedURns.contains(urn) && urnsToOverride.contains(urn)) 
{
+          alreadyCheckedURns.add(urn);
+          // All transforms in the pipeline with the given urns have to be 
overridden.
+          List<
+                  AppliedPTransform<
+                      PInput,
+                      POutput,
+                      org.apache.beam.sdk.transforms.PTransform<? super 
PInput, POutput>>>
+              appliedPTransforms =
+                  findAppliedPTransforms(
+                      urn, pipeline, 
KnownTransformPayloadTranslator.KNOWN_PAYLOAD_TRANSLATORS);
+          for (AppliedPTransform<
+                  PInput,
+                  POutput,
+                  org.apache.beam.sdk.transforms.PTransform<? super PInput, 
POutput>>
+              appliedPTransform : appliedPTransforms) {
+            TransformPayloadTranslator<
+                    org.apache.beam.sdk.transforms.PTransform<? super PInput, 
POutput>>
+                payloadTranslator =
+                    
KnownTransformPayloadTranslator.KNOWN_PAYLOAD_TRANSLATORS.get(
+                        appliedPTransform.getTransform().getClass());
+            try {
+              // Override the transform using the transform service.
+              res =
+                  updateTransformViaTransformService(
+                      urn, appliedPTransform, payloadTranslator, pipeline, 
res);
+            } catch (IOException e) {
+              throw new RuntimeException(e);
+            }
+          }
+        }
+      }
+    }
+
     // Validate that translation didn't produce an invalid pipeline.
     PipelineValidator.validate(res);
     return res;
   }
 
+  private static int findAvailablePort() throws IOException {
+    ServerSocket s = new ServerSocket(0);
+    try {
+      return s.getLocalPort();
+    } finally {
+      s.close();
+      try {
+        // Some systems don't free the port for future use immediately.
+        Thread.sleep(100);
+      } catch (InterruptedException exn) {
+        // ignore
+      }
+    }
+  }
+
+  // Override the given transform to the version available in a new transform 
service.
+  private static <
+          InputT extends PInput,
+          OutputT extends POutput,
+          TransformT extends org.apache.beam.sdk.transforms.PTransform<InputT, 
OutputT>>
+      RunnerApi.Pipeline updateTransformViaTransformService(
+          String urn,
+          AppliedPTransform<
+                  PInput,
+                  POutput,
+                  org.apache.beam.sdk.transforms.PTransform<? super PInput, 
POutput>>
+              appliedPTransform,
+          TransformPayloadTranslator<
+                  org.apache.beam.sdk.transforms.PTransform<? super PInput, 
POutput>>
+              originalPayloadTranslator,
+          Pipeline pipeline,
+          RunnerApi.Pipeline runnerAPIpipeline)
+          throws IOException {
+    ExternalTranslationOptions externalTranslationOptions =
+        pipeline.getOptions().as(ExternalTranslationOptions.class);
+
+    // Config row to re-construct the transform within the transform service.
+    Row configRow = 
originalPayloadTranslator.toConfigRow(appliedPTransform.getTransform());
+    ByteStringOutputStream outputStream = new ByteStringOutputStream();
+    try {
+      RowCoder.of(configRow.getSchema()).encode(configRow, outputStream);
+    } catch (IOException e) {
+      throw new RuntimeException(e);
+    }
+
+    // Java expansion serivice able to identify and expand transforms that 
includes the construction
+    // config provided here.
+    ExternalTransforms.ExternalConfigurationPayload payload =
+        ExternalTransforms.ExternalConfigurationPayload.newBuilder()
+            .setSchema(SchemaTranslation.schemaToProto(configRow.getSchema(), 
true))
+            .setPayload(outputStream.toByteString())
+            .build();
+
+    String serviceAddress = null;
+    TransformServiceLauncher service = null;
+    try {
+      if (externalTranslationOptions.getTransformServiceAddress() != null) {
+        serviceAddress = 
externalTranslationOptions.getTransformServiceAddress();
+      } else if (externalTranslationOptions.getTransformServiceBeamVersion() 
!= null) {
+        String projectName = UUID.randomUUID().toString();
+        service = TransformServiceLauncher.forProject(projectName, 
findAvailablePort());
+        
service.setBeamVersion(externalTranslationOptions.getTransformServiceBeamVersion());
+
+        // Starting the transform service.
+        service.start();
+        // Waiting the service to be ready.
+        service.waitTillUp(15000);
+      } else {
+        throw new IllegalArgumentException(
+            "Either option TransformServiceAddress or option 
TransformServiceBeamVersion should be provided to override a transform using 
the transform service");
+      }
+
+      if (serviceAddress == null) {
+        throw new IllegalArgumentException(
+            "Cannot override the transform "
+                + urn
+                + " since a valid transform service address could not be 
determined");
+      }
+
+      // Creating an ExternalTransform and expanding it using the transform 
service.
+      // Input will be the same input provided to the transform bing 
overridden.
+      ExpandableTransform<InputT, OutputT> externalTransform =
+          (ExpandableTransform<InputT, OutputT>)
+              External.of(urn, payload.toByteArray(), serviceAddress);
+
+      PCollectionTuple input = PCollectionTuple.empty(pipeline);
+      for (TupleTag<?> tag : (Set<TupleTag<?>>) 
appliedPTransform.getInputs().keySet()) {
+        PCollection<?> pc = appliedPTransform.getInputs().get(tag);
+        if (pc == null) {
+          throw new IllegalArgumentException(
+              "Input of transform " + appliedPTransform + " with tag " + tag + 
" was null.");
+        }
+        input = input.and(tag, (PCollection) pc);
+      }
+      POutput output = externalTransform.expand((InputT) input);
+
+      // Outputs of the transform being overridden.
+      Map<TupleTag<?>, PCollection<?>> originalOutputs = 
appliedPTransform.getOutputs();
+
+      // After expansion some transforms might still refer to the output of 
the already overridden
+      // transform as their input.
+      // Such inputs have to be overridden to use the output of the new 
upgraded transform.
+      Map<String, String> inputReplacements = new HashMap<>();
+
+      // Will contain the outputs of the upgraded transform.
+      Map<TupleTag<?>, PCollection<?>> newOutputs = new HashMap<>();
+
+      if (output instanceof PCollectionTuple) {

Review Comment:
   This code deals with both the Pipeline object and the Pipeline proto, and 
it's a bit hard to follow where the source of truth is. My personal preference 
would be to simply act on the protos, consuming the expansion service response 
directly. Honestly, I don't think that'd be any more complicated than what 
you already have to do here.



##########
runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PipelineTranslation.java:
##########
@@ -102,11 +121,370 @@ public void visitPrimitiveTransform(Node node) {
       // TODO(JIRA-5649): Don't even emit these transforms in the generated 
protos.
       res = elideDeprecatedViews(res);
     }
+
+    ExternalTranslationOptions externalTranslationOptions =
+        pipeline.getOptions().as(ExternalTranslationOptions.class);
+    List<String> urnsToOverride = 
externalTranslationOptions.getTransformsToOverride();
+    if (urnsToOverride.size() > 0) {
+      // We use PTransformPayloadTranslators and the Transform Service to 
re-generate the pipeline
+      // proto components for the updated transform and update the pipeline 
proto.
+      Map<String, PTransform> transforms = 
res.getComponents().getTransformsMap();
+      List<String> alreadyCheckedURns = new ArrayList<>();
+      for (Entry<String, PTransform> entry : transforms.entrySet()) {
+        String urn = entry.getValue().getSpec().getUrn();
+        if (!alreadyCheckedURns.contains(urn) && urnsToOverride.contains(urn)) 
{
+          alreadyCheckedURns.add(urn);

Review Comment:
   Oh, I see, for each transform you're then checking for any other transforms 
with that URN. I think it'd be cleaner to loop over URNs to translate rather 
than over `transforms.entrySet()` to get the set of suitable URNs.



##########
runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PipelineTranslation.java:
##########
@@ -102,11 +121,370 @@ public void visitPrimitiveTransform(Node node) {
       // TODO(JIRA-5649): Don't even emit these transforms in the generated 
protos.
       res = elideDeprecatedViews(res);
     }
+
+    ExternalTranslationOptions externalTranslationOptions =
+        pipeline.getOptions().as(ExternalTranslationOptions.class);
+    List<String> urnsToOverride = 
externalTranslationOptions.getTransformsToOverride();
+    if (urnsToOverride.size() > 0) {
+      // We use PTransformPayloadTranslators and the Transform Service to 
re-generate the pipeline
+      // proto components for the updated transform and update the pipeline 
proto.
+      Map<String, PTransform> transforms = 
res.getComponents().getTransformsMap();
+      List<String> alreadyCheckedURns = new ArrayList<>();
+      for (Entry<String, PTransform> entry : transforms.entrySet()) {
+        String urn = entry.getValue().getSpec().getUrn();
+        if (!alreadyCheckedURns.contains(urn) && urnsToOverride.contains(urn)) 
{
+          alreadyCheckedURns.add(urn);
+          // All transforms in the pipeline with the given urns have to be 
overridden.
+          List<
+                  AppliedPTransform<
+                      PInput,
+                      POutput,
+                      org.apache.beam.sdk.transforms.PTransform<? super 
PInput, POutput>>>
+              appliedPTransforms =
+                  findAppliedPTransforms(
+                      urn, pipeline, 
KnownTransformPayloadTranslator.KNOWN_PAYLOAD_TRANSLATORS);
+          for (AppliedPTransform<
+                  PInput,
+                  POutput,
+                  org.apache.beam.sdk.transforms.PTransform<? super PInput, 
POutput>>
+              appliedPTransform : appliedPTransforms) {
+            TransformPayloadTranslator<
+                    org.apache.beam.sdk.transforms.PTransform<? super PInput, 
POutput>>
+                payloadTranslator =
+                    
KnownTransformPayloadTranslator.KNOWN_PAYLOAD_TRANSLATORS.get(
+                        appliedPTransform.getTransform().getClass());
+            try {
+              // Override the transform using the transform service.
+              res =
+                  updateTransformViaTransformService(
+                      urn, appliedPTransform, payloadTranslator, pipeline, 
res);
+            } catch (IOException e) {
+              throw new RuntimeException(e);
+            }
+          }
+        }
+      }
+    }
+
     // Validate that translation didn't produce an invalid pipeline.
     PipelineValidator.validate(res);
     return res;
   }
 
+  private static int findAvailablePort() throws IOException {
+    ServerSocket s = new ServerSocket(0);
+    try {
+      return s.getLocalPort();
+    } finally {
+      s.close();
+      try {
+        // Some systems don't free the port for future use immediately.
+        Thread.sleep(100);
+      } catch (InterruptedException exn) {
+        // ignore
+      }
+    }
+  }
+
+  // Override the given transform to the version available in a new transform 
service.
+  private static <
+          InputT extends PInput,
+          OutputT extends POutput,
+          TransformT extends org.apache.beam.sdk.transforms.PTransform<InputT, 
OutputT>>
+      RunnerApi.Pipeline updateTransformViaTransformService(
+          String urn,
+          AppliedPTransform<
+                  PInput,
+                  POutput,
+                  org.apache.beam.sdk.transforms.PTransform<? super PInput, 
POutput>>
+              appliedPTransform,
+          TransformPayloadTranslator<
+                  org.apache.beam.sdk.transforms.PTransform<? super PInput, 
POutput>>
+              originalPayloadTranslator,
+          Pipeline pipeline,
+          RunnerApi.Pipeline runnerAPIpipeline)
+          throws IOException {
+    ExternalTranslationOptions externalTranslationOptions =
+        pipeline.getOptions().as(ExternalTranslationOptions.class);
+
+    // Config row to re-construct the transform within the transform service.
+    Row configRow = 
originalPayloadTranslator.toConfigRow(appliedPTransform.getTransform());
+    ByteStringOutputStream outputStream = new ByteStringOutputStream();
+    try {
+      RowCoder.of(configRow.getSchema()).encode(configRow, outputStream);
+    } catch (IOException e) {
+      throw new RuntimeException(e);
+    }
+
+    // Java expansion serivice able to identify and expand transforms that 
includes the construction
+    // config provided here.
+    ExternalTransforms.ExternalConfigurationPayload payload =
+        ExternalTransforms.ExternalConfigurationPayload.newBuilder()
+            .setSchema(SchemaTranslation.schemaToProto(configRow.getSchema(), 
true))
+            .setPayload(outputStream.toByteString())
+            .build();
+
+    String serviceAddress = null;
+    TransformServiceLauncher service = null;
+    try {
+      if (externalTranslationOptions.getTransformServiceAddress() != null) {
+        serviceAddress = 
externalTranslationOptions.getTransformServiceAddress();
+      } else if (externalTranslationOptions.getTransformServiceBeamVersion() 
!= null) {
+        String projectName = UUID.randomUUID().toString();
+        service = TransformServiceLauncher.forProject(projectName, 
findAvailablePort());
+        
service.setBeamVersion(externalTranslationOptions.getTransformServiceBeamVersion());
+
+        // Starting the transform service.
+        service.start();
+        // Waiting the service to be ready.
+        service.waitTillUp(15000);
+      } else {
+        throw new IllegalArgumentException(
+            "Either option TransformServiceAddress or option 
TransformServiceBeamVersion should be provided to override a transform using 
the transform service");
+      }
+
+      if (serviceAddress == null) {
+        throw new IllegalArgumentException(
+            "Cannot override the transform "
+                + urn
+                + " since a valid transform service address could not be 
determined");
+      }
+
+      // Creating an ExternalTransform and expanding it using the transform 
service.
+      // Input will be the same input provided to the transform bing 
overridden.
+      ExpandableTransform<InputT, OutputT> externalTransform =
+          (ExpandableTransform<InputT, OutputT>)
+              External.of(urn, payload.toByteArray(), serviceAddress);
+
+      PCollectionTuple input = PCollectionTuple.empty(pipeline);
+      for (TupleTag<?> tag : (Set<TupleTag<?>>) 
appliedPTransform.getInputs().keySet()) {
+        PCollection<?> pc = appliedPTransform.getInputs().get(tag);
+        if (pc == null) {
+          throw new IllegalArgumentException(
+              "Input of transform " + appliedPTransform + " with tag " + tag + 
" was null.");
+        }
+        input = input.and(tag, (PCollection) pc);
+      }
+      POutput output = externalTransform.expand((InputT) input);
+
+      // Outputs of the transform being overridden.
+      Map<TupleTag<?>, PCollection<?>> originalOutputs = 
appliedPTransform.getOutputs();
+
+      // After expansion some transforms might still refer to the output of 
the already overridden
+      // transform as their input.
+      // Such inputs have to be overridden to use the output of the new 
upgraded transform.
+      Map<String, String> inputReplacements = new HashMap<>();
+
+      // Will contain the outputs of the upgraded transform.
+      Map<TupleTag<?>, PCollection<?>> newOutputs = new HashMap<>();
+
+      if (output instanceof PCollectionTuple) {
+        newOutputs.putAll(((PCollectionTuple) output).getAll());
+        for (Map.Entry<TupleTag<?>, PCollection<?>> entry : 
newOutputs.entrySet()) {
+          if (entry == null) {
+            throw new IllegalArgumentException(
+                "Found unexpected null entry when iterating the outputs of 
expanded "
+                    + "ExpandableTransform "
+                    + externalTransform);
+          }
+          if (!appliedPTransform.getOutputs().containsKey(entry.getKey())) {
+            throw new RuntimeException(
+                "Could not find the tag " + entry.getKey() + " in the original 
set of outputs");
+          }
+          PCollection<?> originalOutputPc = 
originalOutputs.get(entry.getKey());
+          if (originalOutputPc == null) {
+            throw new IllegalArgumentException(
+                "Original output of transform "
+                    + appliedPTransform
+                    + " with tag "
+                    + entry.getKey()
+                    + " was null");
+          }
+          inputReplacements.put(originalOutputPc.getName(), 
entry.getValue().getName());
+        }
+      } else if (output instanceof PCollection) {
+        newOutputs.put(new TupleTag<>("temp_main_tag"), (PCollection) output);
+        inputReplacements.put(
+            
originalOutputs.get(originalOutputs.keySet().iterator().next()).getName(),
+            ((PCollection) output).getName());
+      } else {
+        throw new RuntimeException("Unexpected output type");
+      }
+
+      // We create a new AppliedPTransform to represent the upgraded transform 
and register it in an
+      // SdkComponents object.
+      AppliedPTransform<?, ?, ?> updatedAppliedPTransform =
+          AppliedPTransform.of(
+              appliedPTransform.getFullName() + "_external",
+              appliedPTransform.getInputs(),
+              newOutputs,
+              externalTransform,
+              externalTransform.getResourceHints(),
+              appliedPTransform.getPipeline());
+      SdkComponents updatedComponents =
+          SdkComponents.create(
+              runnerAPIpipeline.getComponents(), 
runnerAPIpipeline.getRequirementsList());
+      String updatedTransformId =
+          updatedComponents.registerPTransform(updatedAppliedPTransform, 
Collections.emptyList());
+      RunnerApi.Components updatedRunnerApiComponents = 
updatedComponents.toComponents();
+
+      // Recording input updates to the transforms to refer to the upgraded 
transform instead of the
+      // old one.
+      // Also recording the newly generated id of the old (overridden) 
transform in the
+      // updatedRunnerApiComponents.
+      Map<String, Map<String, String>> transformInputUpdates = new HashMap<>();
+      List<String> oldTransformIds = new ArrayList<>();
+      updatedRunnerApiComponents
+          .getTransformsMap()
+          .forEach(
+              (transformId, transform) -> {
+                // Mapping from existing key to new value.
+                Map<String, String> updatedInputMap = new HashMap<>();
+                for (Map.Entry<String, String> entry : 
transform.getInputsMap().entrySet()) {
+                  if (inputReplacements.containsKey(entry.getValue())) {
+                    updatedInputMap.put(entry.getKey(), 
inputReplacements.get(entry.getValue()));
+                  }
+                }
+                for (Map.Entry<String, String> entry : 
transform.getOutputsMap().entrySet()) {
+                  if (inputReplacements.containsKey(entry.getValue())
+                      && urn.equals(transform.getSpec().getUrn())) {
+                    oldTransformIds.add(transformId);
+                  }
+                }
+                if (updatedInputMap.size() > 0) {
+                  transformInputUpdates.put(transformId, updatedInputMap);
+                }
+              });
+      // There should be only one recorded old (upgraded) transform.
+      if (oldTransformIds.size() != 1) {
+        throw new IOException(
+            "Expected exactly one transform to be updated by "
+                + oldTransformIds.size()
+                + " were updated.");
+      }
+      String oldTransformId = oldTransformIds.get(0);

Review Comment:
   Shouldn't we already know exactly which transform we're updating? This seems
a roundabout way of getting it, and it will break if a transform doesn't have
outputs.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to