This is an automated email from the ASF dual-hosted git repository.

kenn pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new a65f1c4  Revert "Merge pull request #7316 [BEAM-6269] Cross-SDK 
transform expansion protocol."
     new d9a1bac  Merge pull request #7663: [BEAM-6539] Revert "Merge pull 
request #7316 [BEAM-6269] Cross-SDK transform expansion protocol"
a65f1c4 is described below

commit a65f1c43b7fbedfec3e7396d9e024bf0e762c33d
Author: Daniel Oliveira <daniel.o.program...@gmail.com>
AuthorDate: Tue Jan 29 11:01:02 2019 -0800

    Revert "Merge pull request #7316 [BEAM-6269] Cross-SDK transform expansion 
protocol."
    
    This reverts commit 26bd104f122986429ba5a8abd583b04a44a8edee, reversing
    changes made to dcda09a8725d30467c4eb549985e3fe979208759.
---
 model/job-management/build.gradle                  |   5 +-
 .../src/main/proto/beam_expansion_api.proto        |  68 -------
 .../pipeline/src/main/proto/beam_runner_api.proto  |   3 -
 runners/core-construction-java/build.gradle        |  18 --
 .../core/construction/ExpansionService.java        | 205 --------------------
 .../core/construction/ModelCoderRegistrar.java     |   3 -
 .../runners/core/construction/ModelCoders.java     |   2 -
 .../core/construction/RehydratedComponents.java    |  11 --
 .../runners/core/construction/SdkComponents.java   |  57 +-----
 .../core/construction/CoderTranslationTest.java    |   1 -
 .../core/construction/ExpansionServiceTest.java    | 103 ----------
 .../core/construction/TestExpansionService.java    |  52 ------
 .../fnexecution/control/RemoteExecutionTest.java   |  32 +++-
 .../beam/sdk/runners/TransformHierarchy.java       |  28 +--
 sdks/python/apache_beam/coders/coders.py           |  36 ----
 sdks/python/apache_beam/pipeline.py                |  30 ++-
 sdks/python/apache_beam/pvalue.py                  |  12 +-
 .../python/apache_beam/runners/pipeline_context.py |  34 +---
 .../apache_beam/runners/pipeline_context_test.py   |   8 -
 .../runners/portability/expansion_service.py       | 118 ------------
 sdks/python/apache_beam/transforms/__init__.py     |   1 -
 sdks/python/apache_beam/transforms/external.py     | 208 ---------------------
 .../python/apache_beam/transforms/external_test.py | 205 --------------------
 sdks/python/apache_beam/transforms/ptransform.py   |  46 +----
 sdks/python/build.gradle                           |  12 --
 25 files changed, 65 insertions(+), 1233 deletions(-)

diff --git a/model/job-management/build.gradle 
b/model/job-management/build.gradle
index 4f81152..4c50782 100644
--- a/model/job-management/build.gradle
+++ b/model/job-management/build.gradle
@@ -17,10 +17,7 @@
  */
 
 apply plugin: org.apache.beam.gradle.BeamModulePlugin
-applyPortabilityNature(shadowJarValidationExcludes:[
-    "org/apache/beam/model/expansion/v1/**",
-    "org/apache/beam/model/jobmanagement/v1/**",
-])
+applyPortabilityNature(shadowJarValidationExcludes: 
["org/apache/beam/model/jobmanagement/v1/**"])
 
 description = "Apache Beam :: Model :: Job Management"
 ext.summary = "Portable definitions for submitting pipelines."
