[ 
https://issues.apache.org/jira/browse/BEAM-3971?focusedWorklogId=118356&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-118356
 ]

ASF GitHub Bot logged work on BEAM-3971:
----------------------------------------

                Author: ASF GitHub Bot
            Created on: 02/Jul/18 19:52
            Start Date: 02/Jul/18 19:52
    Worklog Time Spent: 10m 
      Work Description: lukecwik closed pull request #5833: [BEAM-3971, 
BEAM-4284] Remove fromProto for Pipeline and PTransform translation.
URL: https://github.com/apache/beam/pull/5833
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git 
a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/CombineTranslation.java
 
b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/CombineTranslation.java
index 23e5e6afc94..221f8ba9a4f 100644
--- 
a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/CombineTranslation.java
+++ 
b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/CombineTranslation.java
@@ -28,11 +28,8 @@
 import java.io.IOException;
 import java.util.Collections;
 import java.util.Map;
-import java.util.Optional;
-import javax.annotation.Nonnull;
 import org.apache.beam.model.pipeline.v1.RunnerApi;
 import org.apache.beam.model.pipeline.v1.RunnerApi.CombinePayload;
-import org.apache.beam.model.pipeline.v1.RunnerApi.Components;
 import org.apache.beam.model.pipeline.v1.RunnerApi.FunctionSpec;
 import org.apache.beam.model.pipeline.v1.RunnerApi.SdkFunctionSpec;
 import 
org.apache.beam.runners.core.construction.PTransformTranslation.TransformPayloadTranslator;
@@ -59,10 +56,6 @@
   /** A {@link TransformPayloadTranslator} for {@link Combine.PerKey}. */
   public static class CombinePayloadTranslator
       implements 
PTransformTranslation.TransformPayloadTranslator<Combine.PerKey<?, ?, ?>> {
-    public static TransformPayloadTranslator create() {
-      return new CombinePayloadTranslator();
-    }
-
     private CombinePayloadTranslator() {}
 
     @Override
@@ -86,18 +79,6 @@ public FunctionSpec translate(
       }
     }
 
-    @Override
-    public PTransformTranslation.RawPTransform<?, ?> rehydrate(
-        RunnerApi.PTransform protoTransform, RehydratedComponents 
rehydratedComponents)
-        throws IOException {
-      checkArgument(
-          protoTransform.getSpec() != null,
-          "%s received transform with null spec",
-          getClass().getSimpleName());
-      
checkArgument(protoTransform.getSpec().getUrn().equals(COMBINE_TRANSFORM_URN));
-      return new RawCombine<>(protoTransform, rehydratedComponents);
-    }
-
     /** Registers {@link CombinePayloadTranslator}. */
     @AutoService(TransformPayloadTranslatorRegistrar.class)
     public static class Registrar implements 
TransformPayloadTranslatorRegistrar {
@@ -106,33 +87,10 @@ public FunctionSpec translate(
           getTransformPayloadTranslators() {
         return Collections.singletonMap(Combine.PerKey.class, new 
CombinePayloadTranslator());
       }
-
-      @Override
-      public Map<String, ? extends TransformPayloadTranslator> 
getTransformRehydrators() {
-        return Collections.singletonMap(COMBINE_TRANSFORM_URN, new 
CombinePayloadTranslator());
-      }
     }
   }
 
-  /**
-   * These methods drive to-proto translation for both Java SDK transforms and 
rehydrated
-   * transforms.
-   */
-  interface CombineLike {
-    RunnerApi.SdkFunctionSpec getCombineFn();
-
-    Coder<?> getAccumulatorCoder();
-  }
-
-  /** Produces a {@link RunnerApi.CombinePayload} from a portable {@link 
CombineLike}. */
-  static RunnerApi.CombinePayload payloadForCombineLike(
-      CombineLike combine, SdkComponents components) throws IOException {
-    return RunnerApi.CombinePayload.newBuilder()
-        
.setAccumulatorCoderId(components.registerCoder(combine.getAccumulatorCoder()))
-        .setCombineFn(combine.getCombineFn())
-        .build();
-  }
-
+  /** Produces a {@link RunnerApi.CombinePayload} from a {@link Combine}. */
   static <K, InputT, OutputT> CombinePayload payloadForCombine(
       final AppliedPTransform<
               PCollection<KV<K, InputT>>, PCollection<KV<K, OutputT>>,
@@ -141,96 +99,28 @@ public FunctionSpec translate(
       final SdkComponents components)
       throws IOException {
 
-    return payloadForCombineLike(
-        new CombineLike() {
-          @Override
-          public SdkFunctionSpec getCombineFn() {
-            return SdkFunctionSpec.newBuilder()
-                .setEnvironmentId(
-                    
components.registerEnvironment(Environments.JAVA_SDK_HARNESS_ENVIRONMENT))
-                .setSpec(
-                    FunctionSpec.newBuilder()
-                        .setUrn(JAVA_SERIALIZED_COMBINE_FN_URN)
-                        .setPayload(
-                            ByteString.copyFrom(
-                                SerializableUtils.serializeToByteArray(
-                                    combine.getTransform().getFn())))
-                        .build())
-                .build();
-          }
-
-          @Override
-          public Coder<?> getAccumulatorCoder() {
-            GlobalCombineFn<?, ?, ?> combineFn = 
combine.getTransform().getFn();
-            try {
-              return extractAccumulatorCoder(combineFn, (AppliedPTransform) 
combine);
-            } catch (CannotProvideCoderException e) {
-              throw new IllegalStateException(e);
-            }
-          }
-        },
-        components);
-  }
-
-  private static class RawCombine<K, InputT, AccumT, OutputT>
-      extends PTransformTranslation.RawPTransform<
-          PCollection<KV<K, InputT>>, PCollection<KV<K, OutputT>>>
-      implements CombineLike {
-
-    private final RunnerApi.PTransform protoTransform;
-    private final transient RehydratedComponents rehydratedComponents;
-    private final FunctionSpec spec;
-    private final CombinePayload payload;
-    private final Coder<AccumT> accumulatorCoder;
-
-    private RawCombine(
-        RunnerApi.PTransform protoTransform, RehydratedComponents 
rehydratedComponents)
-        throws IOException {
-      this.protoTransform = protoTransform;
-      this.rehydratedComponents = rehydratedComponents;
-      this.spec = protoTransform.getSpec();
-      this.payload = CombinePayload.parseFrom(spec.getPayload());
-
-      // Eagerly extract the coder to throw a good exception here
-      try {
-        this.accumulatorCoder =
-            (Coder<AccumT>) 
rehydratedComponents.getCoder(payload.getAccumulatorCoderId());
-      } catch (IOException exc) {
-        throw new IllegalArgumentException(
-            String.format(
-                "Failure extracting accumulator coder with id '%s' for %s",
-                payload.getAccumulatorCoderId(), 
Combine.class.getSimpleName()),
-            exc);
-      }
-    }
-
-    @Override
-    public String getUrn() {
-      return COMBINE_TRANSFORM_URN;
-    }
-
-    @Nonnull
-    @Override
-    public FunctionSpec getSpec() {
-      return spec;
-    }
-
-    @Override
-    public RunnerApi.FunctionSpec migrate(SdkComponents sdkComponents) throws 
IOException {
-      return RunnerApi.FunctionSpec.newBuilder()
-          .setUrn(COMBINE_TRANSFORM_URN)
-          .setPayload(payloadForCombineLike(this, 
sdkComponents).toByteString())
+    GlobalCombineFn<?, ?, ?> combineFn = combine.getTransform().getFn();
+    try {
+      return RunnerApi.CombinePayload.newBuilder()
+          .setAccumulatorCoderId(
+              components.registerCoder(
+                  extractAccumulatorCoder(combineFn, (AppliedPTransform) 
combine)))
+          .setCombineFn(
+              SdkFunctionSpec.newBuilder()
+                  .setEnvironmentId(
+                      
components.registerEnvironment(Environments.JAVA_SDK_HARNESS_ENVIRONMENT))
+                  .setSpec(
+                      FunctionSpec.newBuilder()
+                          .setUrn(JAVA_SERIALIZED_COMBINE_FN_URN)
+                          .setPayload(
+                              ByteString.copyFrom(
+                                  SerializableUtils.serializeToByteArray(
+                                      combine.getTransform().getFn())))
+                          .build())
+                  .build())
           .build();
-    }
-
-    @Override
-    public SdkFunctionSpec getCombineFn() {
-      return payload.getCombineFn();
-    }
-
-    @Override
-    public Coder<?> getAccumulatorCoder() {
-      return accumulatorCoder;
+    } catch (CannotProvideCoderException e) {
+      throw new IllegalArgumentException(e);
     }
   }
 
@@ -249,7 +139,7 @@ static CombinePayload toProto(
           .setCombineFn(toProto(combineFn, sdkComponents))
           .build();
     } catch (CannotProvideCoderException e) {
-      throw new IllegalStateException(e);
+      throw new IllegalArgumentException(e);
     }
   }
 
@@ -283,62 +173,4 @@ public static SdkFunctionSpec toProto(
                 .build())
         .build();
   }
-
-  public static Coder<?> getAccumulatorCoder(
-      CombinePayload payload, RehydratedComponents components) throws 
IOException {
-    String id = payload.getAccumulatorCoderId();
-    return components.getCoder(id);
-  }
-
-  public static Coder<?> getAccumulatorCoder(AppliedPTransform<?, ?, ?> 
transform)
-      throws IOException {
-    SdkComponents sdkComponents = SdkComponents.create();
-    String id =
-        getCombinePayload(transform, sdkComponents)
-            .map(CombinePayload::getAccumulatorCoderId)
-            .orElseThrow(() -> new IOException("Transform does not contain an 
AccumulatorCoder"));
-    Components components = sdkComponents.toComponents();
-    return CoderTranslation.fromProto(
-        components.getCodersOrThrow(id), 
RehydratedComponents.forComponents(components));
-  }
-
-  public static GlobalCombineFn<?, ?, ?> getCombineFn(CombinePayload payload) 
throws IOException {
-    checkArgument(
-        
payload.getCombineFn().getSpec().getUrn().equals(JAVA_SERIALIZED_COMBINE_FN_URN),
-        "Payload URN was \"%s\", should have been \"%s\".",
-        payload.getCombineFn().getSpec().getUrn(),
-        JAVA_SERIALIZED_COMBINE_FN_URN);
-    return (GlobalCombineFn<?, ?, ?>)
-        SerializableUtils.deserializeFromByteArray(
-            payload.getCombineFn().getSpec().getPayload().toByteArray(), 
"CombineFn");
-  }
-
-  public static Optional<GlobalCombineFn<?, ?, ?>> getCombineFn(
-      AppliedPTransform<?, ?, ?> transform) throws IOException {
-    Optional<CombinePayload> payload = getCombinePayload(transform);
-    if (payload.isPresent()) {
-      return Optional.of(getCombineFn(payload.get()));
-    } else {
-      return Optional.empty();
-    }
-  }
-
-  private static Optional<CombinePayload> 
getCombinePayload(AppliedPTransform<?, ?, ?> transform)
-      throws IOException {
-    return getCombinePayload(transform, SdkComponents.create());
-  }
-
-  private static Optional<CombinePayload> getCombinePayload(
-      AppliedPTransform<?, ?, ?> transform, SdkComponents components) throws 
IOException {
-    RunnerApi.PTransform proto =
-        PTransformTranslation.toProto(transform, Collections.emptyList(), 
components);
-
-    // Even if the proto has no spec, calling getSpec still returns a blank 
spec, which we want to
-    // avoid. It should be clear to the caller whether or not there was a spec 
in the transform.
-    if (proto.hasSpec()) {
-      return 
Optional.of(CombinePayload.parseFrom(proto.getSpec().getPayload()));
-    } else {
-      return Optional.empty();
-    }
-  }
 }
diff --git 
a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/CreatePCollectionViewTranslation.java
 
b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/CreatePCollectionViewTranslation.java
index eb10cf9784e..8d01b44ffd3 100644
--- 
a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/CreatePCollectionViewTranslation.java
+++ 
b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/CreatePCollectionViewTranslation.java
@@ -82,7 +82,7 @@
    */
   @Deprecated
   static class CreatePCollectionViewTranslator
-      extends 
TransformPayloadTranslator.WithDefaultRehydration<View.CreatePCollectionView<?, 
?>> {
+      implements TransformPayloadTranslator<View.CreatePCollectionView<?, ?>> {
     @Override
     public String getUrn(View.CreatePCollectionView<?, ?> transform) {
       return PTransformTranslation.CREATE_VIEW_TRANSFORM_URN;
@@ -116,10 +116,5 @@ public FunctionSpec translate(
       return Collections.singletonMap(
           View.CreatePCollectionView.class, new 
CreatePCollectionViewTranslator());
     }
-
-    @Override
-    public Map<String, TransformPayloadTranslator> getTransformRehydrators() {
-      return Collections.emptyMap();
-    }
   }
 }
diff --git 
a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/FlattenTranslator.java
 
b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/FlattenTranslator.java
index 972c453b4ea..c9798e6c0f6 100644
--- 
a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/FlattenTranslator.java
+++ 
b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/FlattenTranslator.java
@@ -32,8 +32,7 @@
 /**
  * Utility methods for translating a {@link Assign} to and from {@link 
RunnerApi} representations.
  */
-public class FlattenTranslator
-    extends 
TransformPayloadTranslator.WithDefaultRehydration<Flatten.PCollections<?>> {
+public class FlattenTranslator implements 
TransformPayloadTranslator<Flatten.PCollections<?>> {
 
   public static TransformPayloadTranslator create() {
     return new FlattenTranslator();
@@ -60,10 +59,5 @@ public FunctionSpec translate(
         getTransformPayloadTranslators() {
       return Collections.singletonMap(Flatten.PCollections.class, new 
FlattenTranslator());
     }
-
-    @Override
-    public Map<String, TransformPayloadTranslator> getTransformRehydrators() {
-      return Collections.emptyMap();
-    }
   }
 }
diff --git 
a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/GroupByKeyTranslation.java
 
b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/GroupByKeyTranslation.java
index 0803ad364a1..78e404171f0 100644
--- 
a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/GroupByKeyTranslation.java
+++ 
b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/GroupByKeyTranslation.java
@@ -34,8 +34,7 @@
  */
 public class GroupByKeyTranslation {
 
-  static class GroupByKeyTranslator
-      extends TransformPayloadTranslator.WithDefaultRehydration<GroupByKey<?, 
?>> {
+  static class GroupByKeyTranslator implements 
TransformPayloadTranslator<GroupByKey<?, ?>> {
     @Override
     public String getUrn(GroupByKey<?, ?> transform) {
       return PTransformTranslation.GROUP_BY_KEY_TRANSFORM_URN;
@@ -56,10 +55,5 @@ public FunctionSpec translate(
         getTransformPayloadTranslators() {
       return Collections.singletonMap(GroupByKey.class, new 
GroupByKeyTranslator());
     }
-
-    @Override
-    public Map<String, TransformPayloadTranslator> getTransformRehydrators() {
-      return Collections.emptyMap();
-    }
   }
 }
diff --git 
a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ImpulseTranslation.java
 
b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ImpulseTranslation.java
index b78a5990e0c..23ef6eaeaa7 100644
--- 
a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ImpulseTranslation.java
+++ 
b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ImpulseTranslation.java
@@ -33,8 +33,7 @@
  * Utility methods for translating a {@link Impulse} to and from {@link 
RunnerApi} representations.
  */
 public class ImpulseTranslation {
-  private static class ImpulseTranslator
-      extends TransformPayloadTranslator.WithDefaultRehydration<Impulse> {
+  private static class ImpulseTranslator implements 
TransformPayloadTranslator<Impulse> {
     @Override
     public String getUrn(Impulse transform) {
       return PTransformTranslation.IMPULSE_TRANSFORM_URN;
@@ -55,10 +54,5 @@ public FunctionSpec translate(
         getTransformPayloadTranslators() {
       return Collections.singletonMap(Impulse.class, new ImpulseTranslator());
     }
-
-    @Override
-    public Map<String, TransformPayloadTranslator> getTransformRehydrators() {
-      return Collections.emptyMap();
-    }
   }
 }
diff --git 
a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PTransformTranslation.java
 
b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PTransformTranslation.java
index 53fe744491b..3da7871d6db 100644
--- 
a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PTransformTranslation.java
+++ 
b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PTransformTranslation.java
@@ -21,9 +21,7 @@
 import static com.google.common.base.Preconditions.checkArgument;
 import static org.apache.beam.runners.core.construction.BeamUrns.getUrn;
 
-import com.google.auto.value.AutoValue;
 import com.google.common.base.Joiner;
-import com.google.common.base.MoreObjects;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Sets;
 import java.io.IOException;
@@ -92,12 +90,6 @@
   private static final Map<Class<? extends PTransform>, 
TransformPayloadTranslator>
       KNOWN_PAYLOAD_TRANSLATORS = loadTransformPayloadTranslators();
 
-  private static final Map<String, TransformPayloadTranslator> 
KNOWN_REHYDRATORS =
-      loadTransformRehydrators();
-
-  private static final TransformPayloadTranslator<?> DEFAULT_REHYDRATOR =
-      new RawPTransformTranslator();
-
   private static Map<Class<? extends PTransform>, TransformPayloadTranslator>
       loadTransformPayloadTranslators() {
     HashMap<Class<? extends PTransform>, TransformPayloadTranslator> 
translators = new HashMap<>();
@@ -122,28 +114,6 @@
     return ImmutableMap.copyOf(translators);
   }
 
-  private static Map<String, TransformPayloadTranslator> 
loadTransformRehydrators() {
-    HashMap<String, TransformPayloadTranslator> rehydrators = new HashMap<>();
-
-    for (TransformPayloadTranslatorRegistrar registrar :
-        ServiceLoader.load(TransformPayloadTranslatorRegistrar.class)) {
-
-      Map<String, ? extends TransformPayloadTranslator> newRehydrators =
-          registrar.getTransformRehydrators();
-
-      Set<String> alreadyRegistered =
-          Sets.intersection(rehydrators.keySet(), newRehydrators.keySet());
-
-      if (!alreadyRegistered.isEmpty()) {
-        throw new IllegalArgumentException(
-            String.format("URNs already registered: %s", Joiner.on(", 
").join(alreadyRegistered)));
-      }
-
-      rehydrators.putAll(newRehydrators);
-    }
-    return ImmutableMap.copyOf(rehydrators);
-  }
-
   private PTransformTranslation() {}
 
   /**
@@ -214,26 +184,6 @@ private PTransformTranslation() {}
     return transformBuilder.build();
   }
 
-  /**
-   * Translates a {@link RunnerApi.PTransform} to a {@link RawPTransform} 
specialized for the URN
-   * and spec.
-   */
-  static RawPTransform<?, ?> rehydrate(
-      RunnerApi.PTransform protoTransform, RehydratedComponents 
rehydratedComponents)
-      throws IOException {
-
-    @Nullable
-    TransformPayloadTranslator<?> rehydrator =
-        KNOWN_REHYDRATORS.get(
-            protoTransform.getSpec() == null ? null : 
protoTransform.getSpec().getUrn());
-
-    if (rehydrator == null) {
-      return DEFAULT_REHYDRATOR.rehydrate(protoTransform, 
rehydratedComponents);
-    } else {
-      return rehydrator.rehydrate(protoTransform, rehydratedComponents);
-    }
-  }
-
   /**
    * Translates a composite {@link AppliedPTransform} into a runner API proto 
with no component
    * transforms.
@@ -303,24 +253,6 @@ public static String 
urnForTransformOrNull(RunnerApi.PTransform transform) {
     FunctionSpec translate(AppliedPTransform<?, ?, T> application, 
SdkComponents components)
         throws IOException;
 
-    RawPTransform<?, ?> rehydrate(
-        RunnerApi.PTransform protoTransform, RehydratedComponents 
rehydratedComponents)
-        throws IOException;
-
-    /**
-     * A {@link TransformPayloadTranslator} for transforms that contain no 
references to components,
-     * so they do not need a specialized rehydration.
-     */
-    abstract class WithDefaultRehydration<T extends PTransform<?, ?>>
-        implements TransformPayloadTranslator<T> {
-      @Override
-      public final RawPTransform<?, ?> rehydrate(
-          RunnerApi.PTransform protoTransform, RehydratedComponents 
rehydratedComponents)
-          throws IOException {
-        return UnknownRawPTransform.forSpec(protoTransform.getSpec());
-      }
-    }
-
     /**
      * A {@link TransformPayloadTranslator} for transforms that contain no 
references to components,
      * so they do not need a specialized rehydration.
@@ -345,16 +277,6 @@ public final FunctionSpec translate(
                 "%s should never be translated",
                 transform.getTransform().getClass().getCanonicalName()));
       }
-
-      @Override
-      public final RawPTransform<?, ?> rehydrate(
-          RunnerApi.PTransform protoTransform, RehydratedComponents 
rehydratedComponents)
-          throws IOException {
-        throw new UnsupportedOperationException(
-            String.format(
-                "%s.rehydrate should never be called; there is no serialized 
form",
-                getClass().getCanonicalName()));
-      }
     }
   }
 
@@ -405,64 +327,4 @@ public OutputT expand(InputT input) {
               getClass().getSimpleName()));
     }
   }
-
-  @AutoValue
-  abstract static class UnknownRawPTransform extends RawPTransform<PInput, 
POutput> {
-
-    @Override
-    public String getUrn() {
-      return getSpec() == null ? null : getSpec().getUrn();
-    }
-
-    @Nullable
-    @Override
-    public abstract RunnerApi.FunctionSpec getSpec();
-
-    public static UnknownRawPTransform forSpec(RunnerApi.FunctionSpec spec) {
-      return new AutoValue_PTransformTranslation_UnknownRawPTransform(spec);
-    }
-
-    @Override
-    public POutput expand(PInput input) {
-      throw new IllegalStateException(
-          String.format(
-              "%s should never be asked to expand;"
-                  + " it is the result of deserializing an already-constructed 
Pipeline",
-              getClass().getSimpleName()));
-    }
-
-    @Override
-    public String toString() {
-      return MoreObjects.toStringHelper(this)
-          .add("urn", getUrn())
-          .add("payload", getSpec())
-          .toString();
-    }
-
-    public RunnerApi.FunctionSpec getSpecForComponents(SdkComponents 
components) {
-      return getSpec();
-    }
-  }
-
-  /** A translator that uses the explicit URN and payload from a {@link 
RawPTransform}. */
-  public static class RawPTransformTranslator
-      implements TransformPayloadTranslator<RawPTransform<?, ?>> {
-    @Override
-    public String getUrn(RawPTransform<?, ?> transform) {
-      return transform.getUrn();
-    }
-
-    @Override
-    public FunctionSpec translate(
-        AppliedPTransform<?, ?, RawPTransform<?, ?>> transform, SdkComponents 
components)
-        throws IOException {
-      return transform.getTransform().migrate(components);
-    }
-
-    @Override
-    public RawPTransform<?, ?> rehydrate(
-        RunnerApi.PTransform protoTransform, RehydratedComponents 
rehydratedComponents) {
-      return UnknownRawPTransform.forSpec(protoTransform.getSpec());
-    }
-  }
 }
diff --git 
a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ParDoTranslation.java
 
b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ParDoTranslation.java
index c25822d5601..159005d735b 100644
--- 
a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ParDoTranslation.java
+++ 
b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ParDoTranslation.java
@@ -26,7 +26,6 @@
 
 import com.google.auto.service.AutoService;
 import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.MoreObjects;
 import com.google.common.collect.Iterables;
 import com.google.common.collect.Sets;
 import com.google.protobuf.ByteString;
@@ -75,7 +74,6 @@
 import org.apache.beam.sdk.util.SerializableUtils;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.PCollectionView;
-import org.apache.beam.sdk.values.PValue;
 import org.apache.beam.sdk.values.TupleTag;
 import org.apache.beam.sdk.values.TupleTagList;
 
@@ -115,13 +113,6 @@ public FunctionSpec translate(
           .build();
     }
 
-    @Override
-    public PTransformTranslation.RawPTransform<?, ?> rehydrate(
-        RunnerApi.PTransform protoTransform, RehydratedComponents 
rehydratedComponents)
-        throws IOException {
-      return new RawParDo<>(protoTransform, rehydratedComponents);
-    }
-
     /** Registers {@link ParDoPayloadTranslator}. */
     @AutoService(TransformPayloadTranslatorRegistrar.class)
     public static class Registrar implements 
TransformPayloadTranslatorRegistrar {
@@ -130,11 +121,6 @@ public FunctionSpec translate(
           getTransformPayloadTranslators() {
         return Collections.singletonMap(ParDo.MultiOutput.class, new 
ParDoPayloadTranslator());
       }
-
-      @Override
-      public Map<String, ? extends TransformPayloadTranslator> 
getTransformRehydrators() {
-        return Collections.singletonMap(PAR_DO_TRANSFORM_URN, new 
ParDoPayloadTranslator());
-      }
     }
   }
 
@@ -570,105 +556,6 @@ public static SdkFunctionSpec translateWindowMappingFn(
         .build();
   }
 
-  static class RawParDo<InputT, OutputT>
-      extends PTransformTranslation.RawPTransform<PCollection<InputT>, 
PCollection<OutputT>>
-      implements ParDoLike {
-
-    private final RunnerApi.PTransform protoTransform;
-    private final transient RehydratedComponents rehydratedComponents;
-
-    // Parsed from protoTransform and cached
-    private final FunctionSpec spec;
-    private final ParDoPayload payload;
-
-    public RawParDo(RunnerApi.PTransform protoTransform, RehydratedComponents 
rehydratedComponents)
-        throws IOException {
-      this.rehydratedComponents = rehydratedComponents;
-      this.protoTransform = protoTransform;
-      this.spec = protoTransform.getSpec();
-      this.payload = ParDoPayload.parseFrom(spec.getPayload());
-    }
-
-    @Override
-    public FunctionSpec getSpec() {
-      return spec;
-    }
-
-    @Override
-    public FunctionSpec migrate(SdkComponents components) throws IOException {
-      return FunctionSpec.newBuilder()
-          .setUrn(PAR_DO_TRANSFORM_URN)
-          .setPayload(payloadForParDoLike(this, components).toByteString())
-          .build();
-    }
-
-    @Override
-    public Map<TupleTag<?>, PValue> getAdditionalInputs() {
-      Map<TupleTag<?>, PValue> additionalInputs = new HashMap<>();
-      for (Map.Entry<String, SideInput> sideInputEntry : 
payload.getSideInputsMap().entrySet()) {
-        try {
-          additionalInputs.put(
-              new TupleTag<>(sideInputEntry.getKey()),
-              rehydratedComponents.getPCollection(
-                  protoTransform.getInputsOrThrow(sideInputEntry.getKey())));
-        } catch (IOException exc) {
-          throw new IllegalStateException(
-              String.format(
-                  "Could not find input with name %s for %s transform",
-                  sideInputEntry.getKey(), ParDo.class.getSimpleName()));
-        }
-      }
-      return additionalInputs;
-    }
-
-    @Override
-    public SdkFunctionSpec translateDoFn(SdkComponents newComponents) {
-      SdkFunctionSpec sdkFnSpec = payload.getDoFn();
-      return sdkFnSpec
-          .toBuilder()
-          .setEnvironmentId(
-              newComponents.registerEnvironment(
-                  
rehydratedComponents.getEnvironment(sdkFnSpec.getEnvironmentId())))
-          .build();
-    }
-
-    @Override
-    public List<RunnerApi.Parameter> translateParameters() {
-      return MoreObjects.firstNonNull(
-          payload.getParametersList(), 
Collections.<RunnerApi.Parameter>emptyList());
-    }
-
-    @Override
-    public Map<String, SideInput> translateSideInputs(SdkComponents 
components) {
-      // TODO: re-register the PCollections and UDF environments
-      return MoreObjects.firstNonNull(
-          payload.getSideInputsMap(), Collections.<String, 
SideInput>emptyMap());
-    }
-
-    @Override
-    public Map<String, RunnerApi.StateSpec> translateStateSpecs(SdkComponents 
components) {
-      // TODO: re-register the coders
-      return MoreObjects.firstNonNull(
-          payload.getStateSpecsMap(), Collections.<String, 
RunnerApi.StateSpec>emptyMap());
-    }
-
-    @Override
-    public Map<String, RunnerApi.TimerSpec> translateTimerSpecs(SdkComponents 
newComponents) {
-      return MoreObjects.firstNonNull(
-          payload.getTimerSpecsMap(), Collections.<String, 
RunnerApi.TimerSpec>emptyMap());
-    }
-
-    @Override
-    public boolean isSplittable() {
-      return payload.getSplittable();
-    }
-
-    @Override
-    public String translateRestrictionCoderId(SdkComponents newComponents) {
-      return payload.getRestrictionCoderId();
-    }
-  }
-
   /** These methods drive to-proto translation from Java and from rehydrated 
ParDos. */
   public interface ParDoLike {
     SdkFunctionSpec translateDoFn(SdkComponents newComponents);
diff --git 
a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PipelineTranslation.java
 
b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PipelineTranslation.java
index fa6ae015e54..a95420ccccd 100644
--- 
a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PipelineTranslation.java
+++ 
b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PipelineTranslation.java
@@ -18,34 +18,18 @@
 
 package org.apache.beam.runners.core.construction;
 
-import static com.google.common.base.Preconditions.checkNotNull;
-
 import com.google.common.collect.ArrayListMultimap;
 import com.google.common.collect.ListMultimap;
 import java.io.IOException;
-import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
-import java.util.HashMap;
 import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
 import org.apache.beam.model.pipeline.v1.RunnerApi;
-import 
org.apache.beam.runners.core.construction.PTransformTranslation.RawPTransform;
 import org.apache.beam.runners.core.construction.graph.PipelineValidator;
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.Pipeline.PipelineVisitor;
-import org.apache.beam.sdk.options.PipelineOptionsFactory;
 import org.apache.beam.sdk.runners.AppliedPTransform;
-import org.apache.beam.sdk.runners.TransformHierarchy;
 import org.apache.beam.sdk.runners.TransformHierarchy.Node;
-import org.apache.beam.sdk.transforms.display.DisplayData;
-import org.apache.beam.sdk.transforms.display.HasDisplayData;
-import org.apache.beam.sdk.values.PCollection;
-import org.apache.beam.sdk.values.PCollectionView;
-import org.apache.beam.sdk.values.PCollectionViews;
-import org.apache.beam.sdk.values.PValue;
-import org.apache.beam.sdk.values.TupleTag;
 
 /** Utilities for going to/from Runner API pipelines. */
 public class PipelineTranslation {
@@ -101,104 +85,4 @@ public void visitPrimitiveTransform(Node node) {
     PipelineValidator.validate(res);
     return res;
   }
-
-  private static DisplayData evaluateDisplayData(HasDisplayData component) {
-    return DisplayData.from(component);
-  }
-
-  public static Pipeline fromProto(final RunnerApi.Pipeline pipelineProto) 
throws IOException {
-    PipelineValidator.validate(pipelineProto);
-    TransformHierarchy transforms = new TransformHierarchy();
-    Pipeline pipeline = Pipeline.forTransformHierarchy(transforms, 
PipelineOptionsFactory.create());
-
-    // Keeping the PCollections straight is a semantic necessity, but being 
careful not to explode
-    // the number of coders and windowing strategies is also nice, and helps 
testing.
-    RehydratedComponents rehydratedComponents =
-        
RehydratedComponents.forComponents(pipelineProto.getComponents()).withPipeline(pipeline);
-
-    for (String rootId : pipelineProto.getRootTransformIdsList()) {
-      addRehydratedTransform(
-          transforms,
-          pipelineProto.getComponents().getTransformsOrThrow(rootId),
-          pipeline,
-          pipelineProto.getComponents().getTransformsMap(),
-          rehydratedComponents);
-    }
-
-    return pipeline;
-  }
-
-  private static void addRehydratedTransform(
-      TransformHierarchy transforms,
-      RunnerApi.PTransform transformProto,
-      Pipeline pipeline,
-      Map<String, RunnerApi.PTransform> transformProtos,
-      RehydratedComponents rehydratedComponents)
-      throws IOException {
-
-    Map<TupleTag<?>, PValue> rehydratedInputs = new HashMap<>();
-    for (Map.Entry<String, String> inputEntry : 
transformProto.getInputsMap().entrySet()) {
-      rehydratedInputs.put(
-          new TupleTag<>(inputEntry.getKey()),
-          rehydratedComponents.getPCollection(inputEntry.getValue()));
-    }
-
-    Map<TupleTag<?>, PValue> rehydratedOutputs = new HashMap<>();
-    for (Map.Entry<String, String> outputEntry : 
transformProto.getOutputsMap().entrySet()) {
-      rehydratedOutputs.put(
-          new TupleTag<>(outputEntry.getKey()),
-          rehydratedComponents.getPCollection(outputEntry.getValue()));
-    }
-
-    RawPTransform<?, ?> transform =
-        PTransformTranslation.rehydrate(transformProto, rehydratedComponents);
-
-    if (isPrimitive(transformProto)) {
-      transforms.addFinalizedPrimitiveNode(
-          transformProto.getUniqueName(), rehydratedInputs, transform, 
rehydratedOutputs);
-    } else {
-      transforms.pushFinalizedNode(
-          transformProto.getUniqueName(), rehydratedInputs, transform, 
rehydratedOutputs);
-
-      for (String childTransformId : transformProto.getSubtransformsList()) {
-        addRehydratedTransform(
-            transforms,
-            transformProtos.get(childTransformId),
-            pipeline,
-            transformProtos,
-            rehydratedComponents);
-      }
-
-      transforms.popNode();
-    }
-  }
-
-  private static Map<TupleTag<?>, PValue> sideInputMapToAdditionalInputs(
-      RunnerApi.PTransform transformProto,
-      RehydratedComponents rehydratedComponents,
-      Map<TupleTag<?>, PValue> rehydratedInputs,
-      Map<String, RunnerApi.SideInput> sideInputsMap)
-      throws IOException {
-    List<PCollectionView<?>> views = new ArrayList<>();
-    for (Map.Entry<String, RunnerApi.SideInput> sideInputEntry : 
sideInputsMap.entrySet()) {
-      String localName = sideInputEntry.getKey();
-      RunnerApi.SideInput sideInput = sideInputEntry.getValue();
-      PCollection<?> pCollection =
-          (PCollection<?>) checkNotNull(rehydratedInputs.get(new 
TupleTag<>(localName)));
-      views.add(
-          PCollectionViewTranslation.viewFromProto(
-              sideInput, localName, pCollection, transformProto, 
rehydratedComponents));
-    }
-    return PCollectionViews.toAdditionalInputs(views);
-  }
-
-  // A primitive transform is one with outputs that are not in its input and 
also
-  // not produced by a subtransform.
-  private static boolean isPrimitive(RunnerApi.PTransform transformProto) {
-    return transformProto.getSubtransformsCount() == 0
-        && !transformProto
-            .getInputsMap()
-            .values()
-            .containsAll(transformProto.getOutputsMap().values());
-  }
 }
diff --git 
a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ReadTranslation.java
 
b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ReadTranslation.java
index b5b469cf750..e12bbb67da8 100644
--- 
a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ReadTranslation.java
+++ 
b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ReadTranslation.java
@@ -153,8 +153,7 @@ private static SdkFunctionSpec toProto(UnboundedSource<?, 
?> source, SdkComponen
 
   /** A {@link TransformPayloadTranslator} for {@link Read.Unbounded}. */
   public static class UnboundedReadPayloadTranslator
-      extends 
PTransformTranslation.TransformPayloadTranslator.WithDefaultRehydration<
-          Read.Unbounded<?>> {
+      implements 
PTransformTranslation.TransformPayloadTranslator<Read.Unbounded<?>> {
     public static TransformPayloadTranslator create() {
       return new UnboundedReadPayloadTranslator();
     }
@@ -179,8 +178,7 @@ public FunctionSpec translate(
 
   /** A {@link TransformPayloadTranslator} for {@link Read.Bounded}. */
   public static class BoundedReadPayloadTranslator
-      extends 
PTransformTranslation.TransformPayloadTranslator.WithDefaultRehydration<
-          Read.Bounded<?>> {
+      implements 
PTransformTranslation.TransformPayloadTranslator<Read.Bounded<?>> {
     public static TransformPayloadTranslator create() {
       return new BoundedReadPayloadTranslator();
     }
@@ -214,10 +212,5 @@ public FunctionSpec translate(
           .put(Read.Bounded.class, new BoundedReadPayloadTranslator())
           .build();
     }
-
-    @Override
-    public Map<String, TransformPayloadTranslator> getTransformRehydrators() {
-      return Collections.emptyMap();
-    }
   }
 }
diff --git 
a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/SplittableParDo.java
 
b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/SplittableParDo.java
index 60ad288a368..b581eecf414 100644
--- 
a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/SplittableParDo.java
+++ 
b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/SplittableParDo.java
@@ -23,7 +23,6 @@
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Maps;
 import java.io.IOException;
-import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 import java.util.UUID;
@@ -312,17 +311,11 @@ public PCollectionTuple expand(PCollection<KV<String, 
KV<InputT, RestrictionT>>>
           .put(ProcessKeyedElements.class, new 
ProcessKeyedElementsTranslator())
           .build();
     }
-
-    @Override
-    public Map<String, TransformPayloadTranslator> getTransformRehydrators() {
-      return Collections.emptyMap();
-    }
   }
 
   /** A translator for {@link ProcessKeyedElements}. */
   public static class ProcessKeyedElementsTranslator
-      extends 
PTransformTranslation.TransformPayloadTranslator.WithDefaultRehydration<
-          ProcessKeyedElements<?, ?, ?>> {
+      implements 
PTransformTranslation.TransformPayloadTranslator<ProcessKeyedElements<?, ?, ?>> 
{
 
     public static TransformPayloadTranslator create() {
       return new ProcessKeyedElementsTranslator();
diff --git 
a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/TestStreamTranslation.java
 
b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/TestStreamTranslation.java
index 1b1884477c9..76330bb8d9a 100644
--- 
a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/TestStreamTranslation.java
+++ 
b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/TestStreamTranslation.java
@@ -22,14 +22,12 @@
 import static 
org.apache.beam.runners.core.construction.PTransformTranslation.TEST_STREAM_TRANSFORM_URN;
 
 import com.google.auto.service.AutoService;
-import com.google.common.annotations.VisibleForTesting;
 import com.google.protobuf.ByteString;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
-import javax.annotation.Nonnull;
 import org.apache.beam.model.pipeline.v1.RunnerApi;
 import 
org.apache.beam.runners.core.construction.PTransformTranslation.TransformPayloadTranslator;
 import org.apache.beam.sdk.coders.Coder;
@@ -49,73 +47,6 @@
  */
 public class TestStreamTranslation {
 
-  private interface TestStreamLike {
-    Coder<?> getValueCoder();
-
-    List<RunnerApi.TestStreamPayload.Event> getEvents();
-  }
-
-  @VisibleForTesting
-  static class RawTestStream<T> extends 
PTransformTranslation.RawPTransform<PBegin, PCollection<T>>
-      implements TestStreamLike {
-
-    private final transient RehydratedComponents rehydratedComponents;
-    private final RunnerApi.TestStreamPayload payload;
-    private final Coder<T> valueCoder;
-    private final RunnerApi.FunctionSpec spec;
-
-    public RawTestStream(
-        RunnerApi.TestStreamPayload payload, RehydratedComponents 
rehydratedComponents) {
-      this.payload = payload;
-      this.spec =
-          RunnerApi.FunctionSpec.newBuilder()
-              .setUrn(TEST_STREAM_TRANSFORM_URN)
-              .setPayload(payload.toByteString())
-              .build();
-      this.rehydratedComponents = rehydratedComponents;
-
-      // Eagerly extract the coder to throw a good exception here
-      try {
-        this.valueCoder = (Coder<T>) 
rehydratedComponents.getCoder(payload.getCoderId());
-      } catch (IOException exc) {
-        throw new IllegalArgumentException(
-            String.format(
-                "Failure extracting coder with id '%s' for %s",
-                payload.getCoderId(), TestStream.class.getSimpleName()),
-            exc);
-      }
-    }
-
-    @Override
-    public String getUrn() {
-      return TEST_STREAM_TRANSFORM_URN;
-    }
-
-    @Nonnull
-    @Override
-    public RunnerApi.FunctionSpec getSpec() {
-      return spec;
-    }
-
-    @Override
-    public RunnerApi.FunctionSpec migrate(SdkComponents components) throws 
IOException {
-      return RunnerApi.FunctionSpec.newBuilder()
-          .setUrn(TEST_STREAM_TRANSFORM_URN)
-          .setPayload(payloadForTestStreamLike(this, 
components).toByteString())
-          .build();
-    }
-
-    @Override
-    public Coder<T> getValueCoder() {
-      return valueCoder;
-    }
-
-    @Override
-    public List<RunnerApi.TestStreamPayload.Event> getEvents() {
-      return payload.getEventsList();
-    }
-  }
-
   private static TestStream<?> testStreamFromProtoPayload(
       RunnerApi.TestStreamPayload testStreamPayload, RehydratedComponents 
components)
       throws IOException {
@@ -241,20 +172,6 @@ public String getUrn(TestStream<?> transform) {
       return translateTyped(transform.getTransform(), components);
     }
 
-    @Override
-    public PTransformTranslation.RawPTransform<?, ?> rehydrate(
-        RunnerApi.PTransform protoTransform, RehydratedComponents 
rehydratedComponents)
-        throws IOException {
-      checkArgument(
-          protoTransform.getSpec() != null,
-          "%s received transform with null spec",
-          getClass().getSimpleName());
-      
checkArgument(protoTransform.getSpec().getUrn().equals(TEST_STREAM_TRANSFORM_URN));
-      return new RawTestStream<>(
-          
RunnerApi.TestStreamPayload.parseFrom(protoTransform.getSpec().getPayload()),
-          rehydratedComponents);
-    }
-
     private <T> RunnerApi.FunctionSpec translateTyped(
         final TestStream<T> testStream, SdkComponents components) throws 
IOException {
       return RunnerApi.FunctionSpec.newBuilder()
@@ -271,46 +188,24 @@ public String getUrn(TestStream<?> transform) {
           getTransformPayloadTranslators() {
         return Collections.singletonMap(TestStream.class, new 
TestStreamTranslator());
       }
+    }
+  }
 
-      @Override
-      public Map<String, ? extends TransformPayloadTranslator> 
getTransformRehydrators() {
-        return Collections.singletonMap(TEST_STREAM_TRANSFORM_URN, new 
TestStreamTranslator());
+  /** Produces a {@link RunnerApi.TestStreamPayload} from a {@link 
TestStream}. */
+  static <T> RunnerApi.TestStreamPayload payloadForTestStream(
+      final TestStream<T> transform, SdkComponents components) throws 
IOException {
+    List<RunnerApi.TestStreamPayload.Event> protoEvents = new ArrayList<>();
+    try {
+      for (TestStream.Event<T> event : transform.getEvents()) {
+        protoEvents.add(eventToProto(event, transform.getValueCoder()));
       }
+    } catch (IOException e) {
+      throw new RuntimeException(e);
     }
-  }
 
-  /** Produces a {@link RunnerApi.TestStreamPayload} from a portable {@link 
RawTestStream}. */
-  static RunnerApi.TestStreamPayload payloadForTestStreamLike(
-      TestStreamLike transform, SdkComponents components) throws IOException {
     return RunnerApi.TestStreamPayload.newBuilder()
         .setCoderId(components.registerCoder(transform.getValueCoder()))
-        .addAllEvents(transform.getEvents())
+        .addAllEvents(protoEvents)
         .build();
   }
-
-  @VisibleForTesting
-  static <T> RunnerApi.TestStreamPayload payloadForTestStream(
-      final TestStream<T> testStream, SdkComponents components) throws 
IOException {
-    return payloadForTestStreamLike(
-        new TestStreamLike() {
-          @Override
-          public Coder<T> getValueCoder() {
-            return testStream.getValueCoder();
-          }
-
-          @Override
-          public List<RunnerApi.TestStreamPayload.Event> getEvents() {
-            try {
-              List<RunnerApi.TestStreamPayload.Event> protoEvents = new 
ArrayList<>();
-              for (TestStream.Event<T> event : testStream.getEvents()) {
-                protoEvents.add(eventToProto(event, 
testStream.getValueCoder()));
-              }
-              return protoEvents;
-            } catch (IOException e) {
-              throw new RuntimeException(e);
-            }
-          }
-        },
-        components);
-  }
 }
diff --git 
a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/TransformPayloadTranslatorRegistrar.java
 
b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/TransformPayloadTranslatorRegistrar.java
index 58417a89421..3b3ffa18b26 100644
--- 
a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/TransformPayloadTranslatorRegistrar.java
+++ 
b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/TransformPayloadTranslatorRegistrar.java
@@ -26,6 +26,4 @@
 public interface TransformPayloadTranslatorRegistrar {
   Map<? extends Class<? extends PTransform>, ? extends 
TransformPayloadTranslator>
       getTransformPayloadTranslators();
-
-  Map<String, ? extends TransformPayloadTranslator> getTransformRehydrators();
 }
diff --git 
a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/WindowIntoTranslation.java
 
b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/WindowIntoTranslation.java
index 89091b8d72c..0086f8fa89b 100644
--- 
a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/WindowIntoTranslation.java
+++ 
b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/WindowIntoTranslation.java
@@ -42,8 +42,7 @@
  */
 public class WindowIntoTranslation {
 
-  static class WindowAssignTranslator
-      extends 
TransformPayloadTranslator.WithDefaultRehydration<Window.Assign<?>> {
+  static class WindowAssignTranslator implements 
TransformPayloadTranslator<Window.Assign<?>> {
 
     @Override
     public String getUrn(Assign<?> transform) {
@@ -107,8 +106,7 @@ public static WindowIntoPayload 
getWindowIntoPayload(AppliedPTransform<?, ?, ?>
 
   /** A {@link TransformPayloadTranslator} for {@link Window}. */
   public static class WindowIntoPayloadTranslator
-      extends 
PTransformTranslation.TransformPayloadTranslator.WithDefaultRehydration<
-          Window.Assign<?>> {
+      implements 
PTransformTranslation.TransformPayloadTranslator<Window.Assign<?>> {
     public static TransformPayloadTranslator create() {
       return new WindowIntoPayloadTranslator();
     }
@@ -139,10 +137,5 @@ public FunctionSpec translate(
         getTransformPayloadTranslators() {
       return Collections.singletonMap(Window.Assign.class, new 
WindowIntoPayloadTranslator());
     }
-
-    @Override
-    public Map<String, TransformPayloadTranslator> getTransformRehydrators() {
-      return Collections.emptyMap();
-    }
   }
 }
diff --git 
a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/WriteFilesTranslation.java
 
b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/WriteFilesTranslation.java
index 154893062ba..bd6454fa558 100644
--- 
a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/WriteFilesTranslation.java
+++ 
b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/WriteFilesTranslation.java
@@ -286,13 +286,6 @@ public FunctionSpec translate(
           .setPayload(payloadForWriteFiles(transform.getTransform(), 
components).toByteString())
           .build();
     }
-
-    @Override
-    public PTransformTranslation.RawPTransform<?, ?> rehydrate(
-        RunnerApi.PTransform protoTransform, RehydratedComponents 
rehydratedComponents)
-        throws IOException {
-      return new RawWriteFiles(protoTransform, rehydratedComponents);
-    }
   }
 
   /** Registers {@link WriteFilesTranslator}. */
@@ -303,11 +296,6 @@ public FunctionSpec translate(
         getTransformPayloadTranslators() {
       return Collections.singletonMap(WriteFiles.CONCRETE_CLASS, new 
WriteFilesTranslator());
     }
-
-    @Override
-    public Map<String, ? extends TransformPayloadTranslator> 
getTransformRehydrators() {
-      return Collections.singletonMap(WRITE_FILES_TRANSFORM_URN, new 
WriteFilesTranslator());
-    }
   }
 
   /** These methods drive to-proto translation from Java and from rehydrated 
WriteFiles. */
diff --git 
a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/CombineTranslationTest.java
 
b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/CombineTranslationTest.java
index e73b3ae90ac..363f2935445 100644
--- 
a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/CombineTranslationTest.java
+++ 
b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/CombineTranslationTest.java
@@ -22,6 +22,7 @@
 import static org.junit.Assert.assertEquals;
 
 import com.google.common.collect.ImmutableList;
+import java.io.IOException;
 import java.util.concurrent.atomic.AtomicReference;
 import org.apache.beam.model.pipeline.v1.RunnerApi;
 import org.apache.beam.model.pipeline.v1.RunnerApi.CombinePayload;
@@ -41,6 +42,7 @@
 import org.apache.beam.sdk.transforms.Create;
 import org.apache.beam.sdk.transforms.Sum;
 import org.apache.beam.sdk.transforms.View;
+import org.apache.beam.sdk.util.SerializableUtils;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.PCollectionView;
 import org.junit.Rule;
@@ -76,7 +78,7 @@
     public Combine.CombineFn<Integer, ?, ?> combineFn;
 
     @Test
-    public void testToFromProto() throws Exception {
+    public void testToProto() throws Exception {
       PCollection<Integer> input = pipeline.apply(Create.of(1, 2, 3));
       input.apply(Combine.globally(combineFn));
       final AtomicReference<AppliedPTransform<?, ?, Combine.PerKey<?, ?, ?>>> 
combine =
@@ -92,7 +94,7 @@ public void leaveCompositeTransform(Node node) {
             }
           });
       checkState(combine.get() != null);
-      assertEquals(combineFn, 
CombineTranslation.getCombineFn(combine.get()).orElse(null));
+      assertEquals(combineFn, combine.get().getTransform().getFn());
 
       SdkComponents sdkComponents = SdkComponents.create();
       CombinePayload combineProto = CombineTranslation.toProto(combine.get(), 
sdkComponents);
@@ -100,9 +102,11 @@ public void leaveCompositeTransform(Node node) {
 
       assertEquals(
           combineFn.getAccumulatorCoder(pipeline.getCoderRegistry(), 
input.getCoder()),
-          CombineTranslation.getAccumulatorCoder(
-              combineProto, 
RehydratedComponents.forComponents(componentsProto)));
-      assertEquals(combineFn, CombineTranslation.getCombineFn(combineProto));
+          getAccumulatorCoder(combineProto, 
RehydratedComponents.forComponents(componentsProto)));
+      assertEquals(
+          combineFn,
+          SerializableUtils.deserializeFromByteArray(
+              
combineProto.getCombineFn().getSpec().getPayload().toByteArray(), "CombineFn"));
     }
   }
 
@@ -113,7 +117,7 @@ public void leaveCompositeTransform(Node node) {
     @Rule public ExpectedException exception = ExpectedException.none();
 
     @Test
-    public void testToFromProtoWithoutSideInputs() throws Exception {
+    public void testToProtoWithoutSideInputs() throws Exception {
       PCollection<Integer> input = pipeline.apply(Create.of(1, 2, 3));
       CombineFnWithContext<Integer, int[], Integer> combineFn = new 
TestCombineFnWithContext();
       input.apply(Combine.globally(combineFn).withoutDefaults());
@@ -130,7 +134,7 @@ public void leaveCompositeTransform(Node node) {
             }
           });
       checkState(combine.get() != null);
-      assertEquals(combineFn, 
CombineTranslation.getCombineFn(combine.get()).orElse(null));
+      assertEquals(combineFn, combine.get().getTransform().getFn());
 
       SdkComponents sdkComponents = SdkComponents.create();
       CombinePayload combineProto = CombineTranslation.toProto(combine.get(), 
sdkComponents);
@@ -138,9 +142,11 @@ public void leaveCompositeTransform(Node node) {
 
       assertEquals(
           combineFn.getAccumulatorCoder(pipeline.getCoderRegistry(), 
input.getCoder()),
-          CombineTranslation.getAccumulatorCoder(
-              combineProto, 
RehydratedComponents.forComponents(componentsProto)));
-      assertEquals(combineFn, CombineTranslation.getCombineFn(combineProto));
+          getAccumulatorCoder(combineProto, 
RehydratedComponents.forComponents(componentsProto)));
+      assertEquals(
+          combineFn,
+          SerializableUtils.deserializeFromByteArray(
+              
combineProto.getCombineFn().getSpec().getPayload().toByteArray(), "CombineFn"));
     }
 
     @Test
@@ -179,6 +185,12 @@ public void leaveCompositeTransform(Node node) {
     }
   }
 
+  private static Coder<?> getAccumulatorCoder(
+      CombinePayload payload, RehydratedComponents components) throws 
IOException {
+    String id = payload.getAccumulatorCoderId();
+    return components.getCoder(id);
+  }
+
   private static class TestCombineFn extends Combine.CombineFn<Integer, Void, 
Void> {
     @Override
     public Void createAccumulator() {
diff --git 
a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/PipelineTranslationTest.java
 
b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/PipelineTranslationTest.java
index b6c785d0f16..405c60c68a9 100644
--- 
a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/PipelineTranslationTest.java
+++ 
b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/PipelineTranslationTest.java
@@ -24,15 +24,20 @@
 import com.google.common.base.Equivalence;
 import com.google.common.collect.ImmutableList;
 import java.io.IOException;
+import java.util.Collections;
 import java.util.HashSet;
+import java.util.Optional;
 import java.util.Set;
 import org.apache.beam.model.pipeline.v1.RunnerApi;
+import org.apache.beam.model.pipeline.v1.RunnerApi.CombinePayload;
+import org.apache.beam.model.pipeline.v1.RunnerApi.Components;
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.Pipeline.PipelineVisitor;
 import org.apache.beam.sdk.coders.BigEndianLongCoder;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.StructuredCoder;
 import org.apache.beam.sdk.io.GenerateSequence;
+import org.apache.beam.sdk.runners.AppliedPTransform;
 import org.apache.beam.sdk.runners.TransformHierarchy.Node;
 import org.apache.beam.sdk.transforms.Count;
 import org.apache.beam.sdk.transforms.Create;
@@ -109,14 +114,6 @@ public void testProtoDirectly() {
     pipeline.traverseTopologically(new 
PipelineProtoVerificationVisitor(pipelineProto));
   }
 
-  @Test
-  public void testProtoAgainstRehydrated() throws Exception {
-    RunnerApi.Pipeline pipelineProto = PipelineTranslation.toProto(pipeline);
-    Pipeline rehydrated = PipelineTranslation.fromProto(pipelineProto);
-
-    rehydrated.traverseTopologically(new 
PipelineProtoVerificationVisitor(pipelineProto));
-  }
-
   private static class PipelineProtoVerificationVisitor extends 
PipelineVisitor.Defaults {
 
     private final RunnerApi.Pipeline pipelineProto;
@@ -159,8 +156,7 @@ public void leaveCompositeTransform(Node node) {
           // Combine translation introduces a coder that is not assigned to 
any PCollection
           // in the default expansion, and must be explicitly added here.
           try {
-            addCoders(
-                
CombineTranslation.getAccumulatorCoder(node.toAppliedPTransform(getPipeline())));
+            
addCoders(getAccumulatorCoder(node.toAppliedPTransform(getPipeline())));
           } catch (IOException e) {
             throw new RuntimeException(e);
           }
@@ -193,4 +189,30 @@ private void addCoders(Coder<?> coder) {
       }
     }
   }
+
+  private static Coder<?> getAccumulatorCoder(AppliedPTransform<?, ?, ?> 
transform)
+      throws IOException {
+    SdkComponents sdkComponents = SdkComponents.create();
+    String id =
+        getCombinePayload(transform, sdkComponents)
+            .map(CombinePayload::getAccumulatorCoderId)
+            .orElseThrow(() -> new IOException("Transform does not contain an 
AccumulatorCoder"));
+    Components components = sdkComponents.toComponents();
+    return CoderTranslation.fromProto(
+        components.getCodersOrThrow(id), 
RehydratedComponents.forComponents(components));
+  }
+
+  private static Optional<CombinePayload> getCombinePayload(
+      AppliedPTransform<?, ?, ?> transform, SdkComponents components) throws 
IOException {
+    RunnerApi.PTransform proto =
+        PTransformTranslation.toProto(transform, Collections.emptyList(), 
components);
+
+    // Even if the proto has no spec, calling getSpec still returns a blank 
spec, which we want to
+    // avoid. It should be clear to the caller whether or not there was a spec 
in the transform.
+    if (proto.hasSpec()) {
+      return 
Optional.of(CombinePayload.parseFrom(proto.getSpec().getPayload()));
+    } else {
+      return Optional.empty();
+    }
+  }
 }
diff --git a/runners/direct-java/build.gradle b/runners/direct-java/build.gradle
index 076a3fe8cb9..4d2374da87e 100644
--- a/runners/direct-java/build.gradle
+++ b/runners/direct-java/build.gradle
@@ -96,7 +96,7 @@ task needsRunnerTests(type: Test) {
   group = "Verification"
   description = "Runs tests that require a runner to validate that 
piplines/transforms work correctly"
 
-  def pipelineOptions = JsonOutput.toJson(["--runner=DirectRunner", 
"--runnerDeterminedSharding=false", "--protoTranslation"])
+  def pipelineOptions = JsonOutput.toJson(["--runner=DirectRunner", 
"--runnerDeterminedSharding=false"])
   systemProperty "beamTestPipelineOptions", pipelineOptions
 
   classpath = configurations.needsRunner
@@ -118,7 +118,7 @@ task validatesRunner(type: Test) {
   group = "Verification"
   description "Validates Direct runner"
 
-  def pipelineOptions = JsonOutput.toJson(["--runner=DirectRunner", 
"--runnerDeterminedSharding=false", "--protoTranslation"])
+  def pipelineOptions = JsonOutput.toJson(["--runner=DirectRunner", 
"--runnerDeterminedSharding=false"])
   systemProperty "beamTestPipelineOptions", pipelineOptions
 
   classpath = configurations.validatesRunner
diff --git 
a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectOptions.java
 
b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectOptions.java
index 2c5804e49d7..1599a6ec0e7 100644
--- 
a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectOptions.java
+++ 
b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectOptions.java
@@ -17,8 +17,6 @@
  */
 package org.apache.beam.runners.direct;
 
-import org.apache.beam.sdk.annotations.Experimental;
-import org.apache.beam.sdk.annotations.Experimental.Kind;
 import org.apache.beam.sdk.options.ApplicationNameOptions;
 import org.apache.beam.sdk.options.Default;
 import org.apache.beam.sdk.options.DefaultValueFactory;
@@ -78,11 +76,4 @@ public Integer create(PipelineOptions options) {
       return Math.max(Runtime.getRuntime().availableProcessors(), 
MIN_PARALLELISM);
     }
   }
-
-  @Experimental(Kind.CORE_RUNNERS_ONLY)
-  @Default.Boolean(false)
-  @Description("Control whether toProto/fromProto translations are applied to 
original Pipeline")
-  boolean isProtoTranslation();
-
-  void setProtoTranslation(boolean b);
 }
diff --git 
a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java
 
b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java
index dd28ab84c03..8a633ae9f31 100644
--- 
a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java
+++ 
b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java
@@ -24,7 +24,6 @@
 import com.google.common.collect.ImmutableSet;
 import com.google.common.util.concurrent.MoreExecutors;
 import com.google.common.util.concurrent.ThreadFactoryBuilder;
-import java.io.IOException;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.EnumSet;
@@ -36,7 +35,6 @@
 import org.apache.beam.runners.core.SplittableParDoViaKeyedWorkItems;
 import org.apache.beam.runners.core.construction.PTransformMatchers;
 import org.apache.beam.runners.core.construction.PTransformTranslation;
-import org.apache.beam.runners.core.construction.PipelineTranslation;
 import org.apache.beam.runners.core.construction.SplittableParDo;
 import org.apache.beam.runners.direct.DirectRunner.DirectPipelineResult;
 import 
org.apache.beam.runners.direct.TestStreamEvaluatorFactory.DirectTestStreamFactory;
@@ -152,17 +150,7 @@ void setClockSupplier(Supplier<Clock> supplier) {
   }
 
   @Override
-  public DirectPipelineResult run(Pipeline originalPipeline) {
-    Pipeline pipeline;
-    if (options.isProtoTranslation()) {
-      try {
-        pipeline = 
PipelineTranslation.fromProto(PipelineTranslation.toProto(originalPipeline));
-      } catch (IOException exception) {
-        throw new RuntimeException("Error preparing pipeline for direct 
execution.", exception);
-      }
-    } else {
-      pipeline = originalPipeline;
-    }
+  public DirectPipelineResult run(Pipeline pipeline) {
     pipeline.replaceAll(defaultTransformOverrides());
     MetricsEnvironment.setMetricsSupported(true);
     try {
diff --git 
a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/MultiStepCombine.java
 
b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/MultiStepCombine.java
index 746d2384457..9e356f48233 100644
--- 
a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/MultiStepCombine.java
+++ 
b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/MultiStepCombine.java
@@ -23,16 +23,13 @@
 import static com.google.common.base.Preconditions.checkState;
 
 import com.google.common.collect.Iterables;
-import java.io.IOException;
 import java.util.Collections;
 import java.util.LinkedHashMap;
 import java.util.Map;
 import java.util.Objects;
-import java.util.Optional;
 import javax.annotation.Nonnull;
 import javax.annotation.Nullable;
 import org.apache.beam.model.pipeline.v1.RunnerApi;
-import org.apache.beam.runners.core.construction.CombineTranslation;
 import org.apache.beam.runners.core.construction.PTransformTranslation;
 import 
org.apache.beam.runners.core.construction.PTransformTranslation.RawPTransform;
 import 
org.apache.beam.runners.core.construction.SingleInputOutputOverrideFactory;
@@ -72,14 +69,8 @@ public static PTransformMatcher matcher() {
       public boolean matches(AppliedPTransform<?, ?, ?> application) {
         if (PTransformTranslation.COMBINE_TRANSFORM_URN.equals(
             
PTransformTranslation.urnForTransformOrNull(application.getTransform()))) {
-          try {
-            Optional<GlobalCombineFn<?, ?, ?>> fn = 
CombineTranslation.getCombineFn(application);
-            if (fn.isPresent()) {
-              return isApplicable(application.getInputs(), fn.get());
-            }
-          } catch (IOException e) {
-            throw new RuntimeException(e);
-          }
+          GlobalCombineFn<?, ?, ?> fn = ((Combine.PerKey) 
application.getTransform()).getFn();
+          return isApplicable(application.getInputs(), fn);
         }
         return false;
       }
@@ -143,37 +134,23 @@ private Factory() {}
                     PCollection<KV<K, InputT>>, PCollection<KV<K, OutputT>>,
                     PTransform<PCollection<KV<K, InputT>>, PCollection<KV<K, 
OutputT>>>>
                 transform) {
-      try {
-        GlobalCombineFn<?, ?, ?> globalFn =
-            CombineTranslation.getCombineFn(transform)
-                .orElseThrow(
-                    () ->
-                        new IOException(
-                            String.format(
-                                "%s.matcher() should only match %s instances 
using %s, but %s was missing",
-                                MultiStepCombine.class.getSimpleName(),
-                                PerKey.class.getSimpleName(),
-                                CombineFn.class.getSimpleName(),
-                                CombineFn.class.getSimpleName())));
-        checkState(
-            globalFn instanceof CombineFn,
-            "%s.matcher() should only match %s instances using %s, got %s",
-            MultiStepCombine.class.getSimpleName(),
-            PerKey.class.getSimpleName(),
-            CombineFn.class.getSimpleName(),
-            globalFn.getClass().getName());
-        @SuppressWarnings("unchecked")
-        CombineFn<InputT, AccumT, OutputT> fn = (CombineFn<InputT, AccumT, 
OutputT>) globalFn;
-        @SuppressWarnings("unchecked")
-        PCollection<KV<K, InputT>> input =
-            (PCollection<KV<K, InputT>>) 
Iterables.getOnlyElement(transform.getInputs().values());
-        @SuppressWarnings("unchecked")
-        PCollection<KV<K, OutputT>> output =
-            (PCollection<KV<K, OutputT>>) 
Iterables.getOnlyElement(transform.getOutputs().values());
-        return PTransformReplacement.of(input, new MultiStepCombine<>(fn, 
output.getCoder()));
-      } catch (IOException e) {
-        throw new RuntimeException(e);
-      }
+      GlobalCombineFn<?, ?, ?> globalFn = ((Combine.PerKey) 
transform.getTransform()).getFn();
+      checkState(
+          globalFn instanceof CombineFn,
+          "%s.matcher() should only match %s instances using %s, got %s",
+          MultiStepCombine.class.getSimpleName(),
+          PerKey.class.getSimpleName(),
+          CombineFn.class.getSimpleName(),
+          globalFn.getClass().getName());
+      @SuppressWarnings("unchecked")
+      CombineFn<InputT, AccumT, OutputT> fn = (CombineFn<InputT, AccumT, 
OutputT>) globalFn;
+      @SuppressWarnings("unchecked")
+      PCollection<KV<K, InputT>> input =
+          (PCollection<KV<K, InputT>>) 
Iterables.getOnlyElement(transform.getInputs().values());
+      @SuppressWarnings("unchecked")
+      PCollection<KV<K, OutputT>> output =
+          (PCollection<KV<K, OutputT>>) 
Iterables.getOnlyElement(transform.getOutputs().values());
+      return PTransformReplacement.of(input, new MultiStepCombine<>(fn, 
output.getCoder()));
     }
   }
 
diff --git 
a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformEvaluatorRegistry.java
 
b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformEvaluatorRegistry.java
index e61fe7277ff..6d27ab98c09 100644
--- 
a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformEvaluatorRegistry.java
+++ 
b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformEvaluatorRegistry.java
@@ -36,7 +36,6 @@
 import com.google.common.collect.ImmutableMap;
 import java.util.ArrayList;
 import java.util.Collection;
-import java.util.Collections;
 import java.util.Map;
 import java.util.concurrent.atomic.AtomicBoolean;
 import org.apache.beam.runners.core.SplittableParDoViaKeyedWorkItems;
@@ -130,11 +129,6 @@ public static TransformEvaluatorRegistry 
javaSdkNativeRegistry(
               
TransformPayloadTranslator.NotSerializable.forUrn(SPLITTABLE_PROCESS_URN))
           .build();
     }
-
-    @Override
-    public Map<String, TransformPayloadTranslator> getTransformRehydrators() {
-      return Collections.emptyMap();
-    }
   }
 
   /**
diff --git 
a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkBatchTransformTranslators.java
 
b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkBatchTransformTranslators.java
index 0c94d46c1b7..11f36cbe839 100644
--- 
a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkBatchTransformTranslators.java
+++ 
b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkBatchTransformTranslators.java
@@ -30,7 +30,6 @@
 import java.util.Map;
 import java.util.Map.Entry;
 import javax.annotation.Nullable;
-import org.apache.beam.runners.core.construction.CombineTranslation;
 import 
org.apache.beam.runners.core.construction.CreatePCollectionViewTranslation;
 import org.apache.beam.runners.core.construction.PTransformTranslation;
 import org.apache.beam.runners.core.construction.ParDoTranslation;
@@ -374,15 +373,8 @@ public void translateNode(
       DataSet<WindowedValue<KV<K, InputT>>> inputDataSet =
           context.getInputDataSet(context.getInput(transform));
 
-      CombineFnBase.GlobalCombineFn<InputT, AccumT, OutputT> combineFn;
-      try {
-        combineFn =
-            (CombineFnBase.GlobalCombineFn<InputT, AccumT, OutputT>)
-                CombineTranslation.getCombineFn(context.getCurrentTransform())
-                    .orElseThrow(() -> new IOException("CombineFn not found in 
node."));
-      } catch (IOException e) {
-        throw new RuntimeException(e);
-      }
+      CombineFnBase.GlobalCombineFn<InputT, AccumT, OutputT> combineFn =
+          ((Combine.PerKey) transform).getFn();
 
       KvCoder<K, InputT> inputCoder = (KvCoder<K, InputT>) 
context.getInput(transform).getCoder();
 
@@ -407,6 +399,14 @@ public void translateNode(
       Grouping<WindowedValue<KV<K, InputT>>> inputGrouping =
           inputDataSet.groupBy(new KvKeySelector<>(inputCoder.getKeyCoder()));
 
+      // construct a map from side input to WindowingStrategy so that
+      // the DoFn runner can map main-input windows to side input windows
+      Map<PCollectionView<?>, WindowingStrategy<?, ?>> sideInputStrategies = 
new HashMap<>();
+      for (PCollectionView<?> sideInput :
+          (List<PCollectionView<?>>) ((Combine.PerKey) 
transform).getSideInputs()) {
+        sideInputStrategies.put(sideInput, 
sideInput.getWindowingStrategyInternal());
+      }
+
       WindowingStrategy<Object, BoundedWindow> boundedStrategy =
           (WindowingStrategy<Object, BoundedWindow>) windowingStrategy;
 
@@ -415,11 +415,11 @@ public void translateNode(
 
         FlinkPartialReduceFunction<K, InputT, AccumT, ?> partialReduceFunction 
=
             new FlinkPartialReduceFunction<>(
-                combineFn, boundedStrategy, new HashMap<>(), 
context.getPipelineOptions());
+                combineFn, boundedStrategy, sideInputStrategies, 
context.getPipelineOptions());
 
         FlinkReduceFunction<K, AccumT, OutputT, ?> reduceFunction =
             new FlinkReduceFunction<>(
-                combineFn, boundedStrategy, new HashMap<>(), 
context.getPipelineOptions());
+                combineFn, boundedStrategy, sideInputStrategies, 
context.getPipelineOptions());
 
         // Partially GroupReduce the values into the intermediate format 
AccumT (combine)
         GroupCombineOperator<WindowedValue<KV<K, InputT>>, WindowedValue<KV<K, 
AccumT>>>
@@ -430,6 +430,8 @@ public void translateNode(
                     partialReduceFunction,
                     "GroupCombine: " + fullName);
 
+        transformSideInputs(((Combine.PerKey) transform).getSideInputs(), 
groupCombine, context);
+
         TypeInformation<WindowedValue<KV<K, OutputT>>> reduceTypeInfo =
             context.getTypeInfo(context.getOutput(transform));
 
@@ -442,6 +444,8 @@ public void translateNode(
                 new GroupReduceOperator<>(
                     intermediateGrouping, reduceTypeInfo, reduceFunction, 
fullName);
 
+        transformSideInputs(((Combine.PerKey) transform).getSideInputs(), 
outputDataSet, context);
+
         context.setOutputDataSet(context.getOutput(transform), outputDataSet);
 
       } else {
@@ -452,7 +456,7 @@ public void translateNode(
         RichGroupReduceFunction<WindowedValue<KV<K, InputT>>, 
WindowedValue<KV<K, OutputT>>>
             reduceFunction =
                 new FlinkMergingNonShuffleReduceFunction<>(
-                    combineFn, boundedStrategy, new HashMap<>(), 
context.getPipelineOptions());
+                    combineFn, boundedStrategy, sideInputStrategies, 
context.getPipelineOptions());
 
         TypeInformation<WindowedValue<KV<K, OutputT>>> reduceTypeInfo =
             context.getTypeInfo(context.getOutput(transform));
@@ -465,6 +469,8 @@ public void translateNode(
             outputDataSet =
                 new GroupReduceOperator<>(grouping, reduceTypeInfo, 
reduceFunction, fullName);
 
+        transformSideInputs(((Combine.PerKey) transform).getSideInputs(), 
outputDataSet, context);
+
         context.setOutputDataSet(context.getOutput(transform), outputDataSet);
       }
     }
diff --git 
a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkPipelineExecutionEnvironment.java
 
b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkPipelineExecutionEnvironment.java
index af5b431a4e3..84876604cd0 100644
--- 
a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkPipelineExecutionEnvironment.java
+++ 
b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkPipelineExecutionEnvironment.java
@@ -19,8 +19,6 @@
 
 import static com.google.common.base.Preconditions.checkNotNull;
 
-import java.io.IOException;
-import org.apache.beam.runners.core.construction.PipelineTranslation;
 import org.apache.beam.sdk.Pipeline;
 import org.apache.flink.api.common.JobExecutionResult;
 import org.apache.flink.api.java.ExecutionEnvironment;
@@ -75,13 +73,6 @@ public void translate(FlinkRunner flinkRunner, Pipeline 
pipeline) {
     this.flinkBatchEnv = null;
     this.flinkStreamEnv = null;
 
-    // Serialize and rehydrate pipeline to make sure we only depend serialized 
transforms.
-    try {
-      pipeline = 
PipelineTranslation.fromProto(PipelineTranslation.toProto(pipeline));
-    } catch (IOException e) {
-      throw new RuntimeException(e);
-    }
-
     PipelineTranslationOptimizer optimizer =
         new PipelineTranslationOptimizer(TranslationMode.BATCH, options);
 
diff --git 
a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTransformTranslators.java
 
b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTransformTranslators.java
index 669843fdf55..e22a02a6e3a 100644
--- 
a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTransformTranslators.java
+++ 
b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTransformTranslators.java
@@ -36,7 +36,6 @@
 import org.apache.beam.runners.core.KeyedWorkItem;
 import org.apache.beam.runners.core.SplittableParDoViaKeyedWorkItems;
 import org.apache.beam.runners.core.SystemReduceFn;
-import org.apache.beam.runners.core.construction.CombineTranslation;
 import org.apache.beam.runners.core.construction.PTransformTranslation;
 import org.apache.beam.runners.core.construction.ParDoTranslation;
 import org.apache.beam.runners.core.construction.ReadTranslation;
@@ -61,6 +60,7 @@
 import org.apache.beam.sdk.io.BoundedSource;
 import org.apache.beam.sdk.io.UnboundedSource;
 import org.apache.beam.sdk.runners.AppliedPTransform;
+import org.apache.beam.sdk.transforms.Combine;
 import org.apache.beam.sdk.transforms.CombineFnBase.GlobalCombineFn;
 import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.PTransform;
@@ -104,6 +104,7 @@
 import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
 import 
org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
 import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.api.operators.TwoInputStreamOperator;
 import org.apache.flink.streaming.api.transformations.TwoInputTransformation;
 import org.apache.flink.streaming.api.watermark.Watermark;
 import org.apache.flink.streaming.runtime.tasks.ProcessingTimeCallback;
@@ -837,7 +838,7 @@ public void translateNode(
               inputKvCoder.getKeyCoder(),
               keySelector);
 
-      // our operator excepts WindowedValue<KeyedWorkItem> while our input 
stream
+      // our operator expects WindowedValue<KeyedWorkItem> while our input 
stream
       // is WindowedValue<SingletonKeyedWorkItem>, which is fine but Java 
doesn't like it ...
       @SuppressWarnings("unchecked")
       SingleOutputStreamOperator<WindowedValue<KV<K, Iterable<InputT>>>> 
outDataStream =
@@ -853,6 +854,23 @@ public void translateNode(
       extends FlinkStreamingPipelineTranslator.StreamTransformTranslator<
           PTransform<PCollection<KV<K, InputT>>, PCollection<KV<K, OutputT>>>> 
{
 
+    @Override
+    boolean canTranslate(
+        PTransform<PCollection<KV<K, InputT>>, PCollection<KV<K, OutputT>>> 
transform,
+        FlinkStreamingTranslationContext context) {
+      // if we have a merging window strategy and side inputs we cannot
+      // translate as a proper combine. We have to group and then run the 
combine
+      // over the final grouped values.
+      PCollection<KV<K, InputT>> input = context.getInput(transform);
+
+      @SuppressWarnings("unchecked")
+      WindowingStrategy<?, BoundedWindow> windowingStrategy =
+          (WindowingStrategy<?, BoundedWindow>) input.getWindowingStrategy();
+
+      return windowingStrategy.getWindowFn().isNonMerging()
+          || ((Combine.PerKey) transform).getSideInputs().isEmpty();
+    }
+
     @Override
     public void translateNode(
         PTransform<PCollection<KV<K, InputT>>, PCollection<KV<K, OutputT>>> 
transform,
@@ -892,15 +910,7 @@ public void translateNode(
       KeyedStream<WindowedValue<SingletonKeyedWorkItem<K, InputT>>, ByteBuffer>
           keyedWorkItemStream = workItemStream.keyBy(keySelector);
 
-      GlobalCombineFn<? super InputT, ?, OutputT> combineFn;
-      try {
-        combineFn =
-            (GlobalCombineFn<? super InputT, ?, OutputT>)
-                CombineTranslation.getCombineFn(context.getCurrentTransform())
-                    .orElseThrow(() -> new IOException("CombineFn not found in 
node."));
-      } catch (IOException e) {
-        throw new RuntimeException(e);
-      }
+      GlobalCombineFn<? super InputT, ?, OutputT> combineFn = 
((Combine.PerKey) transform).getFn();
       SystemReduceFn<K, InputT, ?, OutputT, BoundedWindow> reduceFn =
           SystemReduceFn.combining(
               inputKvCoder.getKeyCoder(),
@@ -912,30 +922,81 @@ public void translateNode(
       TypeInformation<WindowedValue<KV<K, OutputT>>> outputTypeInfo =
           context.getTypeInfo(context.getOutput(transform));
 
-      TupleTag<KV<K, OutputT>> mainTag = new TupleTag<>("main output");
-      WindowDoFnOperator<K, InputT, OutputT> doFnOperator =
-          new WindowDoFnOperator<>(
-              reduceFn,
-              fullName,
-              (Coder) windowedWorkItemCoder,
-              mainTag,
-              Collections.emptyList(),
-              new DoFnOperator.MultiOutputOutputManagerFactory<>(mainTag, 
outputCoder),
-              windowingStrategy,
-              new HashMap<>(), /* side-input mapping */
-              Collections.emptyList(), /* side inputs */
-              context.getPipelineOptions(),
-              inputKvCoder.getKeyCoder(),
-              keySelector);
+      List<PCollectionView<?>> sideInputs = ((Combine.PerKey) 
transform).getSideInputs();
 
-      // our operator excepts WindowedValue<KeyedWorkItem> while our input 
stream
-      // is WindowedValue<SingletonKeyedWorkItem>, which is fine but Java 
doesn't like it ...
-      @SuppressWarnings("unchecked")
-      SingleOutputStreamOperator<WindowedValue<KV<K, OutputT>>> outDataStream =
-          keyedWorkItemStream
-              .transform(fullName, outputTypeInfo, (OneInputStreamOperator) 
doFnOperator)
-              .uid(fullName);
-      context.setOutputDataStream(context.getOutput(transform), outDataStream);
+      if (sideInputs.isEmpty()) {
+        TupleTag<KV<K, OutputT>> mainTag = new TupleTag<>("main output");
+        WindowDoFnOperator<K, InputT, OutputT> doFnOperator =
+            new WindowDoFnOperator<>(
+                reduceFn,
+                fullName,
+                (Coder) windowedWorkItemCoder,
+                mainTag,
+                Collections.emptyList(),
+                new DoFnOperator.MultiOutputOutputManagerFactory<>(mainTag, 
outputCoder),
+                windowingStrategy,
+                new HashMap<>(), /* side-input mapping */
+                Collections.emptyList(), /* side inputs */
+                context.getPipelineOptions(),
+                inputKvCoder.getKeyCoder(),
+                keySelector);
+
+        // our operator expects WindowedValue<KeyedWorkItem> while our input 
stream
+        // is WindowedValue<SingletonKeyedWorkItem>, which is fine but Java 
doesn't like it ...
+        @SuppressWarnings("unchecked")
+        SingleOutputStreamOperator<WindowedValue<KV<K, OutputT>>> 
outDataStream =
+            keyedWorkItemStream
+                .transform(fullName, outputTypeInfo, (OneInputStreamOperator) 
doFnOperator)
+                .uid(fullName);
+        context.setOutputDataStream(context.getOutput(transform), 
outDataStream);
+      } else {
+        Tuple2<Map<Integer, PCollectionView<?>>, DataStream<RawUnionValue>> 
transformSideInputs =
+            transformSideInputs(sideInputs, context);
+
+        TupleTag<KV<K, OutputT>> mainTag = new TupleTag<>("main output");
+        WindowDoFnOperator<K, InputT, OutputT> doFnOperator =
+            new WindowDoFnOperator<>(
+                reduceFn,
+                fullName,
+                (Coder) windowedWorkItemCoder,
+                mainTag,
+                Collections.emptyList(),
+                new DoFnOperator.MultiOutputOutputManagerFactory<>(mainTag, 
outputCoder),
+                windowingStrategy,
+                transformSideInputs.f0,
+                sideInputs,
+                context.getPipelineOptions(),
+                inputKvCoder.getKeyCoder(),
+                keySelector);
+
+        // we have to manually contruct the two-input transform because we're 
not
+        // allowed to have only one input keyed, normally.
+
+        TwoInputTransformation<
+                WindowedValue<SingletonKeyedWorkItem<K, InputT>>, 
RawUnionValue,
+                WindowedValue<KV<K, OutputT>>>
+            rawFlinkTransform =
+                new TwoInputTransformation<>(
+                    keyedWorkItemStream.getTransformation(),
+                    transformSideInputs.f1.broadcast().getTransformation(),
+                    transform.getName(),
+                    (TwoInputStreamOperator) doFnOperator,
+                    outputTypeInfo,
+                    keyedWorkItemStream.getParallelism());
+
+        rawFlinkTransform.setStateKeyType(keyedWorkItemStream.getKeyType());
+        
rawFlinkTransform.setStateKeySelectors(keyedWorkItemStream.getKeySelector(), 
null);
+
+        @SuppressWarnings({"unchecked", "rawtypes"})
+        SingleOutputStreamOperator<WindowedValue<KV<K, OutputT>>> 
outDataStream =
+            new SingleOutputStreamOperator(
+                keyedWorkItemStream.getExecutionEnvironment(),
+                rawFlinkTransform) {}; // we have to cheat around the ctor 
being protected
+
+        
keyedWorkItemStream.getExecutionEnvironment().addOperator(rawFlinkTransform);
+
+        context.setOutputDataStream(context.getOutput(transform), 
outDataStream);
+      }
     }
   }
 
@@ -1143,11 +1204,6 @@ public String 
getUrn(SplittableParDoViaKeyedWorkItems.ProcessElements<?, ?, ?, ?
               new SplittableParDoGbkIntoKeyedWorkItemsPayloadTranslator())
           .build();
     }
-
-    @Override
-    public Map<String, PTransformTranslation.TransformPayloadTranslator> 
getTransformRehydrators() {
-      return Collections.emptyMap();
-    }
   }
 
   /**
diff --git 
a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/PrimitiveParDoSingleFactory.java
 
b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/PrimitiveParDoSingleFactory.java
index 5351510c13c..717585c4d7a 100644
--- 
a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/PrimitiveParDoSingleFactory.java
+++ 
b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/PrimitiveParDoSingleFactory.java
@@ -37,7 +37,6 @@
 import org.apache.beam.runners.core.construction.PTransformReplacements;
 import org.apache.beam.runners.core.construction.PTransformTranslation;
 import org.apache.beam.runners.core.construction.ParDoTranslation;
-import org.apache.beam.runners.core.construction.RehydratedComponents;
 import org.apache.beam.runners.core.construction.SdkComponents;
 import 
org.apache.beam.runners.core.construction.SingleInputOutputOverrideFactory;
 import 
org.apache.beam.runners.core.construction.TransformPayloadTranslatorRegistrar;
@@ -149,16 +148,6 @@ public String getUrn(ParDoSingle<?, ?> transform) {
           .build();
     }
 
-    @Override
-    public PTransformTranslation.RawPTransform<?, ?> rehydrate(
-        RunnerApi.PTransform protoTransform, RehydratedComponents 
rehydratedComponents)
-        throws IOException {
-      throw new UnsupportedOperationException(
-          String.format(
-              "%s.rehydrate should never be called; the serialized form is 
that of a ParDo",
-              getClass().getCanonicalName()));
-    }
-
     private static RunnerApi.ParDoPayload payloadForParDoSingle(
         final ParDoSingle<?, ?> parDo, SdkComponents components) throws 
IOException {
       final DoFn<?, ?> doFn = parDo.getFn();
@@ -239,11 +228,5 @@ public String translateRestrictionCoderId(SdkComponents 
newComponents) {
         getTransformPayloadTranslators() {
       return Collections.singletonMap(ParDoSingle.class, new 
PayloadTranslator());
     }
-
-    @Override
-    public Map<String, ? extends 
PTransformTranslation.TransformPayloadTranslator>
-        getTransformRehydrators() {
-      return Collections.emptyMap();
-    }
   }
 }
diff --git 
a/runners/samza/src/main/java/org/apache/beam/runners/samza/translation/GroupByKeyTranslator.java
 
b/runners/samza/src/main/java/org/apache/beam/runners/samza/translation/GroupByKeyTranslator.java
index 3cc7de0240a..4edd2359257 100644
--- 
a/runners/samza/src/main/java/org/apache/beam/runners/samza/translation/GroupByKeyTranslator.java
+++ 
b/runners/samza/src/main/java/org/apache/beam/runners/samza/translation/GroupByKeyTranslator.java
@@ -18,11 +18,9 @@
 
 package org.apache.beam.runners.samza.translation;
 
-import java.io.IOException;
 import org.apache.beam.runners.core.KeyedWorkItem;
 import org.apache.beam.runners.core.KeyedWorkItemCoder;
 import org.apache.beam.runners.core.SystemReduceFn;
-import org.apache.beam.runners.core.construction.CombineTranslation;
 import org.apache.beam.runners.samza.runtime.DoFnOp;
 import org.apache.beam.runners.samza.runtime.GroupByKeyOp;
 import org.apache.beam.runners.samza.runtime.KvToKeyedWorkItemOp;
@@ -128,16 +126,8 @@ public void translate(
       return (SystemReduceFn<K, InputT, ?, OutputT, BoundedWindow>)
           SystemReduceFn.buffering(kvInputCoder.getValueCoder());
     } else if (transform instanceof Combine.PerKey) {
-      final CombineFnBase.GlobalCombineFn<? super InputT, ?, OutputT> 
combineFn;
-      try {
-        combineFn =
-            (CombineFnBase.GlobalCombineFn<? super InputT, ?, OutputT>)
-                CombineTranslation.getCombineFn(appliedPTransform)
-                    .orElseThrow(() -> new IOException("CombineFn not found in 
node."));
-      } catch (IOException e) {
-        throw new RuntimeException(e);
-      }
-
+      final CombineFnBase.GlobalCombineFn<? super InputT, ?, OutputT> 
combineFn =
+          ((Combine.PerKey) transform).getFn();
       return SystemReduceFn.combining(
           kvInputCoder.getKeyCoder(),
           AppliedCombineFn.withInputCoder(combineFn, 
pipeline.getCoderRegistry(), kvInputCoder));
diff --git 
a/runners/samza/src/main/java/org/apache/beam/runners/samza/translation/SamzaPipelineTranslator.java
 
b/runners/samza/src/main/java/org/apache/beam/runners/samza/translation/SamzaPipelineTranslator.java
index 542fede0139..d444d05a0a8 100644
--- 
a/runners/samza/src/main/java/org/apache/beam/runners/samza/translation/SamzaPipelineTranslator.java
+++ 
b/runners/samza/src/main/java/org/apache/beam/runners/samza/translation/SamzaPipelineTranslator.java
@@ -22,7 +22,6 @@
 
 import com.google.auto.service.AutoService;
 import com.google.common.collect.ImmutableMap;
-import java.util.Collections;
 import java.util.Map;
 import org.apache.beam.runners.core.construction.PTransformTranslation;
 import 
org.apache.beam.runners.core.construction.TransformPayloadTranslatorRegistrar;
@@ -135,10 +134,5 @@ private static String getUrnForTransform(PTransform<?, ?> 
transform) {
       return ImmutableMap.of(
           SamzaPublishView.class, new 
SamzaPublishView.SamzaPublishViewPayloadTranslator());
     }
-
-    @Override
-    public Map<String, PTransformTranslation.TransformPayloadTranslator> 
getTransformRehydrators() {
-      return Collections.emptyMap();
-    }
   }
 }


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


Issue Time Tracking
-------------------

    Worklog Id:     (was: 118356)
    Time Spent: 7.5h  (was: 7h 20m)

> Pipeline translation utilities should not use SDK construction classes
> ----------------------------------------------------------------------
>
>                 Key: BEAM-3971
>                 URL: https://issues.apache.org/jira/browse/BEAM-3971
>             Project: Beam
>          Issue Type: Bug
>          Components: runner-core
>            Reporter: Ben Sidhom
>            Assignee: Ben Sidhom
>            Priority: Major
>          Time Spent: 7.5h
>  Remaining Estimate: 0h
>
> In general, portable runners will require access to pipeline information not 
> available in rehydrated pipelines while constructing physical plans. 
> Translation utilities should operate directly on protos or on thin, 
> information-preserving wrappers.
> The pipeline fusion utilities already operate on protos directly and can be 
> used as an example of how this could be done.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to