diff --git a/model/job-management/src/main/proto/beam_expansion_api.proto 
b/model/job-management/src/main/proto/beam_expansion_api.proto
deleted file mode 100644
index 92b0dd2..0000000
--- a/model/job-management/src/main/proto/beam_expansion_api.proto
+++ /dev/null
@@ -1,68 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-/*
- * Protocol Buffers describing the Expansion API, an api for expanding
- * transforms in a remote SDK.
- */
-
-syntax = "proto3";
-
-package org.apache.beam.model.expansion.v1;
-
-option go_package = "construction_v1";
-option java_package = "org.apache.beam.model.expansion.v1";
-option java_outer_classname = "ExpansionApi";
-
-import "beam_runner_api.proto";
-
-message ExpansionRequest {
-  // Set of components needed to interpret the transform, or which
-  // may be useful for its expansion.  This includes the input
-  // PCollections (if any) to the to-be-expanded transform, along
-  // with their coders and windowing strategies.
-  org.apache.beam.model.pipeline.v1.Components components = 1;
-
-  // The actual PTransform to be expaneded according to its spec.
-  // Its input should be set, but its subtransforms and outputs
-  // should not be.
-  org.apache.beam.model.pipeline.v1.PTransform transform = 2;
-
-  // A namespace (prefix) to use for the id of any newly created
-  // components.
-  string namespace = 3;
-}
-
-message ExpansionResponse {
-  // Set of components needed to execute the expanded transform,
-  // including the (original) inputs, outputs, and subtransforms.
-  org.apache.beam.model.pipeline.v1.Components components = 1;
-
-  // The expanded transform itself, with references to its outputs
-  // and subtransforms.
-  org.apache.beam.model.pipeline.v1.PTransform transform = 2;
-
-  // (Optional) An string representation of any error encountered while
-  // attempting to expand this transform.
-  string error = 10;
-}
-
-// Job Service for constructing pipelines
-service ExpansionService {
-  rpc Expand (ExpansionRequest) returns (ExpansionResponse);
-}
diff --git a/model/pipeline/src/main/proto/beam_runner_api.proto 
b/model/pipeline/src/main/proto/beam_runner_api.proto
index e081f07..42a1970 100644
--- a/model/pipeline/src/main/proto/beam_runner_api.proto
+++ b/model/pipeline/src/main/proto/beam_runner_api.proto
@@ -544,9 +544,6 @@ message StandardCoders {
     // Components: None
     BYTES = 0 [(beam_urn) = "beam:coder:bytes:v1"];
 
-    // Components: None
-    STRING_UTF8 = 10 [(beam_urn) = "beam:coder:string_utf8:v1"];
-
     // Components: The key and value coder, in that order.
     KV = 1 [(beam_urn) = "beam:coder:kv:v1"];
 
diff --git a/runners/core-construction-java/build.gradle 
b/runners/core-construction-java/build.gradle
index 7ac38e0..365373c 100644
--- a/runners/core-construction-java/build.gradle
+++ b/runners/core-construction-java/build.gradle
@@ -50,21 +50,3 @@ dependencies {
   shadowTest library.java.jackson_dataformat_yaml
   shadowTest project(path: ":beam-model-fn-execution", configuration: "shadow")
 }
-
-task runExpansionService (type: JavaExec) {
-  main = "org.apache.beam.runners.core.construction.ExpansionService"
-  classpath = sourceSets.main.runtimeClasspath
-  args = [project.findProperty("constructionService.port") ?: "8097"]
-}
-
-task testExpansionService(type: Jar) {
-  dependsOn = [shadowJar, shadowTestJar]
-  manifest {
-    attributes(
-            'Main-Class': 
'org.apache.beam.runners.core.construction.TestExpansionService'
-    )
-  }
-  from { configurations.testRuntime.collect { it.isDirectory() ? it : 
zipTree(it) }}
-  from sourceSets.main.output
-  from sourceSets.test.output
-}
diff --git 
a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ExpansionService.java
 
b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ExpansionService.java
deleted file mode 100644
index a76bc48..0000000
--- 
a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ExpansionService.java
+++ /dev/null
@@ -1,205 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.core.construction;
-
-import com.google.auto.service.AutoService;
-import java.io.IOException;
-import java.util.Collections;
-import java.util.Map;
-import java.util.ServiceLoader;
-import java.util.Set;
-import java.util.stream.Collectors;
-import java.util.stream.IntStream;
-import org.apache.beam.model.expansion.v1.ExpansionApi;
-import org.apache.beam.model.expansion.v1.ExpansionServiceGrpc;
-import org.apache.beam.model.pipeline.v1.RunnerApi;
-import org.apache.beam.sdk.Pipeline;
-import org.apache.beam.sdk.transforms.PTransform;
-import org.apache.beam.sdk.values.PCollection;
-import org.apache.beam.sdk.values.PCollectionList;
-import org.apache.beam.sdk.values.PCollectionTuple;
-import org.apache.beam.sdk.values.PDone;
-import org.apache.beam.sdk.values.PValue;
-import org.apache.beam.sdk.values.TupleTag;
-import org.apache.beam.vendor.grpc.v1p13p1.io.grpc.Server;
-import org.apache.beam.vendor.grpc.v1p13p1.io.grpc.ServerBuilder;
-import org.apache.beam.vendor.grpc.v1p13p1.io.grpc.stub.StreamObserver;
-import 
org.apache.beam.vendor.guava.v20_0.com.google.common.annotations.VisibleForTesting;
-import 
org.apache.beam.vendor.guava.v20_0.com.google.common.collect.ImmutableMap;
-import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.Iterables;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/** A service that allows pipeline expand transforms from a remote SDK. */
-public class ExpansionService extends 
ExpansionServiceGrpc.ExpansionServiceImplBase {
-
-  private static final Logger LOG = 
LoggerFactory.getLogger(ExpansionService.class);
-
-  /**
-   * A registrar that creates {@link TransformProvider} instances from {@link
-   * RunnerApi.FunctionSpec}s.
-   *
-   * <p>Transform authors have the ability to provide a registrar by creating 
a {@link
-   * ServiceLoader} entry and a concrete implementation of this interface.
-   *
-   * <p>It is optional but recommended to use one of the many build time tools 
such as {@link
-   * AutoService} to generate the necessary META-INF files automatically.
-   */
-  public interface ExpansionServiceRegistrar {
-    Map<String, TransformProvider> knownTransforms();
-  }
-
-  /**
-   * Provides a mapping of {@link RunnerApi.FunctionSpec} to a {@link 
PTransform}, together with
-   * mappings of its inputs and outputs to maps of PCollections.
-   *
-   * @param <InputT> input {@link PValue} type of the transform
-   * @param <OutputT> output {@link PValue} type of the transform
-   */
-  public interface TransformProvider<InputT extends PValue, OutputT extends 
PValue> {
-
-    default InputT createInput(Pipeline p, Map<String, PCollection<?>> inputs) 
{
-      if (inputs.size() == 0) {
-        return (InputT) p.begin();
-      }
-      if (inputs.size() == 1) {
-        return (InputT) Iterables.getOnlyElement(inputs.values());
-      } else {
-        PCollectionTuple inputTuple = PCollectionTuple.empty(p);
-        for (Map.Entry<String, PCollection<?>> entry : inputs.entrySet()) {
-          inputTuple = inputTuple.and(new TupleTag(entry.getKey()), 
entry.getValue());
-        }
-        return (InputT) inputTuple;
-      }
-    }
-
-    PTransform<InputT, OutputT> getTransform(RunnerApi.FunctionSpec spec);
-
-    default Map<String, PCollection<?>> extractOutputs(OutputT output) {
-      if (output instanceof PDone) {
-        return Collections.emptyMap();
-      } else if (output instanceof PCollection) {
-        return ImmutableMap.of("output", (PCollection<?>) output);
-      } else if (output instanceof PCollectionTuple) {
-        return ((PCollectionTuple) output)
-            .getAll().entrySet().stream()
-                .collect(Collectors.toMap(entry -> entry.getKey().toString(), 
Map.Entry::getValue));
-      } else if (output instanceof PCollectionList<?>) {
-        PCollectionList<?> listOutput = (PCollectionList<?>) output;
-        return IntStream.range(0, listOutput.size())
-            .boxed()
-            .collect(Collectors.toMap(index -> "output_" + index, 
listOutput::get));
-      } else {
-        throw new UnsupportedOperationException("Unknown output type: " + 
output.getClass());
-      }
-    }
-
-    default Map<String, PCollection<?>> apply(
-        Pipeline p, String name, RunnerApi.FunctionSpec spec, Map<String, 
PCollection<?>> inputs) {
-      return extractOutputs(
-          Pipeline.applyTransform(name, createInput(p, inputs), 
getTransform(spec)));
-    }
-  }
-
-  private Map<String, TransformProvider> registeredTransforms = 
loadRegisteredTransforms();
-
-  private Map<String, TransformProvider> loadRegisteredTransforms() {
-    ImmutableMap.Builder<String, TransformProvider> registeredTransforms = 
ImmutableMap.builder();
-    for (ExpansionServiceRegistrar registrar :
-        ServiceLoader.load(ExpansionServiceRegistrar.class)) {
-      registeredTransforms.putAll(registrar.knownTransforms());
-    }
-    return registeredTransforms.build();
-  }
-
-  @VisibleForTesting
-  /*package*/ ExpansionApi.ExpansionResponse 
expand(ExpansionApi.ExpansionRequest request) {
-    LOG.info(
-        "Expanding '{}' with URN '{}'",
-        request.getTransform().getUniqueName(),
-        request.getTransform().getSpec().getUrn());
-    LOG.debug("Full transform: {}", request.getTransform());
-    Set<String> existingTransformIds = 
request.getComponents().getTransformsMap().keySet();
-    Pipeline pipeline = Pipeline.create();
-    RehydratedComponents rehydratedComponents =
-        
RehydratedComponents.forComponents(request.getComponents()).withPipeline(pipeline);
-
-    Map<String, PCollection<?>> inputs =
-        request.getTransform().getInputsMap().entrySet().stream()
-            .collect(
-                Collectors.toMap(
-                    Map.Entry::getKey,
-                    input -> {
-                      try {
-                        return 
rehydratedComponents.getPCollection(input.getValue());
-                      } catch (IOException exn) {
-                        throw new RuntimeException(exn);
-                      }
-                    }));
-    if 
(!registeredTransforms.containsKey(request.getTransform().getSpec().getUrn())) {
-      throw new UnsupportedOperationException(
-          "Unknown urn: " + request.getTransform().getSpec().getUrn());
-    }
-    registeredTransforms
-        .get(request.getTransform().getSpec().getUrn())
-        .apply(
-            pipeline,
-            request.getTransform().getUniqueName(),
-            request.getTransform().getSpec(),
-            inputs);
-
-    // Needed to find which transform was new...
-    SdkComponents sdkComponents =
-        
rehydratedComponents.getSdkComponents().withNewIdPrefix(request.getNamespace());
-    
sdkComponents.registerEnvironment(Environments.JAVA_SDK_HARNESS_ENVIRONMENT);
-    RunnerApi.Pipeline pipelineProto = PipelineTranslation.toProto(pipeline, 
sdkComponents);
-    String expandedTransformId =
-        Iterables.getOnlyElement(
-            pipelineProto.getRootTransformIdsList().stream()
-                .filter(id -> !existingTransformIds.contains(id))
-                .collect(Collectors.toList()));
-    RunnerApi.Components components = pipelineProto.getComponents();
-    LOG.debug("Expanded to {}", 
components.getTransformsOrThrow(expandedTransformId));
-
-    return ExpansionApi.ExpansionResponse.newBuilder()
-        
.setComponents(components.toBuilder().removeTransforms(expandedTransformId))
-        .setTransform(components.getTransformsOrThrow(expandedTransformId))
-        .build();
-  }
-
-  @Override
-  public void expand(
-      ExpansionApi.ExpansionRequest request,
-      StreamObserver<ExpansionApi.ExpansionResponse> responseObserver) {
-    try {
-      responseObserver.onNext(expand(request));
-      responseObserver.onCompleted();
-    } catch (RuntimeException exn) {
-      responseObserver.onError(exn);
-      throw exn;
-    }
-  }
-
-  public static void main(String[] args) throws Exception {
-    int port = Integer.parseInt(args[0]);
-    System.out.println("Starting expansion service at localhost:" + port);
-    Server server = ServerBuilder.forPort(port).addService(new 
ExpansionService()).build();
-    server.start();
-    server.awaitTermination();
-  }
-}
diff --git 
a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ModelCoderRegistrar.java
 
b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ModelCoderRegistrar.java
index d55f5d4..8843125 100644
--- 
a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ModelCoderRegistrar.java
+++ 
b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ModelCoderRegistrar.java
@@ -27,7 +27,6 @@ import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.IterableCoder;
 import org.apache.beam.sdk.coders.KvCoder;
 import org.apache.beam.sdk.coders.LengthPrefixCoder;
-import org.apache.beam.sdk.coders.StringUtf8Coder;
 import org.apache.beam.sdk.coders.VarLongCoder;
 import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
 import 
org.apache.beam.sdk.transforms.windowing.IntervalWindow.IntervalWindowCoder;
@@ -47,7 +46,6 @@ public class ModelCoderRegistrar implements 
CoderTranslatorRegistrar {
   static final BiMap<Class<? extends Coder>, String> BEAM_MODEL_CODER_URNS =
       ImmutableBiMap.<Class<? extends Coder>, String>builder()
           .put(ByteArrayCoder.class, ModelCoders.BYTES_CODER_URN)
-          .put(StringUtf8Coder.class, ModelCoders.STRING_UTF8_CODER_URN)
           .put(KvCoder.class, ModelCoders.KV_CODER_URN)
           .put(VarLongCoder.class, ModelCoders.INT64_CODER_URN)
           .put(IntervalWindowCoder.class, 
ModelCoders.INTERVAL_WINDOW_CODER_URN)
@@ -64,7 +62,6 @@ public class ModelCoderRegistrar implements 
CoderTranslatorRegistrar {
   static final Map<Class<? extends Coder>, CoderTranslator<? extends Coder>> 
BEAM_MODEL_CODERS =
       ImmutableMap.<Class<? extends Coder>, CoderTranslator<? extends 
Coder>>builder()
           .put(ByteArrayCoder.class, 
CoderTranslators.atomic(ByteArrayCoder.class))
-          .put(StringUtf8Coder.class, 
CoderTranslators.atomic(StringUtf8Coder.class))
           .put(VarLongCoder.class, CoderTranslators.atomic(VarLongCoder.class))
           .put(IntervalWindowCoder.class, 
CoderTranslators.atomic(IntervalWindowCoder.class))
           .put(GlobalWindow.Coder.class, 
CoderTranslators.atomic(GlobalWindow.Coder.class))
diff --git 
a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ModelCoders.java
 
b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ModelCoders.java
index b79ee35..3c6dfba 100644
--- 
a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ModelCoders.java
+++ 
b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ModelCoders.java
@@ -33,7 +33,6 @@ public class ModelCoders {
   private ModelCoders() {}
 
   public static final String BYTES_CODER_URN = 
getUrn(StandardCoders.Enum.BYTES);
-  public static final String STRING_UTF8_CODER_URN = 
getUrn(StandardCoders.Enum.STRING_UTF8);
   // Where is this required explicitly, instead of implicit within 
WindowedValue and LengthPrefix
   // coders?
   public static final String INT64_CODER_URN = 
getUrn(StandardCoders.Enum.VARINT);
@@ -55,7 +54,6 @@ public class ModelCoders {
   private static final Set<String> MODEL_CODER_URNS =
       ImmutableSet.of(
           BYTES_CODER_URN,
-          STRING_UTF8_CODER_URN,
           INT64_CODER_URN,
           ITERABLE_CODER_URN,
           TIMER_CODER_URN,
diff --git 
a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/RehydratedComponents.java
 
b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/RehydratedComponents.java
index 6265894..68a793a 100644
--- 
a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/RehydratedComponents.java
+++ 
b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/RehydratedComponents.java
@@ -20,7 +20,6 @@ package org.apache.beam.runners.core.construction;
 import static 
org.apache.beam.vendor.guava.v20_0.com.google.common.base.Preconditions.checkState;
 
 import java.io.IOException;
-import java.util.Collections;
 import java.util.concurrent.ExecutionException;
 import javax.annotation.Nullable;
 import org.apache.beam.model.pipeline.v1.RunnerApi;
@@ -174,14 +173,4 @@ public class RehydratedComponents {
   public Components getComponents() {
     return components;
   }
-
-  public SdkComponents getSdkComponents() {
-    return SdkComponents.create(
-        components,
-        Collections.emptyMap(),
-        pCollections.asMap(),
-        windowingStrategies.asMap(),
-        coders.asMap(),
-        Collections.emptyMap());
-  }
 }
diff --git 
a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/SdkComponents.java
 
b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/SdkComponents.java
index 2a1b335..e44d724 100644
--- 
a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/SdkComponents.java
+++ 
b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/SdkComponents.java
@@ -23,7 +23,6 @@ import static 
org.apache.beam.vendor.guava.v20_0.com.google.common.base.Precondi
 import java.io.IOException;
 import java.util.HashSet;
 import java.util.List;
-import java.util.Map;
 import java.util.Set;
 import org.apache.beam.model.pipeline.v1.RunnerApi;
 import org.apache.beam.model.pipeline.v1.RunnerApi.Components;
@@ -43,20 +42,22 @@ import 
org.apache.beam.vendor.guava.v20_0.com.google.common.collect.Iterables;
 
 /** SDK objects that will be represented at some later point within a {@link 
Components} object. */
 public class SdkComponents {
-  private final String newIdPrefix;
   private final RunnerApi.Components.Builder componentsBuilder = 
RunnerApi.Components.newBuilder();
 
   private final BiMap<AppliedPTransform<?, ?, ?>, String> transformIds = 
HashBiMap.create();
   private final BiMap<PCollection<?>, String> pCollectionIds = 
HashBiMap.create();
   private final BiMap<WindowingStrategy<?, ?>, String> windowingStrategyIds = 
HashBiMap.create();
+
+  /** A map of Coder to IDs. Coders are stored here with identity equivalence. 
*/
   private final BiMap<Coder<?>, String> coderIds = HashBiMap.create();
+
   private final BiMap<Environment, String> environmentIds = HashBiMap.create();
 
   private final Set<String> reservedIds = new HashSet<>();
 
   /** Create a new {@link SdkComponents} with no components. */
   public static SdkComponents create() {
-    return new SdkComponents("");
+    return new SdkComponents();
   }
 
   /**
@@ -65,27 +66,11 @@ public class SdkComponents {
    * <p>WARNING: This action might cause some of duplicate items created.
    */
   public static SdkComponents create(RunnerApi.Components components) {
-    return new SdkComponents(components, "");
-  }
-
-  /*package*/ static SdkComponents create(
-      RunnerApi.Components components,
-      Map<String, AppliedPTransform<?, ?, ?>> transforms,
-      Map<String, PCollection<?>> pCollections,
-      Map<String, WindowingStrategy<?, ?>> windowingStrategies,
-      Map<String, Coder<?>> coders,
-      Map<String, Environment> environments) {
-    SdkComponents sdkComponents = SdkComponents.create(components);
-    sdkComponents.transformIds.inverse().putAll(transforms);
-    sdkComponents.pCollectionIds.inverse().putAll(pCollections);
-    sdkComponents.windowingStrategyIds.inverse().putAll(windowingStrategies);
-    sdkComponents.coderIds.inverse().putAll(coders);
-    sdkComponents.environmentIds.inverse().putAll(environments);
-    return sdkComponents;
+    return new SdkComponents(components);
   }
 
   public static SdkComponents create(PipelineOptions options) {
-    SdkComponents sdkComponents = new SdkComponents("");
+    SdkComponents sdkComponents = new SdkComponents();
     PortablePipelineOptions portablePipelineOptions = 
options.as(PortablePipelineOptions.class);
     sdkComponents.registerEnvironment(
         Environments.createOrGetDefaultEnvironment(
@@ -94,13 +79,9 @@ public class SdkComponents {
     return sdkComponents;
   }
 
-  private SdkComponents(String newIdPrefix) {
-    this.newIdPrefix = newIdPrefix;
-  }
-
-  private SdkComponents(RunnerApi.Components components, String newIdPrefix) {
-    this.newIdPrefix = newIdPrefix;
+  private SdkComponents() {}
 
+  private SdkComponents(RunnerApi.Components components) {
     if (components == null) {
       return;
     }
@@ -111,28 +92,10 @@ public class SdkComponents {
     reservedIds.addAll(components.getCodersMap().keySet());
     reservedIds.addAll(components.getEnvironmentsMap().keySet());
 
-    environmentIds.inverse().putAll(components.getEnvironmentsMap());
-
     componentsBuilder.mergeFrom(components);
   }
 
   /**
-   * Returns an SdkComponents like this one, but which will prefix all newly 
generated ids with the
-   * given string.
-   *
-   * <p>Useful for ensuring independently-constructed components have 
non-overlapping ids.
-   */
-  public SdkComponents withNewIdPrefix(String newIdPrefix) {
-    SdkComponents sdkComponents = new SdkComponents(componentsBuilder.build(), 
newIdPrefix);
-    sdkComponents.transformIds.putAll(transformIds);
-    sdkComponents.pCollectionIds.putAll(pCollectionIds);
-    sdkComponents.windowingStrategyIds.putAll(windowingStrategyIds);
-    sdkComponents.coderIds.putAll(coderIds);
-    sdkComponents.environmentIds.putAll(environmentIds);
-    return sdkComponents;
-  }
-
-  /**
    * Registers the provided {@link AppliedPTransform} into this {@link 
SdkComponents}, returning a
    * unique ID for the {@link AppliedPTransform}. Multiple registrations of 
the same {@link
    * AppliedPTransform} will return the same unique ID.
@@ -274,10 +237,10 @@ public class SdkComponents {
   }
 
   private String uniqify(String baseName, Set<String> existing) {
-    String name = newIdPrefix + baseName;
+    String name = baseName;
     int increment = 1;
     while (existing.contains(name) || reservedIds.contains(name)) {
-      name = newIdPrefix + baseName + Integer.toString(increment);
+      name = baseName + Integer.toString(increment);
       increment++;
     }
     return name;
diff --git 
a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/CoderTranslationTest.java
 
b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/CoderTranslationTest.java
index 593f9a8..f96c977 100644
--- 
a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/CoderTranslationTest.java
+++ 
b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/CoderTranslationTest.java
@@ -60,7 +60,6 @@ public class CoderTranslationTest {
   private static final Set<StructuredCoder<?>> KNOWN_CODERS =
       ImmutableSet.<StructuredCoder<?>>builder()
           .add(ByteArrayCoder.of())
-          .add(StringUtf8Coder.of())
           .add(KvCoder.of(VarLongCoder.of(), VarLongCoder.of()))
           .add(VarLongCoder.of())
           .add(IntervalWindowCoder.of())
diff --git 
a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/ExpansionServiceTest.java
 
b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/ExpansionServiceTest.java
deleted file mode 100644
index 93af15d..0000000
--- 
a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/ExpansionServiceTest.java
+++ /dev/null
@@ -1,103 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.core.construction;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotEquals;
-import static org.junit.Assert.assertTrue;
-
-import com.google.auto.service.AutoService;
-import java.util.HashSet;
-import java.util.Map;
-import java.util.Set;
-import org.apache.beam.model.expansion.v1.ExpansionApi;
-import org.apache.beam.model.pipeline.v1.RunnerApi;
-import org.apache.beam.sdk.Pipeline;
-import org.apache.beam.sdk.transforms.Count;
-import org.apache.beam.sdk.transforms.Impulse;
-import 
org.apache.beam.vendor.guava.v20_0.com.google.common.collect.ImmutableMap;
-import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.Iterables;
-import org.junit.Test;
-
-/** Tests for {@link ExpansionService}. */
-public class ExpansionServiceTest {
-
-  private static final String TEST_URN = "test:beam:transforms:count";
-
-  private static final String TEST_NAME = "TestName";
-
-  private static final String TEST_NAMESPACE = "namespace";
-
-  private ExpansionService expansionService = new ExpansionService();
-
-  /** Registers a single test transformation. */
-  @AutoService(ExpansionService.ExpansionServiceRegistrar.class)
-  public static class TestTransforms implements 
ExpansionService.ExpansionServiceRegistrar {
-    @Override
-    public Map<String, ExpansionService.TransformProvider> knownTransforms() {
-      return ImmutableMap.of(TEST_URN, spec -> Count.perElement());
-    }
-  }
-
-  @Test
-  public void testConstruct() {
-    Pipeline p = Pipeline.create();
-    p.apply(Impulse.create());
-    RunnerApi.Pipeline pipelineProto = PipelineTranslation.toProto(p);
-    String inputPcollId =
-        Iterables.getOnlyElement(
-            
Iterables.getOnlyElement(pipelineProto.getComponents().getTransformsMap().values())
-                .getOutputsMap()
-                .values());
-    ExpansionApi.ExpansionRequest request =
-        ExpansionApi.ExpansionRequest.newBuilder()
-            .setComponents(pipelineProto.getComponents())
-            .setTransform(
-                RunnerApi.PTransform.newBuilder()
-                    .setUniqueName(TEST_NAME)
-                    
.setSpec(RunnerApi.FunctionSpec.newBuilder().setUrn(TEST_URN))
-                    .putInputs("input", inputPcollId))
-            .setNamespace(TEST_NAMESPACE)
-            .build();
-    ExpansionApi.ExpansionResponse response = expansionService.expand(request);
-    RunnerApi.PTransform expandedTransform = response.getTransform();
-    assertEquals(TEST_NAME, expandedTransform.getUniqueName());
-    // Verify it has the right input.
-    assertEquals(inputPcollId, 
Iterables.getOnlyElement(expandedTransform.getInputsMap().values()));
-    // Loose check that it's composite, and its children are represented.
-    assertNotEquals(expandedTransform.getSubtransformsCount(), 0);
-    for (String subtransform : expandedTransform.getSubtransformsList()) {
-      assertTrue(response.getComponents().containsTransforms(subtransform));
-    }
-    // Check that any newly generated components are properly namespaced.
-    Set<String> originalIds = allIds(request.getComponents());
-    for (String id : allIds(response.getComponents())) {
-      assertTrue(id, id.startsWith(TEST_NAMESPACE) || 
originalIds.contains(id));
-    }
-  }
-
-  public Set<String> allIds(RunnerApi.Components components) {
-    Set<String> all = new HashSet<>();
-    all.addAll(components.getTransformsMap().keySet());
-    all.addAll(components.getPcollectionsMap().keySet());
-    all.addAll(components.getCodersMap().keySet());
-    all.addAll(components.getWindowingStrategiesMap().keySet());
-    all.addAll(components.getEnvironmentsMap().keySet());
-    return all;
-  }
-}
diff --git 
a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/TestExpansionService.java
 
b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/TestExpansionService.java
deleted file mode 100644
index 983aa02..0000000
--- 
a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/TestExpansionService.java
+++ /dev/null
@@ -1,52 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.core.construction;
-
-import com.google.auto.service.AutoService;
-import java.util.Map;
-import org.apache.beam.sdk.transforms.Count;
-import org.apache.beam.sdk.transforms.Filter;
-import org.apache.beam.vendor.grpc.v1p13p1.io.grpc.Server;
-import org.apache.beam.vendor.grpc.v1p13p1.io.grpc.ServerBuilder;
-import 
org.apache.beam.vendor.guava.v20_0.com.google.common.collect.ImmutableMap;
-
-/** An {@link ExpansionService} useful for tests. */
-public class TestExpansionService {
-
-  private static final String TEST_COUNT_URN = "pytest:beam:transforms:count";
-  private static final String TEST_FILTER_URN = 
"pytest:beam:transforms:filter_less_than";
-
-  /** Registers a single test transformation. */
-  @AutoService(ExpansionService.ExpansionServiceRegistrar.class)
-  public static class TestTransforms implements 
ExpansionService.ExpansionServiceRegistrar {
-    @Override
-    public Map<String, ExpansionService.TransformProvider> knownTransforms() {
-      return ImmutableMap.of(
-          TEST_COUNT_URN, spec -> Count.perElement(),
-          TEST_FILTER_URN, spec -> 
Filter.lessThanEq(spec.getPayload().toStringUtf8()));
-    }
-  }
-
-  public static void main(String[] args) throws Exception {
-    int port = Integer.parseInt(args[0]);
-    System.out.println("Starting expansion service at localhost:" + port);
-    Server server = ServerBuilder.forPort(port).addService(new 
ExpansionService()).build();
-    server.start();
-    server.awaitTermination();
-  }
-}
diff --git 
a/runners/java-fn-execution/src/test/java/org/apache/beam/runners/fnexecution/control/RemoteExecutionTest.java
 
b/runners/java-fn-execution/src/test/java/org/apache/beam/runners/fnexecution/control/RemoteExecutionTest.java
index 80e5a56..09cf59d 100644
--- 
a/runners/java-fn-execution/src/test/java/org/apache/beam/runners/fnexecution/control/RemoteExecutionTest.java
+++ 
b/runners/java-fn-execution/src/test/java/org/apache/beam/runners/fnexecution/control/RemoteExecutionTest.java
@@ -442,7 +442,11 @@ public class RemoteExecutionTest implements Serializable {
           RemoteOutputReceiver.of(targetCoder.getValue(), 
outputContents::add));
     }
 
-    Iterable<String> sideInputData = Arrays.asList("A", "B", "C");
+    Iterable<byte[]> sideInputData =
+        Arrays.asList(
+            CoderUtils.encodeToByteArray(StringUtf8Coder.of(), "A"),
+            CoderUtils.encodeToByteArray(StringUtf8Coder.of(), "B"),
+            CoderUtils.encodeToByteArray(StringUtf8Coder.of(), "C"));
     StateRequestHandler stateRequestHandler =
         StateRequestHandlers.forSideInputHandlerFactory(
             descriptor.getSideInputSpecs(),
@@ -472,9 +476,13 @@ public class RemoteExecutionTest implements Serializable {
     try (ActiveBundle bundle =
         processor.newBundle(outputReceivers, stateRequestHandler, 
progressHandler)) {
       Iterables.getOnlyElement(bundle.getInputReceivers().values())
-          .accept(WindowedValue.valueInGlobalWindow("X"));
+          .accept(
+              WindowedValue.valueInGlobalWindow(
+                  CoderUtils.encodeToByteArray(StringUtf8Coder.of(), "X")));
       Iterables.getOnlyElement(bundle.getInputReceivers().values())
-          .accept(WindowedValue.valueInGlobalWindow("Y"));
+          .accept(
+              WindowedValue.valueInGlobalWindow(
+                  CoderUtils.encodeToByteArray(StringUtf8Coder.of(), "Y")));
     }
     for (Collection<WindowedValue<?>> windowedValues : outputValues.values()) {
       assertThat(
@@ -1070,18 +1078,22 @@ public class RemoteExecutionTest implements 
Serializable {
             WindowedValue.valueInGlobalWindow(kvBytes("stream2X", ""))));
   }
 
-  private KV<String, byte[]> kvBytes(String key, long value) throws 
CoderException {
-    return KV.of(key, CoderUtils.encodeToByteArray(BigEndianLongCoder.of(), 
value));
+  private KV<byte[], byte[]> kvBytes(String key, long value) throws 
CoderException {
+    return KV.of(
+        CoderUtils.encodeToByteArray(StringUtf8Coder.of(), key),
+        CoderUtils.encodeToByteArray(BigEndianLongCoder.of(), value));
   }
 
-  private KV<String, String> kvBytes(String key, String value) throws 
CoderException {
-    return KV.of(key, value);
+  private KV<byte[], byte[]> kvBytes(String key, String value) throws 
CoderException {
+    return KV.of(
+        CoderUtils.encodeToByteArray(StringUtf8Coder.of(), key),
+        CoderUtils.encodeToByteArray(StringUtf8Coder.of(), value));
   }
 
-  private KV<String, org.apache.beam.runners.core.construction.Timer<byte[]>> 
timerBytes(
+  private KV<byte[], org.apache.beam.runners.core.construction.Timer<byte[]>> 
timerBytes(
       String key, long timestampOffset) throws CoderException {
     return KV.of(
-        key,
+        CoderUtils.encodeToByteArray(StringUtf8Coder.of(), key),
         org.apache.beam.runners.core.construction.Timer.of(
             BoundedWindow.TIMESTAMP_MIN_VALUE.plus(timestampOffset),
             CoderUtils.encodeToByteArray(VoidCoder.of(), null, 
Coder.Context.NESTED)));
@@ -1090,7 +1102,7 @@ public class RemoteExecutionTest implements Serializable {
   private Object timerStructuralValue(Object timer) {
     return WindowedValue.FullWindowedValueCoder.of(
             KvCoder.of(
-                StringUtf8Coder.of(),
+                ByteArrayCoder.of(),
                 
org.apache.beam.runners.core.construction.Timer.Coder.of(ByteArrayCoder.of())),
             GlobalWindow.Coder.INSTANCE)
         .structuralValue(timer);
diff --git 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/TransformHierarchy.java
 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/TransformHierarchy.java
index 5468537..a31eced 100644
--- 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/TransformHierarchy.java
+++ 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/TransformHierarchy.java
@@ -185,11 +185,9 @@ public class TransformHierarchy {
   public void finishSpecifyingInput() {
     // Inputs must be completely specified before they are consumed by a 
transform.
     for (PValue inputValue : current.getInputs().values()) {
+      Node producerNode = getProducer(inputValue);
       PInput input = producerInput.remove(inputValue);
-      Node producerNode = maybeGetProducer(inputValue);
-      if (producerNode != null) {
-        inputValue.finishSpecifying(input, producerNode.getTransform());
-      }
+      inputValue.finishSpecifying(input, producerNode.getTransform());
     }
   }
 
@@ -237,12 +235,8 @@ public class TransformHierarchy {
     checkState(current != null, "Can't pop the root node of a 
TransformHierarchy");
   }
 
-  Node maybeGetProducer(PValue produced) {
-    return producers.get(produced);
-  }
-
   Node getProducer(PValue produced) {
-    return checkNotNull(maybeGetProducer(produced), "No producer found for 
%s", produced);
+    return checkNotNull(producers.get(produced), "No producer found for %s", 
produced);
   }
 
   public Set<PValue> visit(PipelineVisitor visitor) {
@@ -635,15 +629,13 @@ public class TransformHierarchy {
       if (!isRootNode()) {
         // Visit inputs.
         for (PValue inputValue : inputs.values()) {
-          Node valueProducer = maybeGetProducer(inputValue);
-          if (valueProducer != null) {
-            if (!visitedNodes.contains(valueProducer)) {
-              valueProducer.visit(visitor, visitedValues, visitedNodes, 
skippedComposites);
-            }
-            if (visitedValues.add(inputValue)) {
-              LOG.debug("Visiting input value {}", inputValue);
-              visitor.visitValue(inputValue, valueProducer);
-            }
+          Node valueProducer = getProducer(inputValue);
+          if (!visitedNodes.contains(valueProducer)) {
+            valueProducer.visit(visitor, visitedValues, visitedNodes, 
skippedComposites);
+          }
+          if (visitedValues.add(inputValue)) {
+            LOG.debug("Visiting input value {}", inputValue);
+            visitor.visitValue(inputValue, valueProducer);
           }
         }
       }
diff --git a/sdks/python/apache_beam/coders/coders.py 
b/sdks/python/apache_beam/coders/coders.py
index 2164161..baa3cf5 100644
--- a/sdks/python/apache_beam/coders/coders.py
+++ b/sdks/python/apache_beam/coders/coders.py
@@ -27,13 +27,11 @@ from builtins import object
 
 import google.protobuf.wrappers_pb2
 from future.moves import pickle
-from past.builtins import unicode
 
 from apache_beam.coders import coder_impl
 from apache_beam.portability import common_urns
 from apache_beam.portability import python_urns
 from apache_beam.portability.api import beam_runner_api_pb2
-from apache_beam.typehints import typehints
 from apache_beam.utils import proto_utils
 
 # pylint: disable=wrong-import-order, wrong-import-position, ungrouped-imports
@@ -164,9 +162,6 @@ class Coder(object):
       return d
     return self.__dict__
 
-  def to_type_hint(self):
-    raise NotImplementedError('BEAM-2717')
-
   @classmethod
   def from_type_hint(cls, unused_typehint, unused_registry):
     # If not overridden, just construct the coder without arguments.
@@ -326,13 +321,6 @@ class StrUtf8Coder(Coder):
   def is_deterministic(self):
     return True
 
-  def to_type_hint(self):
-    return unicode
-
-
-Coder.register_structured_urn(
-    common_urns.coders.STRING_UTF8.urn, StrUtf8Coder)
-
 
 class ToStringCoder(Coder):
   """A default string coder used if no sink coder is specified."""
@@ -388,9 +376,6 @@ class BytesCoder(FastCoder):
   def is_deterministic(self):
     return True
 
-  def to_type_hint(self):
-    return bytes
-
   def as_cloud_object(self, coders_context=None):
     return {
         '@type': 'kind:bytes',
@@ -415,9 +400,6 @@ class VarIntCoder(FastCoder):
   def is_deterministic(self):
     return True
 
-  def to_type_hint(self):
-    return int
-
   def as_cloud_object(self, coders_context=None):
     return {
         '@type': 'kind:varint',
@@ -442,9 +424,6 @@ class FloatCoder(FastCoder):
   def is_deterministic(self):
     return True
 
-  def to_type_hint(self):
-    return float
-
   def __eq__(self, other):
     return type(self) == type(other)
 
@@ -589,9 +568,6 @@ class PickleCoder(_PickleCoderBase):
   def as_deterministic_coder(self, step_label, error_message=None):
     return DeterministicFastPrimitivesCoder(self, step_label)
 
-  def to_type_hint(self):
-    return typehints.Any
-
 
 class DillCoder(_PickleCoderBase):
   """Coder using dill's pickle functionality."""
@@ -623,9 +599,6 @@ class DeterministicFastPrimitivesCoder(FastCoder):
   def value_coder(self):
     return self
 
-  def to_type_hint(self):
-    return typehints.Any
-
 
 class FastPrimitivesCoder(FastCoder):
   """Encodes simple primitives (e.g. str, int) efficiently.
@@ -648,9 +621,6 @@ class FastPrimitivesCoder(FastCoder):
     else:
       return DeterministicFastPrimitivesCoder(self, step_label)
 
-  def to_type_hint(self):
-    return typehints.Any
-
   def as_cloud_object(self, coders_context=None, is_pair_like=True):
     value = super(FastCoder, self).as_cloud_object(coders_context)
     # We currently use this coder in places where we cannot infer the coder to
@@ -776,9 +746,6 @@ class TupleCoder(FastCoder):
       return TupleCoder([c.as_deterministic_coder(step_label, error_message)
                          for c in self._coders])
 
-  def to_type_hint(self):
-    return typehints.Tuple[tuple(c.to_type_hint() for c in self._coders)]
-
   @staticmethod
   def from_type_hint(typehint, registry):
     return TupleCoder([registry.get_coder(t) for t in typehint.tuple_types])
@@ -911,9 +878,6 @@ class IterableCoder(FastCoder):
   def value_coder(self):
     return self._elem_coder
 
-  def to_type_hint(self):
-    return typehints.Iterable[self._elem_coder.to_type_hint()]
-
   @staticmethod
   def from_type_hint(typehint, registry):
     return IterableCoder(registry.get_coder(typehint.inner_type))
diff --git a/sdks/python/apache_beam/pipeline.py 
b/sdks/python/apache_beam/pipeline.py
index 8f0fa10..20ac5f0 100644
--- a/sdks/python/apache_beam/pipeline.py
+++ b/sdks/python/apache_beam/pipeline.py
@@ -72,7 +72,6 @@ from apache_beam.pvalue import PDone
 from apache_beam.runners import PipelineRunner
 from apache_beam.runners import create_runner
 from apache_beam.transforms import ptransform
-#from apache_beam.transforms import external
 from apache_beam.typehints import TypeCheckError
 from apache_beam.typehints import typehints
 from apache_beam.utils.annotations import deprecated
@@ -791,23 +790,23 @@ class AppliedPTransform(object):
 
     for pval in self.inputs:
       if pval not in visited and not isinstance(pval, pvalue.PBegin):
-        if pval.producer is not None:
-          pval.producer.visit(visitor, pipeline, visited)
-          # The value should be visited now since we visit outputs too.
-          assert pval in visited, pval
+        assert pval.producer is not None
+        pval.producer.visit(visitor, pipeline, visited)
+        # The value should be visited now since we visit outputs too.
+        assert pval in visited, pval
 
     # Visit side inputs.
     for pval in self.side_inputs:
       if isinstance(pval, pvalue.AsSideInput) and pval.pvalue not in visited:
         pval = pval.pvalue  # Unpack marker-object-wrapped pvalue.
-        if pval.producer is not None:
-          pval.producer.visit(visitor, pipeline, visited)
-          # The value should be visited now since we visit outputs too.
-          assert pval in visited
-          # TODO(silviuc): Is there a way to signal that we are visiting a side
-          # value? The issue is that the same PValue can be reachable through
-          # multiple paths and therefore it is not guaranteed that the value
-          # will be visited as a side value.
+        assert pval.producer is not None
+        pval.producer.visit(visitor, pipeline, visited)
+        # The value should be visited now since we visit outputs too.
+        assert pval in visited
+        # TODO(silviuc): Is there a way to signal that we are visiting a side
+        # value? The issue is that the same PValue can be reachable through
+        # multiple paths and therefore it is not guaranteed that the value
+        # will be visited as a side value.
 
     # Visit a composite or primitive transform.
     if self.is_composite():
@@ -848,11 +847,6 @@ class AppliedPTransform(object):
             if isinstance(output, pvalue.PCollection)}
 
   def to_runner_api(self, context):
-    # External tranforms require more splicing than just setting the spec.
-    from apache_beam.transforms import external
-    if isinstance(self.transform, external.ExternalTransform):
-      return self.transform.to_runner_api_transform(context, self.full_label)
-
     from apache_beam.portability.api import beam_runner_api_pb2
 
     def transform_to_runner_api(transform, context):
diff --git a/sdks/python/apache_beam/pvalue.py 
b/sdks/python/apache_beam/pvalue.py
index 34cafd0..6a5e42a 100644
--- a/sdks/python/apache_beam/pvalue.py
+++ b/sdks/python/apache_beam/pvalue.py
@@ -145,26 +145,20 @@ class PCollection(PValue, 
typing.Generic[typing.TypeVar('T')]):
 
   def to_runner_api(self, context):
     return beam_runner_api_pb2.PCollection(
-        unique_name=self._unique_name(),
+        unique_name='%d%s.%s' % (
+            len(self.producer.full_label), self.producer.full_label, self.tag),
         coder_id=context.coder_id_from_element_type(self.element_type),
         is_bounded=beam_runner_api_pb2.IsBounded.BOUNDED,
         windowing_strategy_id=context.windowing_strategies.get_id(
             self.windowing))
 
-  def _unique_name(self):
-    if self.producer:
-      return '%d%s.%s' % (
-          len(self.producer.full_label), self.producer.full_label, self.tag)
-    else:
-      return 'PCollection%s' % id(self)
-
   @staticmethod
   def from_runner_api(proto, context):
     # Producer and tag will be filled in later, the key point is that the
     # same object is returned for the same pcollection id.
     return PCollection(
         None,
-        element_type=context.element_type_from_coder_id(proto.coder_id),
+        element_type=pickler.loads(proto.coder_id),
         windowing=context.windowing_strategies.get_by_id(
             proto.windowing_strategy_id))
 
diff --git a/sdks/python/apache_beam/runners/pipeline_context.py 
b/sdks/python/apache_beam/runners/pipeline_context.py
index e6685d9..74156a1 100644
--- a/sdks/python/apache_beam/runners/pipeline_context.py
+++ b/sdks/python/apache_beam/runners/pipeline_context.py
@@ -55,10 +55,9 @@ class _PipelineContextMap(object):
   Under the hood it encodes and decodes these objects into runner API
   representations.
   """
-  def __init__(self, context, obj_type, namespace, proto_map=None):
+  def __init__(self, context, obj_type, proto_map=None):
     self._pipeline_context = context
     self._obj_type = obj_type
-    self._namespace = namespace
     self._obj_to_id = {}
     self._id_to_obj = {}
     self._id_to_proto = dict(proto_map) if proto_map else {}
@@ -66,11 +65,8 @@ class _PipelineContextMap(object):
 
   def _unique_ref(self, obj=None, label=None):
     self._counter += 1
-    return "%s_%s_%s_%d" % (
-        self._namespace,
-        self._obj_type.__name__,
-        label or type(obj).__name__,
-        self._counter)
+    return "ref_%s_%s_%s" % (
+        self._obj_type.__name__, label or type(obj).__name__, self._counter)
 
   def populate_map(self, proto_map):
     for id, proto in self._id_to_proto.items():
@@ -93,19 +89,6 @@ class _PipelineContextMap(object):
           self._id_to_proto[id], self._pipeline_context)
     return self._id_to_obj[id]
 
-  def get_by_proto(self, maybe_new_proto, label=None, deduplicate=False):
-    if deduplicate:
-      for id, proto in self._id_to_proto.items():
-        if proto == maybe_new_proto:
-          return id
-    return self.put_proto(self._unique_ref(label), maybe_new_proto)
-
-  def put_proto(self, id, proto):
-    if id in self._id_to_proto:
-      raise ValueError("Id '%s' is already taken." % id)
-    self._id_to_proto[id] = proto
-    return id
-
   def __getitem__(self, id):
     return self.get_by_id(id)
 
@@ -129,8 +112,7 @@ class PipelineContext(object):
 
   def __init__(
       self, proto=None, default_environment=None, use_fake_coders=False,
-      iterable_state_read=None, iterable_state_write=None,
-      namespace='ref'):
+      iterable_state_read=None, iterable_state_write=None):
     if isinstance(proto, beam_fn_api_pb2.ProcessBundleDescriptor):
       proto = beam_runner_api_pb2.Components(
           coders=dict(proto.coders.items()),
@@ -139,7 +121,7 @@ class PipelineContext(object):
     for name, cls in self._COMPONENT_TYPES.items():
       setattr(
           self, name, _PipelineContextMap(
-              self, cls, namespace, getattr(proto, name, None)))
+              self, cls, getattr(proto, name, None)))
     if default_environment:
       self._default_environment_id = self.environments.get_id(
           Environment(default_environment), label='default_environment')
@@ -159,12 +141,6 @@ class PipelineContext(object):
     else:
       return self.coders.get_id(coders.registry.get_coder(element_type))
 
-  def element_type_from_coder_id(self, coder_id):
-    if self.use_fake_coders or coder_id not in self.coders:
-      return pickler.loads(coder_id)
-    else:
-      return self.coders[coder_id].to_type_hint()
-
   @staticmethod
   def from_runner_api(proto):
     return PipelineContext(proto)
diff --git a/sdks/python/apache_beam/runners/pipeline_context_test.py 
b/sdks/python/apache_beam/runners/pipeline_context_test.py
index 6f1ec74..1e9456a 100644
--- a/sdks/python/apache_beam/runners/pipeline_context_test.py
+++ b/sdks/python/apache_beam/runners/pipeline_context_test.py
@@ -33,14 +33,6 @@ class PipelineContextTest(unittest.TestCase):
     bytes_coder_ref2 = context.coders.get_id(coders.BytesCoder())
     self.assertEqual(bytes_coder_ref, bytes_coder_ref2)
 
-  def test_deduplication_by_proto(self):
-    context = pipeline_context.PipelineContext()
-    bytes_coder_proto = coders.BytesCoder().to_runner_api(None)
-    bytes_coder_ref = context.coders.get_by_proto(bytes_coder_proto)
-    bytes_coder_ref2 = context.coders.get_by_proto(
-        bytes_coder_proto, deduplicate=True)
-    self.assertEqual(bytes_coder_ref, bytes_coder_ref2)
-
   def test_serialization(self):
     context = pipeline_context.PipelineContext()
     float_coder_ref = context.coders.get_id(coders.FloatCoder())
diff --git a/sdks/python/apache_beam/runners/portability/expansion_service.py 
b/sdks/python/apache_beam/runners/portability/expansion_service.py
deleted file mode 100644
index e407892..0000000
--- a/sdks/python/apache_beam/runners/portability/expansion_service.py
+++ /dev/null
@@ -1,118 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#    http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-
-"""A PipelineExpansion service.
-"""
-from __future__ import absolute_import
-from __future__ import print_function
-
-import argparse
-import logging
-import sys
-import time
-import traceback
-
-from apache_beam import pipeline as beam_pipeline
-from apache_beam.portability import python_urns
-from apache_beam.portability.api import beam_expansion_api_pb2
-from apache_beam.portability.api import beam_expansion_api_pb2_grpc
-from apache_beam.runners import pipeline_context
-from apache_beam.runners.portability import portable_runner
-from apache_beam.transforms import external
-from apache_beam.transforms import ptransform
-
-
-class ExpansionServiceServicer(
-    beam_expansion_api_pb2_grpc.ExpansionServiceServicer):
-
-  def __init__(self, options=None):
-    self._options = options or beam_pipeline.PipelineOptions(
-        environment_type=python_urns.EMBEDDED_PYTHON)
-
-  def Expand(self, request):
-    try:
-      pipeline = beam_pipeline.Pipeline(options=self._options)
-
-      def with_pipeline(component, pcoll_id=None):
-        component.pipeline = pipeline
-        if pcoll_id:
-          component.producer, component.tag = producers[pcoll_id]
-          # We need the lookup to resolve back to this id.
-          context.pcollections._obj_to_id[component] = pcoll_id
-        return component
-
-      context = pipeline_context.PipelineContext(
-          request.components,
-          default_environment=
-          portable_runner.PortableRunner._create_environment(
-              self._options),
-          namespace=request.namespace)
-      producers = {
-          pcoll_id: (context.transforms.get_by_id(t_id), pcoll_tag)
-          for t_id, t_proto in request.components.transforms.items()
-          for pcoll_tag, pcoll_id in t_proto.outputs.items()
-      }
-      transform = with_pipeline(
-          ptransform.PTransform.from_runner_api(
-              request.transform.spec, context))
-      inputs = transform._pvaluish_from_dict({
-          tag: with_pipeline(context.pcollections.get_by_id(pcoll_id), 
pcoll_id)
-          for tag, pcoll_id in request.transform.inputs.items()
-      })
-      if not inputs:
-        inputs = pipeline
-      with external.ExternalTransform.outer_namespace(request.namespace):
-        result = pipeline.apply(
-            transform, inputs, request.transform.unique_name)
-      expanded_transform = pipeline._root_transform().parts[-1]
-      # TODO(BEAM-1833): Use named outputs internally.
-      if isinstance(result, dict):
-        expanded_transform.outputs = result
-      pipeline_proto = pipeline.to_runner_api(context=context)
-      # TODO(BEAM-1833): Use named inputs internally.
-      expanded_transform_id = context.transforms.get_id(expanded_transform)
-      expanded_transform_proto = pipeline_proto.components.transforms.pop(
-          expanded_transform_id)
-      expanded_transform_proto.inputs.clear()
-      expanded_transform_proto.inputs.update(request.transform.inputs)
-      for transform_id in pipeline_proto.root_transform_ids:
-        del pipeline_proto.components.transforms[transform_id]
-      return beam_expansion_api_pb2.ExpansionResponse(
-          components=pipeline_proto.components,
-          transform=expanded_transform_proto)
-
-    except Exception:  # pylint: disable=broad-except
-      return beam_expansion_api_pb2.ExpansionResponse(
-          error=traceback.format_exc())
-
-
-def main(unused_argv):
-  parser = argparse.ArgumentParser()
-  parser.add_argument('-p', '--port',
-                      type=int,
-                      help='port on which to serve the job api')
-  options = parser.parse_args()
-  expansion_servicer = ExpansionServiceServicer()
-  port = expansion_servicer.start_grpc_server(options.port)
-  while True:
-    logging.info('Listening for expansion requests at %d', port)
-    time.sleep(300)
-
-
-if __name__ == '__main__':
-  logging.getLogger().setLevel(logging.INFO)
-  main(sys.argv)
diff --git a/sdks/python/apache_beam/transforms/__init__.py 
b/sdks/python/apache_beam/transforms/__init__.py
index 41cfcf6..a207009 100644
--- a/sdks/python/apache_beam/transforms/__init__.py
+++ b/sdks/python/apache_beam/transforms/__init__.py
@@ -22,7 +22,6 @@ from __future__ import absolute_import
 
 from apache_beam.transforms import combiners
 from apache_beam.transforms.core import *
-from apache_beam.transforms.external import *
 from apache_beam.transforms.ptransform import *
 from apache_beam.transforms.timeutil import TimeDomain
 from apache_beam.transforms.util import *
diff --git a/sdks/python/apache_beam/transforms/external.py 
b/sdks/python/apache_beam/transforms/external.py
deleted file mode 100644
index c9f8ba4..0000000
--- a/sdks/python/apache_beam/transforms/external.py
+++ /dev/null
@@ -1,208 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#    http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-
-"""Defines Transform whose expansion is implemented elsewhere.
-"""
-from __future__ import absolute_import
-from __future__ import print_function
-
-import contextlib
-import copy
-import threading
-
-import grpc
-
-from apache_beam import pvalue
-from apache_beam.portability import common_urns
-from apache_beam.portability.api import beam_expansion_api_pb2
-from apache_beam.portability.api import beam_expansion_api_pb2_grpc
-from apache_beam.portability.api import beam_runner_api_pb2
-from apache_beam.runners import pipeline_context
-from apache_beam.transforms import ptransform
-
-
-class ExternalTransform(ptransform.PTransform):
-
-  _namespace_counter = 0
-  _namespace = threading.local()
-  _namespace.value = 'external'
-
-  _EXPANDED_TRANSFORM_UNIQUE_NAME = 'root'
-  _IMPULSE_PREFIX = 'impulse'
-
-  def __init__(self, urn, payload, endpoint):
-    # TODO: Start an endpoint given an environment?
-    self._urn = urn
-    self._payload = payload
-    self._endpoint = endpoint
-    self._namespace = self._fresh_namespace()
-
-  def default_label(self):
-    return '%s(%s)' % (self.__class__.__name__, self._urn)
-
-  @classmethod
-  @contextlib.contextmanager
-  def outer_namespace(cls, namespace):
-    prev = cls._namespace.value
-    cls._namespace.value = namespace
-    yield
-    cls._namespace.value = prev
-
-  @classmethod
-  def _fresh_namespace(cls):
-    ExternalTransform._namespace_counter += 1
-    return '%s_%d' % (cls._namespace.value, cls._namespace_counter)
-
-  def expand(self, pvalueish):
-    if isinstance(pvalueish, pvalue.PBegin):
-      self._inputs = {}
-    elif isinstance(pvalueish, (list, tuple)):
-      self._inputs = {str(ix): pvalue for ix, pvalue in enumerate(pvalueish)}
-    elif isinstance(pvalueish, dict):
-      self._inputs = pvalueish
-    else:
-      self._inputs = {'input': pvalueish}
-    pipeline = (
-        next(iter(self._inputs.values())).pipeline
-        if self._inputs
-        else pvalueish.pipeline)
-    context = pipeline_context.PipelineContext()
-    transform_proto = beam_runner_api_pb2.PTransform(
-        unique_name=self._EXPANDED_TRANSFORM_UNIQUE_NAME,
-        spec=beam_runner_api_pb2.FunctionSpec(
-            urn=self._urn, payload=self._payload))
-    for tag, pcoll in self._inputs.items():
-      transform_proto.inputs[tag] = context.pcollections.get_id(pcoll)
-      # Conversion to/from proto assumes producers.
-      # TODO: Possibly loosen this.
-      context.transforms.put_proto(
-          '%s_%s' % (self._IMPULSE_PREFIX, tag),
-          beam_runner_api_pb2.PTransform(
-              unique_name='%s_%s' % (self._IMPULSE_PREFIX, tag),
-              spec=beam_runner_api_pb2.FunctionSpec(
-                  urn=common_urns.primitives.IMPULSE.urn),
-              outputs={'out': transform_proto.inputs[tag]}))
-    components = context.to_runner_api()
-    request = beam_expansion_api_pb2.ExpansionRequest(
-        components=components,
-        namespace=self._namespace,
-        transform=transform_proto)
-
-    if isinstance(self._endpoint, str):
-      with grpc.insecure_channel(self._endpoint) as channel:
-        response = beam_expansion_api_pb2_grpc.ExpansionServiceStub(
-            channel).Expand(request)
-    else:
-      response = self._endpoint.Expand(request)
-
-    if response.error:
-      raise RuntimeError(response.error)
-    self._expanded_components = response.components
-    self._expanded_transform = response.transform
-    result_context = pipeline_context.PipelineContext(response.components)
-
-    def fix_output(pcoll, tag):
-      pcoll.pipeline = pipeline
-      pcoll.tag = tag
-      return pcoll
-    self._outputs = {
-        tag: fix_output(result_context.pcollections.get_by_id(pcoll_id), tag)
-        for tag, pcoll_id in self._expanded_transform.outputs.items()
-    }
-
-    return self._output_to_pvalueish(self._outputs)
-
-  def _output_to_pvalueish(self, output_dict):
-    if len(output_dict) == 1:
-      return next(iter(output_dict.values()))
-    else:
-      return output_dict
-
-  def to_runner_api_transform(self, context, full_label):
-    pcoll_renames = {}
-    renamed_tag_seen = False
-    for tag, pcoll in self._inputs.items():
-      if tag not in self._expanded_transform.inputs:
-        if renamed_tag_seen:
-          raise RuntimeError(
-              'Ambiguity due to non-preserved tags: %s vs %s' % (
-                  sorted(self._expanded_transform.inputs.keys()),
-                  sorted(self._inputs.keys())))
-        else:
-          renamed_tag_seen = True
-          tag, = self._expanded_transform.inputs.keys()
-      pcoll_renames[self._expanded_transform.inputs[tag]] = (
-          context.pcollections.get_id(pcoll))
-    for tag, pcoll in self._outputs.items():
-      pcoll_renames[self._expanded_transform.outputs[tag]] = (
-          context.pcollections.get_id(pcoll))
-
-    def _equivalent(coder1, coder2):
-      return coder1 == coder2 or _normalize(coder1) == _normalize(coder2)
-
-    def _normalize(coder_proto):
-      normalized = copy.copy(coder_proto)
-      normalized.spec.environment_id = ''
-      # TODO(robertwb): Normalize components as well.
-      return normalized
-
-    for id, proto in self._expanded_components.coders.items():
-      if id.startswith(self._namespace):
-        context.coders.put_proto(id, proto)
-      elif id in context.coders:
-        if not _equivalent(context.coders._id_to_proto[id], proto):
-          raise RuntimeError('Re-used coder id: %s\n%s\n%s' % (
-              id, context.coders._id_to_proto[id], proto))
-      else:
-        context.coders.put_proto(id, proto)
-    for id, proto in self._expanded_components.windowing_strategies.items():
-      if id.startswith(self._namespace):
-        context.windowing_strategies.put_proto(id, proto)
-    for id, proto in self._expanded_components.environments.items():
-      if id.startswith(self._namespace):
-        context.environments.put_proto(id, proto)
-    for id, proto in self._expanded_components.pcollections.items():
-      if id not in pcoll_renames:
-        context.pcollections.put_proto(id, proto)
-
-    for id, proto in self._expanded_components.transforms.items():
-      if id.startswith(self._IMPULSE_PREFIX):
-        # Our fake inputs.
-        continue
-      assert id.startswith(self._namespace), (id, self._namespace)
-      new_proto = beam_runner_api_pb2.PTransform(
-          unique_name=full_label + proto.unique_name[
-              len(self._EXPANDED_TRANSFORM_UNIQUE_NAME):],
-          spec=proto.spec,
-          subtransforms=proto.subtransforms,
-          inputs={tag: pcoll_renames.get(pcoll, pcoll)
-                  for tag, pcoll in proto.inputs.items()},
-          outputs={tag: pcoll_renames.get(pcoll, pcoll)
-                   for tag, pcoll in proto.outputs.items()})
-      context.transforms.put_proto(id, new_proto)
-
-    return self._expanded_transform
-
-
-def memoize(func):
-  cache = {}
-
-  def wrapper(*args):
-    if args not in cache:
-      cache[args] = func(*args)
-    return cache[args]
-  return wrapper
diff --git a/sdks/python/apache_beam/transforms/external_test.py 
b/sdks/python/apache_beam/transforms/external_test.py
deleted file mode 100644
index e66cf21..0000000
--- a/sdks/python/apache_beam/transforms/external_test.py
+++ /dev/null
@@ -1,205 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#    http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-
-"""Unit tests for the transform.util classes."""
-
-from __future__ import absolute_import
-
-import argparse
-import subprocess
-import sys
-import unittest
-
-import grpc
-from past.builtins import unicode
-
-import apache_beam as beam
-from apache_beam.portability import python_urns
-from apache_beam.runners.portability import expansion_service
-from apache_beam.testing.util import assert_that
-from apache_beam.testing.util import equal_to
-from apache_beam.transforms import ptransform
-
-
-class ExternalTransformTest(unittest.TestCase):
-
-  # This will be overwritten if set via a flag.
-  expansion_service_jar = None
-
-  def test_simple(self):
-
-    @ptransform.PTransform.register_urn('simple', None)
-    class SimpleTransform(ptransform.PTransform):
-      def expand(self, pcoll):
-        return pcoll | beam.Map(lambda x: 'Simple(%s)' % x)
-
-      def to_runner_api_parameter(self, unused_context):
-        return 'simple', None
-
-      @staticmethod
-      def from_runner_api_parameter(unused_parameter, unused_context):
-        return SimpleTransform()
-
-    with beam.Pipeline() as p:
-      res = (
-          p
-          | beam.Create(['a', 'b'])
-          | beam.ExternalTransform(
-              'simple',
-              None,
-              expansion_service.ExpansionServiceServicer()))
-      assert_that(res, equal_to(['Simple(a)', 'Simple(b)']))
-
-  def test_multi(self):
-
-    @ptransform.PTransform.register_urn('multi', None)
-    class MutltiTransform(ptransform.PTransform):
-      def expand(self, pcolls):
-        return {
-            'main':
-                (pcolls['main1'], pcolls['main2'])
-                | beam.Flatten()
-                | beam.Map(lambda x, s: x + s,
-                           beam.pvalue.AsSingleton(pcolls['side'])),
-            'side': pcolls['side'] | beam.Map(lambda x: x + x),
-        }
-
-      def to_runner_api_parameter(self, unused_context):
-        return 'multi', None
-
-      @staticmethod
-      def from_runner_api_parameter(unused_parameter, unused_context):
-        return MutltiTransform()
-
-    with beam.Pipeline() as p:
-      main1 = p | 'Main1' >> beam.Create(['a', 'bb'], reshuffle=False)
-      main2 = p | 'Main2' >> beam.Create(['x', 'yy', 'zzz'], reshuffle=False)
-      side = p | 'Side' >> beam.Create(['s'])
-      res = dict(main1=main1, main2=main2, side=side) | beam.ExternalTransform(
-          'multi', None, expansion_service.ExpansionServiceServicer())
-      assert_that(res['main'], equal_to(['as', 'bbs', 'xs', 'yys', 'zzzs']))
-      assert_that(res['side'], equal_to(['ss']), label='CheckSide')
-
-  def test_payload(self):
-
-    @ptransform.PTransform.register_urn('payload', bytes)
-    class PayloadTransform(ptransform.PTransform):
-      def __init__(self, payload):
-        self._payload = payload
-
-      def expand(self, pcoll):
-        return pcoll | beam.Map(lambda x, s: x + s, self._payload)
-
-      def to_runner_api_parameter(self, unused_context):
-        return b'payload', self._payload.encode('ascii')
-
-      @staticmethod
-      def from_runner_api_parameter(payload, unused_context):
-        return PayloadTransform(payload.decode('ascii'))
-
-    with beam.Pipeline() as p:
-      res = (
-          p
-          | beam.Create(['a', 'bb'], reshuffle=False)
-          | beam.ExternalTransform(
-              'payload', b's',
-              expansion_service.ExpansionServiceServicer()))
-      assert_that(res, equal_to(['as', 'bbs']))
-
-  def test_nested(self):
-    @ptransform.PTransform.register_urn('fib', bytes)
-    class FibTransform(ptransform.PTransform):
-      def __init__(self, level):
-        self._level = level
-
-      def expand(self, p):
-        if self._level <= 2:
-          return p | beam.Create([1])
-        else:
-          a = p | 'A' >> beam.ExternalTransform(
-              'fib', str(self._level - 1).encode('ascii'),
-              expansion_service.ExpansionServiceServicer())
-          b = p | 'B' >> beam.ExternalTransform(
-              'fib', str(self._level - 2).encode('ascii'),
-              expansion_service.ExpansionServiceServicer())
-          return (
-              (a, b)
-              | beam.Flatten()
-              | beam.CombineGlobally(sum).without_defaults())
-
-      def to_runner_api_parameter(self, unused_context):
-        return 'fib', str(self._level).encode('ascii')
-
-      @staticmethod
-      def from_runner_api_parameter(level, unused_context):
-        return FibTransform(int(level.decode('ascii')))
-
-    with beam.Pipeline() as p:
-      assert_that(p | FibTransform(6), equal_to([8]))
-
-  def test_java_expansion(self):
-    if not self.expansion_service_jar:
-      raise unittest.SkipTest('No expansion service jar provided.')
-
-    # The actual definitions of these transforms is in
-    # org.apache.beam.runners.core.construction.TestExpansionService.
-    TEST_COUNT_URN = "pytest:beam:transforms:count"
-    TEST_FILTER_URN = "pytest:beam:transforms:filter_less_than"
-
-    # Run as cheaply as possible on the portable runner.
-    # TODO(robertwb): Support this directly in the direct runner.
-    options = beam.options.pipeline_options.PipelineOptions(
-        runner='PortableRunner',
-        experiments=['beam_fn_api'],
-        environment_type=python_urns.EMBEDDED_PYTHON,
-        job_endpoint='embed')
-
-    try:
-      # Start the java server and wait for it to be ready.
-      port = '8091'
-      address = 'localhost:%s' % port
-      server = subprocess.Popen(
-          ['java', '-jar', self.expansion_service_jar, port])
-      with grpc.insecure_channel(address) as channel:
-        grpc.channel_ready_future(channel).result()
-
-      # Run a simple count-filtered-letters pipeline.
-      with beam.Pipeline(options=options) as p:
-        res = (
-            p
-            | beam.Create(list('aaabccxyyzzz'))
-            | beam.Map(unicode)
-            | beam.ExternalTransform(TEST_FILTER_URN, 'middle', address)
-            | beam.ExternalTransform(TEST_COUNT_URN, None, address)
-            | beam.Map(lambda kv: '%s: %s' % kv))
-
-        assert_that(res, equal_to(['a: 3', 'b: 1', 'c: 2']))
-
-    finally:
-      server.kill()
-
-
-if __name__ == '__main__':
-  parser = argparse.ArgumentParser()
-  parser.add_argument('--expansion_service_jar')
-  known_args, sys.argv = parser.parse_known_args(sys.argv)
-
-  if known_args.expansion_service_jar:
-    ExternalTransformTest.expansion_service_jar = (
-        known_args.expansion_service_jar)
-
-  unittest.main()
diff --git a/sdks/python/apache_beam/transforms/ptransform.py 
b/sdks/python/apache_beam/transforms/ptransform.py
index 2ebc93b..c512d9f 100644
--- a/sdks/python/apache_beam/transforms/ptransform.py
+++ b/sdks/python/apache_beam/transforms/ptransform.py
@@ -36,7 +36,6 @@ FlatMap processing functions.
 
 from __future__ import absolute_import
 
-import contextlib
 import copy
 import itertools
 import operator
@@ -231,23 +230,6 @@ def get_nested_pvalues(pvalueish):
   return pvalues
 
 
-def get_nested_pvalues0(pvalueish):
-  if isinstance(pvalueish, (tuple, list)):
-    tagged_values = enumerate(pvalueish)
-  if isinstance(pvalueish, dict):
-    tagged_values = pvalueish.items()
-  else:
-    yield None, pvalueish
-    return
-
-  for tag, subvalue in tagged_values:
-    for subtag, subsubvalue in get_nested_pvalues(subvalue):
-      if subtag is None:
-        yield tag, subsubvalue
-      else:
-        yield '%s.%s' % (tag, subsubvalue), subsubvalue
-
-
 class _ZipPValues(object):
   """Pairs each PValue in a pvalueish with a value in a parallel out sibling.
 
@@ -544,37 +526,13 @@ class PTransform(WithTypeHints, HasDisplayData):
         yield pvalueish
     return pvalueish, tuple(_dict_tuple_leaves(pvalueish))
 
-  def _pvaluish_from_dict(self, input_dict):
-    if len(input_dict) == 1:
-      return next(iter(input_dict.values()))
-    else:
-      return input_dict
-
   _known_urns = {}
 
   @classmethod
   def register_urn(cls, urn, parameter_type, constructor=None):
     def register(constructor):
-      if isinstance(constructor, type):
-        constructor.from_runner_api_parameter = register(
-            constructor.from_runner_api_parameter)
-        # pylint isn't smart enough to recognize when this is used
-        # on a class or a method, and will emit a no-self-warning
-        # in the latter case.  Rather than suppressing this at each
-        # use, we fool it here through some dynamic patching that
-        # pylint will also not understand.
-
-        @contextlib.contextmanager
-        def fake_static_method():
-          actual_static_method = staticmethod
-          globals()['staticmethod'] = lambda x: x
-          yield
-          globals()['staticmethod'] = actual_static_method
-        with fake_static_method():
-          return staticmethod(constructor)
-      else:
-        cls._known_urns[urn] = parameter_type, constructor
-        return staticmethod(constructor)
+      cls._known_urns[urn] = parameter_type, constructor
+      return staticmethod(constructor)
     if constructor:
       # Used as a statement.
       register(constructor)
diff --git a/sdks/python/build.gradle b/sdks/python/build.gradle
index 18bd161..fc2372e 100644
--- a/sdks/python/build.gradle
+++ b/sdks/python/build.gradle
@@ -430,15 +430,3 @@ project.task('createProcessWorker') {
     }
   }
 }
-
-project.task('crossLanguagePythonJava') {
-  dependsOn 'setupVirtualenv'
-  dependsOn ':beam-sdks-java-container:docker'
-  dependsOn ':beam-runners-core-construction-java:testExpansionService'
-  doLast {
-    exec {
-      executable 'sh'
-      args '-c', ". ${project.ext.envdir}/bin/activate && pip install -e 
.[test] && python -m apache_beam.transforms.external_test 
--expansion_service_jar=${project(":beam-runners-core-construction-java:").testExpansionService.archivePath}"
-    }
-  }
-}

Reply via email to