This is an automated email from the ASF dual-hosted git repository. kenn pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push: new a65f1c4 Revert "Merge pull request #7316 [BEAM-6269] Cross-SDK transform expansion protocol." new d9a1bac Merge pull request #7663: [BEAM-6539] Revert "Merge pull request #7316 [BEAM-6269] Cross-SDK transform expansion protocol" a65f1c4 is described below commit a65f1c43b7fbedfec3e7396d9e024bf0e762c33d Author: Daniel Oliveira <daniel.o.program...@gmail.com> AuthorDate: Tue Jan 29 11:01:02 2019 -0800 Revert "Merge pull request #7316 [BEAM-6269] Cross-SDK transform expansion protocol." This reverts commit 26bd104f122986429ba5a8abd583b04a44a8edee, reversing changes made to dcda09a8725d30467c4eb549985e3fe979208759. --- model/job-management/build.gradle | 5 +- .../src/main/proto/beam_expansion_api.proto | 68 ------- .../pipeline/src/main/proto/beam_runner_api.proto | 3 - runners/core-construction-java/build.gradle | 18 -- .../core/construction/ExpansionService.java | 205 -------------------- .../core/construction/ModelCoderRegistrar.java | 3 - .../runners/core/construction/ModelCoders.java | 2 - .../core/construction/RehydratedComponents.java | 11 -- .../runners/core/construction/SdkComponents.java | 57 +----- .../core/construction/CoderTranslationTest.java | 1 - .../core/construction/ExpansionServiceTest.java | 103 ---------- .../core/construction/TestExpansionService.java | 52 ------ .../fnexecution/control/RemoteExecutionTest.java | 32 +++- .../beam/sdk/runners/TransformHierarchy.java | 28 +-- sdks/python/apache_beam/coders/coders.py | 36 ---- sdks/python/apache_beam/pipeline.py | 30 ++- sdks/python/apache_beam/pvalue.py | 12 +- .../python/apache_beam/runners/pipeline_context.py | 34 +--- .../apache_beam/runners/pipeline_context_test.py | 8 - .../runners/portability/expansion_service.py | 118 ------------ sdks/python/apache_beam/transforms/__init__.py | 1 - sdks/python/apache_beam/transforms/external.py | 208 --------------------- .../python/apache_beam/transforms/external_test.py | 205 -------------------- sdks/python/apache_beam/transforms/ptransform.py | 46 +---- sdks/python/build.gradle | 12 -- 25 files changed, 65 insertions(+), 1233 deletions(-) diff --git a/model/job-management/build.gradle b/model/job-management/build.gradle index 4f81152..4c50782 100644 --- a/model/job-management/build.gradle +++ b/model/job-management/build.gradle @@ -17,10 +17,7 @@ */ apply plugin: org.apache.beam.gradle.BeamModulePlugin -applyPortabilityNature(shadowJarValidationExcludes:[ - "org/apache/beam/model/expansion/v1/**", - "org/apache/beam/model/jobmanagement/v1/**", -]) +applyPortabilityNature(shadowJarValidationExcludes: ["org/apache/beam/model/jobmanagement/v1/**"]) description = "Apache Beam :: Model :: Job Management" ext.summary = "Portable definitions for submitting pipelines." diff --git a/model/job-management/src/main/proto/beam_expansion_api.proto b/model/job-management/src/main/proto/beam_expansion_api.proto deleted file mode 100644 index 92b0dd2..0000000 --- a/model/job-management/src/main/proto/beam_expansion_api.proto +++ /dev/null @@ -1,68 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -/* - * Protocol Buffers describing the Expansion API, an api for expanding - * transforms in a remote SDK. - */ - -syntax = "proto3"; - -package org.apache.beam.model.expansion.v1; - -option go_package = "construction_v1"; -option java_package = "org.apache.beam.model.expansion.v1"; -option java_outer_classname = "ExpansionApi"; - -import "beam_runner_api.proto"; - -message ExpansionRequest { - // Set of components needed to interpret the transform, or which - // may be useful for its expansion. This includes the input - // PCollections (if any) to the to-be-expanded transform, along - // with their coders and windowing strategies. - org.apache.beam.model.pipeline.v1.Components components = 1; - - // The actual PTransform to be expaneded according to its spec. - // Its input should be set, but its subtransforms and outputs - // should not be. - org.apache.beam.model.pipeline.v1.PTransform transform = 2; - - // A namespace (prefix) to use for the id of any newly created - // components. - string namespace = 3; -} - -message ExpansionResponse { - // Set of components needed to execute the expanded transform, - // including the (original) inputs, outputs, and subtransforms. - org.apache.beam.model.pipeline.v1.Components components = 1; - - // The expanded transform itself, with references to its outputs - // and subtransforms. - org.apache.beam.model.pipeline.v1.PTransform transform = 2; - - // (Optional) An string representation of any error encountered while - // attempting to expand this transform. - string error = 10; -} - -// Job Service for constructing pipelines -service ExpansionService { - rpc Expand (ExpansionRequest) returns (ExpansionResponse); -} diff --git a/model/pipeline/src/main/proto/beam_runner_api.proto b/model/pipeline/src/main/proto/beam_runner_api.proto index e081f07..42a1970 100644 --- a/model/pipeline/src/main/proto/beam_runner_api.proto +++ b/model/pipeline/src/main/proto/beam_runner_api.proto @@ -544,9 +544,6 @@ message StandardCoders { // Components: None BYTES = 0 [(beam_urn) = "beam:coder:bytes:v1"]; - // Components: None - STRING_UTF8 = 10 [(beam_urn) = "beam:coder:string_utf8:v1"]; - // Components: The key and value coder, in that order. KV = 1 [(beam_urn) = "beam:coder:kv:v1"]; diff --git a/runners/core-construction-java/build.gradle b/runners/core-construction-java/build.gradle index 7ac38e0..365373c 100644 --- a/runners/core-construction-java/build.gradle +++ b/runners/core-construction-java/build.gradle @@ -50,21 +50,3 @@ dependencies { shadowTest library.java.jackson_dataformat_yaml shadowTest project(path: ":beam-model-fn-execution", configuration: "shadow") } - -task runExpansionService (type: JavaExec) { - main = "org.apache.beam.runners.core.construction.ExpansionService" - classpath = sourceSets.main.runtimeClasspath - args = [project.findProperty("constructionService.port") ?: "8097"] -} - -task testExpansionService(type: Jar) { - dependsOn = [shadowJar, shadowTestJar] - manifest { - attributes( - 'Main-Class': 'org.apache.beam.runners.core.construction.TestExpansionService' - ) - } - from { configurations.testRuntime.collect { it.isDirectory() ? it : zipTree(it) }} - from sourceSets.main.output - from sourceSets.test.output -} diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ExpansionService.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ExpansionService.java deleted file mode 100644 index a76bc48..0000000 --- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ExpansionService.java +++ /dev/null @@ -1,205 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.runners.core.construction; - -import com.google.auto.service.AutoService; -import java.io.IOException; -import java.util.Collections; -import java.util.Map; -import java.util.ServiceLoader; -import java.util.Set; -import java.util.stream.Collectors; -import java.util.stream.IntStream; -import org.apache.beam.model.expansion.v1.ExpansionApi; -import org.apache.beam.model.expansion.v1.ExpansionServiceGrpc; -import org.apache.beam.model.pipeline.v1.RunnerApi; -import org.apache.beam.sdk.Pipeline; -import org.apache.beam.sdk.transforms.PTransform; -import org.apache.beam.sdk.values.PCollection; -import org.apache.beam.sdk.values.PCollectionList; -import org.apache.beam.sdk.values.PCollectionTuple; -import org.apache.beam.sdk.values.PDone; -import org.apache.beam.sdk.values.PValue; -import org.apache.beam.sdk.values.TupleTag; -import org.apache.beam.vendor.grpc.v1p13p1.io.grpc.Server; -import org.apache.beam.vendor.grpc.v1p13p1.io.grpc.ServerBuilder; -import org.apache.beam.vendor.grpc.v1p13p1.io.grpc.stub.StreamObserver; -import org.apache.beam.vendor.guava.v20_0.com.google.common.annotations.VisibleForTesting; -import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.ImmutableMap; -import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.Iterables; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** A service that allows pipeline expand transforms from a remote SDK. */ -public class ExpansionService extends ExpansionServiceGrpc.ExpansionServiceImplBase { - - private static final Logger LOG = LoggerFactory.getLogger(ExpansionService.class); - - /** - * A registrar that creates {@link TransformProvider} instances from {@link - * RunnerApi.FunctionSpec}s. - * - * <p>Transform authors have the ability to provide a registrar by creating a {@link - * ServiceLoader} entry and a concrete implementation of this interface. - * - * <p>It is optional but recommended to use one of the many build time tools such as {@link - * AutoService} to generate the necessary META-INF files automatically. - */ - public interface ExpansionServiceRegistrar { - Map<String, TransformProvider> knownTransforms(); - } - - /** - * Provides a mapping of {@link RunnerApi.FunctionSpec} to a {@link PTransform}, together with - * mappings of its inputs and outputs to maps of PCollections. - * - * @param <InputT> input {@link PValue} type of the transform - * @param <OutputT> output {@link PValue} type of the transform - */ - public interface TransformProvider<InputT extends PValue, OutputT extends PValue> { - - default InputT createInput(Pipeline p, Map<String, PCollection<?>> inputs) { - if (inputs.size() == 0) { - return (InputT) p.begin(); - } - if (inputs.size() == 1) { - return (InputT) Iterables.getOnlyElement(inputs.values()); - } else { - PCollectionTuple inputTuple = PCollectionTuple.empty(p); - for (Map.Entry<String, PCollection<?>> entry : inputs.entrySet()) { - inputTuple = inputTuple.and(new TupleTag(entry.getKey()), entry.getValue()); - } - return (InputT) inputTuple; - } - } - - PTransform<InputT, OutputT> getTransform(RunnerApi.FunctionSpec spec); - - default Map<String, PCollection<?>> extractOutputs(OutputT output) { - if (output instanceof PDone) { - return Collections.emptyMap(); - } else if (output instanceof PCollection) { - return ImmutableMap.of("output", (PCollection<?>) output); - } else if (output instanceof PCollectionTuple) { - return ((PCollectionTuple) output) - .getAll().entrySet().stream() - .collect(Collectors.toMap(entry -> entry.getKey().toString(), Map.Entry::getValue)); - } else if (output instanceof PCollectionList<?>) { - PCollectionList<?> listOutput = (PCollectionList<?>) output; - return IntStream.range(0, listOutput.size()) - .boxed() - .collect(Collectors.toMap(index -> "output_" + index, listOutput::get)); - } else { - throw new UnsupportedOperationException("Unknown output type: " + output.getClass()); - } - } - - default Map<String, PCollection<?>> apply( - Pipeline p, String name, RunnerApi.FunctionSpec spec, Map<String, PCollection<?>> inputs) { - return extractOutputs( - Pipeline.applyTransform(name, createInput(p, inputs), getTransform(spec))); - } - } - - private Map<String, TransformProvider> registeredTransforms = loadRegisteredTransforms(); - - private Map<String, TransformProvider> loadRegisteredTransforms() { - ImmutableMap.Builder<String, TransformProvider> registeredTransforms = ImmutableMap.builder(); - for (ExpansionServiceRegistrar registrar : - ServiceLoader.load(ExpansionServiceRegistrar.class)) { - registeredTransforms.putAll(registrar.knownTransforms()); - } - return registeredTransforms.build(); - } - - @VisibleForTesting - /*package*/ ExpansionApi.ExpansionResponse expand(ExpansionApi.ExpansionRequest request) { - LOG.info( - "Expanding '{}' with URN '{}'", - request.getTransform().getUniqueName(), - request.getTransform().getSpec().getUrn()); - LOG.debug("Full transform: {}", request.getTransform()); - Set<String> existingTransformIds = request.getComponents().getTransformsMap().keySet(); - Pipeline pipeline = Pipeline.create(); - RehydratedComponents rehydratedComponents = - RehydratedComponents.forComponents(request.getComponents()).withPipeline(pipeline); - - Map<String, PCollection<?>> inputs = - request.getTransform().getInputsMap().entrySet().stream() - .collect( - Collectors.toMap( - Map.Entry::getKey, - input -> { - try { - return rehydratedComponents.getPCollection(input.getValue()); - } catch (IOException exn) { - throw new RuntimeException(exn); - } - })); - if (!registeredTransforms.containsKey(request.getTransform().getSpec().getUrn())) { - throw new UnsupportedOperationException( - "Unknown urn: " + request.getTransform().getSpec().getUrn()); - } - registeredTransforms - .get(request.getTransform().getSpec().getUrn()) - .apply( - pipeline, - request.getTransform().getUniqueName(), - request.getTransform().getSpec(), - inputs); - - // Needed to find which transform was new... - SdkComponents sdkComponents = - rehydratedComponents.getSdkComponents().withNewIdPrefix(request.getNamespace()); - sdkComponents.registerEnvironment(Environments.JAVA_SDK_HARNESS_ENVIRONMENT); - RunnerApi.Pipeline pipelineProto = PipelineTranslation.toProto(pipeline, sdkComponents); - String expandedTransformId = - Iterables.getOnlyElement( - pipelineProto.getRootTransformIdsList().stream() - .filter(id -> !existingTransformIds.contains(id)) - .collect(Collectors.toList())); - RunnerApi.Components components = pipelineProto.getComponents(); - LOG.debug("Expanded to {}", components.getTransformsOrThrow(expandedTransformId)); - - return ExpansionApi.ExpansionResponse.newBuilder() - .setComponents(components.toBuilder().removeTransforms(expandedTransformId)) - .setTransform(components.getTransformsOrThrow(expandedTransformId)) - .build(); - } - - @Override - public void expand( - ExpansionApi.ExpansionRequest request, - StreamObserver<ExpansionApi.ExpansionResponse> responseObserver) { - try { - responseObserver.onNext(expand(request)); - responseObserver.onCompleted(); - } catch (RuntimeException exn) { - responseObserver.onError(exn); - throw exn; - } - } - - public static void main(String[] args) throws Exception { - int port = Integer.parseInt(args[0]); - System.out.println("Starting expansion service at localhost:" + port); - Server server = ServerBuilder.forPort(port).addService(new ExpansionService()).build(); - server.start(); - server.awaitTermination(); - } -} diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ModelCoderRegistrar.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ModelCoderRegistrar.java index d55f5d4..8843125 100644 --- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ModelCoderRegistrar.java +++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ModelCoderRegistrar.java @@ -27,7 +27,6 @@ import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.coders.IterableCoder; import org.apache.beam.sdk.coders.KvCoder; import org.apache.beam.sdk.coders.LengthPrefixCoder; -import org.apache.beam.sdk.coders.StringUtf8Coder; import org.apache.beam.sdk.coders.VarLongCoder; import org.apache.beam.sdk.transforms.windowing.GlobalWindow; import org.apache.beam.sdk.transforms.windowing.IntervalWindow.IntervalWindowCoder; @@ -47,7 +46,6 @@ public class ModelCoderRegistrar implements CoderTranslatorRegistrar { static final BiMap<Class<? extends Coder>, String> BEAM_MODEL_CODER_URNS = ImmutableBiMap.<Class<? extends Coder>, String>builder() .put(ByteArrayCoder.class, ModelCoders.BYTES_CODER_URN) - .put(StringUtf8Coder.class, ModelCoders.STRING_UTF8_CODER_URN) .put(KvCoder.class, ModelCoders.KV_CODER_URN) .put(VarLongCoder.class, ModelCoders.INT64_CODER_URN) .put(IntervalWindowCoder.class, ModelCoders.INTERVAL_WINDOW_CODER_URN) @@ -64,7 +62,6 @@ public class ModelCoderRegistrar implements CoderTranslatorRegistrar { static final Map<Class<? extends Coder>, CoderTranslator<? extends Coder>> BEAM_MODEL_CODERS = ImmutableMap.<Class<? extends Coder>, CoderTranslator<? extends Coder>>builder() .put(ByteArrayCoder.class, CoderTranslators.atomic(ByteArrayCoder.class)) - .put(StringUtf8Coder.class, CoderTranslators.atomic(StringUtf8Coder.class)) .put(VarLongCoder.class, CoderTranslators.atomic(VarLongCoder.class)) .put(IntervalWindowCoder.class, CoderTranslators.atomic(IntervalWindowCoder.class)) .put(GlobalWindow.Coder.class, CoderTranslators.atomic(GlobalWindow.Coder.class)) diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ModelCoders.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ModelCoders.java index b79ee35..3c6dfba 100644 --- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ModelCoders.java +++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ModelCoders.java @@ -33,7 +33,6 @@ public class ModelCoders { private ModelCoders() {} public static final String BYTES_CODER_URN = getUrn(StandardCoders.Enum.BYTES); - public static final String STRING_UTF8_CODER_URN = getUrn(StandardCoders.Enum.STRING_UTF8); // Where is this required explicitly, instead of implicit within WindowedValue and LengthPrefix // coders? public static final String INT64_CODER_URN = getUrn(StandardCoders.Enum.VARINT); @@ -55,7 +54,6 @@ public class ModelCoders { private static final Set<String> MODEL_CODER_URNS = ImmutableSet.of( BYTES_CODER_URN, - STRING_UTF8_CODER_URN, INT64_CODER_URN, ITERABLE_CODER_URN, TIMER_CODER_URN, diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/RehydratedComponents.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/RehydratedComponents.java index 6265894..68a793a 100644 --- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/RehydratedComponents.java +++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/RehydratedComponents.java @@ -20,7 +20,6 @@ package org.apache.beam.runners.core.construction; import static org.apache.beam.vendor.guava.v20_0.com.google.common.base.Preconditions.checkState; import java.io.IOException; -import java.util.Collections; import java.util.concurrent.ExecutionException; import javax.annotation.Nullable; import org.apache.beam.model.pipeline.v1.RunnerApi; @@ -174,14 +173,4 @@ public class RehydratedComponents { public Components getComponents() { return components; } - - public SdkComponents getSdkComponents() { - return SdkComponents.create( - components, - Collections.emptyMap(), - pCollections.asMap(), - windowingStrategies.asMap(), - coders.asMap(), - Collections.emptyMap()); - } } diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/SdkComponents.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/SdkComponents.java index 2a1b335..e44d724 100644 --- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/SdkComponents.java +++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/SdkComponents.java @@ -23,7 +23,6 @@ import static org.apache.beam.vendor.guava.v20_0.com.google.common.base.Precondi import java.io.IOException; import java.util.HashSet; import java.util.List; -import java.util.Map; import java.util.Set; import org.apache.beam.model.pipeline.v1.RunnerApi; import org.apache.beam.model.pipeline.v1.RunnerApi.Components; @@ -43,20 +42,22 @@ import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.Iterables; /** SDK objects that will be represented at some later point within a {@link Components} object. */ public class SdkComponents { - private final String newIdPrefix; private final RunnerApi.Components.Builder componentsBuilder = RunnerApi.Components.newBuilder(); private final BiMap<AppliedPTransform<?, ?, ?>, String> transformIds = HashBiMap.create(); private final BiMap<PCollection<?>, String> pCollectionIds = HashBiMap.create(); private final BiMap<WindowingStrategy<?, ?>, String> windowingStrategyIds = HashBiMap.create(); + + /** A map of Coder to IDs. Coders are stored here with identity equivalence. */ private final BiMap<Coder<?>, String> coderIds = HashBiMap.create(); + private final BiMap<Environment, String> environmentIds = HashBiMap.create(); private final Set<String> reservedIds = new HashSet<>(); /** Create a new {@link SdkComponents} with no components. */ public static SdkComponents create() { - return new SdkComponents(""); + return new SdkComponents(); } /** @@ -65,27 +66,11 @@ public class SdkComponents { * <p>WARNING: This action might cause some of duplicate items created. */ public static SdkComponents create(RunnerApi.Components components) { - return new SdkComponents(components, ""); - } - - /*package*/ static SdkComponents create( - RunnerApi.Components components, - Map<String, AppliedPTransform<?, ?, ?>> transforms, - Map<String, PCollection<?>> pCollections, - Map<String, WindowingStrategy<?, ?>> windowingStrategies, - Map<String, Coder<?>> coders, - Map<String, Environment> environments) { - SdkComponents sdkComponents = SdkComponents.create(components); - sdkComponents.transformIds.inverse().putAll(transforms); - sdkComponents.pCollectionIds.inverse().putAll(pCollections); - sdkComponents.windowingStrategyIds.inverse().putAll(windowingStrategies); - sdkComponents.coderIds.inverse().putAll(coders); - sdkComponents.environmentIds.inverse().putAll(environments); - return sdkComponents; + return new SdkComponents(components); } public static SdkComponents create(PipelineOptions options) { - SdkComponents sdkComponents = new SdkComponents(""); + SdkComponents sdkComponents = new SdkComponents(); PortablePipelineOptions portablePipelineOptions = options.as(PortablePipelineOptions.class); sdkComponents.registerEnvironment( Environments.createOrGetDefaultEnvironment( @@ -94,13 +79,9 @@ public class SdkComponents { return sdkComponents; } - private SdkComponents(String newIdPrefix) { - this.newIdPrefix = newIdPrefix; - } - - private SdkComponents(RunnerApi.Components components, String newIdPrefix) { - this.newIdPrefix = newIdPrefix; + private SdkComponents() {} + private SdkComponents(RunnerApi.Components components) { if (components == null) { return; } @@ -111,28 +92,10 @@ public class SdkComponents { reservedIds.addAll(components.getCodersMap().keySet()); reservedIds.addAll(components.getEnvironmentsMap().keySet()); - environmentIds.inverse().putAll(components.getEnvironmentsMap()); - componentsBuilder.mergeFrom(components); } /** - * Returns an SdkComponents like this one, but which will prefix all newly generated ids with the - * given string. - * - * <p>Useful for ensuring independently-constructed components have non-overlapping ids. - */ - public SdkComponents withNewIdPrefix(String newIdPrefix) { - SdkComponents sdkComponents = new SdkComponents(componentsBuilder.build(), newIdPrefix); - sdkComponents.transformIds.putAll(transformIds); - sdkComponents.pCollectionIds.putAll(pCollectionIds); - sdkComponents.windowingStrategyIds.putAll(windowingStrategyIds); - sdkComponents.coderIds.putAll(coderIds); - sdkComponents.environmentIds.putAll(environmentIds); - return sdkComponents; - } - - /** * Registers the provided {@link AppliedPTransform} into this {@link SdkComponents}, returning a * unique ID for the {@link AppliedPTransform}. Multiple registrations of the same {@link * AppliedPTransform} will return the same unique ID. @@ -274,10 +237,10 @@ public class SdkComponents { } private String uniqify(String baseName, Set<String> existing) { - String name = newIdPrefix + baseName; + String name = baseName; int increment = 1; while (existing.contains(name) || reservedIds.contains(name)) { - name = newIdPrefix + baseName + Integer.toString(increment); + name = baseName + Integer.toString(increment); increment++; } return name; diff --git a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/CoderTranslationTest.java b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/CoderTranslationTest.java index 593f9a8..f96c977 100644 --- a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/CoderTranslationTest.java +++ b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/CoderTranslationTest.java @@ -60,7 +60,6 @@ public class CoderTranslationTest { private static final Set<StructuredCoder<?>> KNOWN_CODERS = ImmutableSet.<StructuredCoder<?>>builder() .add(ByteArrayCoder.of()) - .add(StringUtf8Coder.of()) .add(KvCoder.of(VarLongCoder.of(), VarLongCoder.of())) .add(VarLongCoder.of()) .add(IntervalWindowCoder.of()) diff --git a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/ExpansionServiceTest.java b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/ExpansionServiceTest.java deleted file mode 100644 index 93af15d..0000000 --- a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/ExpansionServiceTest.java +++ /dev/null @@ -1,103 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.runners.core.construction; - -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertNotEquals; -import static org.junit.Assert.assertTrue; - -import com.google.auto.service.AutoService; -import java.util.HashSet; -import java.util.Map; -import java.util.Set; -import org.apache.beam.model.expansion.v1.ExpansionApi; -import org.apache.beam.model.pipeline.v1.RunnerApi; -import org.apache.beam.sdk.Pipeline; -import org.apache.beam.sdk.transforms.Count; -import org.apache.beam.sdk.transforms.Impulse; -import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.ImmutableMap; -import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.Iterables; -import org.junit.Test; - -/** Tests for {@link ExpansionService}. */ -public class ExpansionServiceTest { - - private static final String TEST_URN = "test:beam:transforms:count"; - - private static final String TEST_NAME = "TestName"; - - private static final String TEST_NAMESPACE = "namespace"; - - private ExpansionService expansionService = new ExpansionService(); - - /** Registers a single test transformation. */ - @AutoService(ExpansionService.ExpansionServiceRegistrar.class) - public static class TestTransforms implements ExpansionService.ExpansionServiceRegistrar { - @Override - public Map<String, ExpansionService.TransformProvider> knownTransforms() { - return ImmutableMap.of(TEST_URN, spec -> Count.perElement()); - } - } - - @Test - public void testConstruct() { - Pipeline p = Pipeline.create(); - p.apply(Impulse.create()); - RunnerApi.Pipeline pipelineProto = PipelineTranslation.toProto(p); - String inputPcollId = - Iterables.getOnlyElement( - Iterables.getOnlyElement(pipelineProto.getComponents().getTransformsMap().values()) - .getOutputsMap() - .values()); - ExpansionApi.ExpansionRequest request = - ExpansionApi.ExpansionRequest.newBuilder() - .setComponents(pipelineProto.getComponents()) - .setTransform( - RunnerApi.PTransform.newBuilder() - .setUniqueName(TEST_NAME) - .setSpec(RunnerApi.FunctionSpec.newBuilder().setUrn(TEST_URN)) - .putInputs("input", inputPcollId)) - .setNamespace(TEST_NAMESPACE) - .build(); - ExpansionApi.ExpansionResponse response = expansionService.expand(request); - RunnerApi.PTransform expandedTransform = response.getTransform(); - assertEquals(TEST_NAME, expandedTransform.getUniqueName()); - // Verify it has the right input. - assertEquals(inputPcollId, Iterables.getOnlyElement(expandedTransform.getInputsMap().values())); - // Loose check that it's composite, and its children are represented. - assertNotEquals(expandedTransform.getSubtransformsCount(), 0); - for (String subtransform : expandedTransform.getSubtransformsList()) { - assertTrue(response.getComponents().containsTransforms(subtransform)); - } - // Check that any newly generated components are properly namespaced. - Set<String> originalIds = allIds(request.getComponents()); - for (String id : allIds(response.getComponents())) { - assertTrue(id, id.startsWith(TEST_NAMESPACE) || originalIds.contains(id)); - } - } - - public Set<String> allIds(RunnerApi.Components components) { - Set<String> all = new HashSet<>(); - all.addAll(components.getTransformsMap().keySet()); - all.addAll(components.getPcollectionsMap().keySet()); - all.addAll(components.getCodersMap().keySet()); - all.addAll(components.getWindowingStrategiesMap().keySet()); - all.addAll(components.getEnvironmentsMap().keySet()); - return all; - } -} diff --git a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/TestExpansionService.java b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/TestExpansionService.java deleted file mode 100644 index 983aa02..0000000 --- a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/TestExpansionService.java +++ /dev/null @@ -1,52 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.runners.core.construction; - -import com.google.auto.service.AutoService; -import java.util.Map; -import org.apache.beam.sdk.transforms.Count; -import org.apache.beam.sdk.transforms.Filter; -import org.apache.beam.vendor.grpc.v1p13p1.io.grpc.Server; -import org.apache.beam.vendor.grpc.v1p13p1.io.grpc.ServerBuilder; -import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.ImmutableMap; - -/** An {@link ExpansionService} useful for tests. */ -public class TestExpansionService { - - private static final String TEST_COUNT_URN = "pytest:beam:transforms:count"; - private static final String TEST_FILTER_URN = "pytest:beam:transforms:filter_less_than"; - - /** Registers a single test transformation. */ - @AutoService(ExpansionService.ExpansionServiceRegistrar.class) - public static class TestTransforms implements ExpansionService.ExpansionServiceRegistrar { - @Override - public Map<String, ExpansionService.TransformProvider> knownTransforms() { - return ImmutableMap.of( - TEST_COUNT_URN, spec -> Count.perElement(), - TEST_FILTER_URN, spec -> Filter.lessThanEq(spec.getPayload().toStringUtf8())); - } - } - - public static void main(String[] args) throws Exception { - int port = Integer.parseInt(args[0]); - System.out.println("Starting expansion service at localhost:" + port); - Server server = ServerBuilder.forPort(port).addService(new ExpansionService()).build(); - server.start(); - server.awaitTermination(); - } -} diff --git a/runners/java-fn-execution/src/test/java/org/apache/beam/runners/fnexecution/control/RemoteExecutionTest.java b/runners/java-fn-execution/src/test/java/org/apache/beam/runners/fnexecution/control/RemoteExecutionTest.java index 80e5a56..09cf59d 100644 --- a/runners/java-fn-execution/src/test/java/org/apache/beam/runners/fnexecution/control/RemoteExecutionTest.java +++ b/runners/java-fn-execution/src/test/java/org/apache/beam/runners/fnexecution/control/RemoteExecutionTest.java @@ -442,7 +442,11 @@ public class RemoteExecutionTest implements Serializable { RemoteOutputReceiver.of(targetCoder.getValue(), outputContents::add)); } - Iterable<String> sideInputData = Arrays.asList("A", "B", "C"); + Iterable<byte[]> sideInputData = + Arrays.asList( + CoderUtils.encodeToByteArray(StringUtf8Coder.of(), "A"), + CoderUtils.encodeToByteArray(StringUtf8Coder.of(), "B"), + CoderUtils.encodeToByteArray(StringUtf8Coder.of(), "C")); StateRequestHandler stateRequestHandler = StateRequestHandlers.forSideInputHandlerFactory( descriptor.getSideInputSpecs(), @@ -472,9 +476,13 @@ public class RemoteExecutionTest implements Serializable { try (ActiveBundle bundle = processor.newBundle(outputReceivers, stateRequestHandler, progressHandler)) { Iterables.getOnlyElement(bundle.getInputReceivers().values()) - .accept(WindowedValue.valueInGlobalWindow("X")); + .accept( + WindowedValue.valueInGlobalWindow( + CoderUtils.encodeToByteArray(StringUtf8Coder.of(), "X"))); Iterables.getOnlyElement(bundle.getInputReceivers().values()) - .accept(WindowedValue.valueInGlobalWindow("Y")); + .accept( + WindowedValue.valueInGlobalWindow( + CoderUtils.encodeToByteArray(StringUtf8Coder.of(), "Y"))); } for (Collection<WindowedValue<?>> windowedValues : outputValues.values()) { assertThat( @@ -1070,18 +1078,22 @@ public class RemoteExecutionTest implements Serializable { WindowedValue.valueInGlobalWindow(kvBytes("stream2X", "")))); } - private KV<String, byte[]> kvBytes(String key, long value) throws CoderException { - return KV.of(key, CoderUtils.encodeToByteArray(BigEndianLongCoder.of(), value)); + private KV<byte[], byte[]> kvBytes(String key, long value) throws CoderException { + return KV.of( + CoderUtils.encodeToByteArray(StringUtf8Coder.of(), key), + CoderUtils.encodeToByteArray(BigEndianLongCoder.of(), value)); } - private KV<String, String> kvBytes(String key, String value) throws CoderException { - return KV.of(key, value); + private KV<byte[], byte[]> kvBytes(String key, String value) throws CoderException { + return KV.of( + CoderUtils.encodeToByteArray(StringUtf8Coder.of(), key), + CoderUtils.encodeToByteArray(StringUtf8Coder.of(), value)); } - private KV<String, org.apache.beam.runners.core.construction.Timer<byte[]>> timerBytes( + private KV<byte[], org.apache.beam.runners.core.construction.Timer<byte[]>> timerBytes( String key, long timestampOffset) throws CoderException { return KV.of( - key, + CoderUtils.encodeToByteArray(StringUtf8Coder.of(), key), org.apache.beam.runners.core.construction.Timer.of( BoundedWindow.TIMESTAMP_MIN_VALUE.plus(timestampOffset), CoderUtils.encodeToByteArray(VoidCoder.of(), null, Coder.Context.NESTED))); @@ -1090,7 +1102,7 @@ public class RemoteExecutionTest implements Serializable { private Object timerStructuralValue(Object timer) { return WindowedValue.FullWindowedValueCoder.of( KvCoder.of( - StringUtf8Coder.of(), + ByteArrayCoder.of(), org.apache.beam.runners.core.construction.Timer.Coder.of(ByteArrayCoder.of())), GlobalWindow.Coder.INSTANCE) .structuralValue(timer); diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/TransformHierarchy.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/TransformHierarchy.java index 5468537..a31eced 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/TransformHierarchy.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/TransformHierarchy.java @@ -185,11 +185,9 @@ public class TransformHierarchy { public void finishSpecifyingInput() { // Inputs must be completely specified before they are consumed by a transform. for (PValue inputValue : current.getInputs().values()) { + Node producerNode = getProducer(inputValue); PInput input = producerInput.remove(inputValue); - Node producerNode = maybeGetProducer(inputValue); - if (producerNode != null) { - inputValue.finishSpecifying(input, producerNode.getTransform()); - } + inputValue.finishSpecifying(input, producerNode.getTransform()); } } @@ -237,12 +235,8 @@ public class TransformHierarchy { checkState(current != null, "Can't pop the root node of a TransformHierarchy"); } - Node maybeGetProducer(PValue produced) { - return producers.get(produced); - } - Node getProducer(PValue produced) { - return checkNotNull(maybeGetProducer(produced), "No producer found for %s", produced); + return checkNotNull(producers.get(produced), "No producer found for %s", produced); } public Set<PValue> visit(PipelineVisitor visitor) { @@ -635,15 +629,13 @@ public class TransformHierarchy { if (!isRootNode()) { // Visit inputs. for (PValue inputValue : inputs.values()) { - Node valueProducer = maybeGetProducer(inputValue); - if (valueProducer != null) { - if (!visitedNodes.contains(valueProducer)) { - valueProducer.visit(visitor, visitedValues, visitedNodes, skippedComposites); - } - if (visitedValues.add(inputValue)) { - LOG.debug("Visiting input value {}", inputValue); - visitor.visitValue(inputValue, valueProducer); - } + Node valueProducer = getProducer(inputValue); + if (!visitedNodes.contains(valueProducer)) { + valueProducer.visit(visitor, visitedValues, visitedNodes, skippedComposites); + } + if (visitedValues.add(inputValue)) { + LOG.debug("Visiting input value {}", inputValue); + visitor.visitValue(inputValue, valueProducer); } } } diff --git a/sdks/python/apache_beam/coders/coders.py b/sdks/python/apache_beam/coders/coders.py index 2164161..baa3cf5 100644 --- a/sdks/python/apache_beam/coders/coders.py +++ b/sdks/python/apache_beam/coders/coders.py @@ -27,13 +27,11 @@ from builtins import object import google.protobuf.wrappers_pb2 from future.moves import pickle -from past.builtins import unicode from apache_beam.coders import coder_impl from apache_beam.portability import common_urns from apache_beam.portability import python_urns from apache_beam.portability.api import beam_runner_api_pb2 -from apache_beam.typehints import typehints from apache_beam.utils import proto_utils # pylint: disable=wrong-import-order, wrong-import-position, ungrouped-imports @@ -164,9 +162,6 @@ class Coder(object): return d return self.__dict__ - def to_type_hint(self): - raise NotImplementedError('BEAM-2717') - @classmethod def from_type_hint(cls, unused_typehint, unused_registry): # If not overridden, just construct the coder without arguments. @@ -326,13 +321,6 @@ class StrUtf8Coder(Coder): def is_deterministic(self): return True - def to_type_hint(self): - return unicode - - -Coder.register_structured_urn( - common_urns.coders.STRING_UTF8.urn, StrUtf8Coder) - class ToStringCoder(Coder): """A default string coder used if no sink coder is specified.""" @@ -388,9 +376,6 @@ class BytesCoder(FastCoder): def is_deterministic(self): return True - def to_type_hint(self): - return bytes - def as_cloud_object(self, coders_context=None): return { '@type': 'kind:bytes', @@ -415,9 +400,6 @@ class VarIntCoder(FastCoder): def is_deterministic(self): return True - def to_type_hint(self): - return int - def as_cloud_object(self, coders_context=None): return { '@type': 'kind:varint', @@ -442,9 +424,6 @@ class FloatCoder(FastCoder): def is_deterministic(self): return True - def to_type_hint(self): - return float - def __eq__(self, other): return type(self) == type(other) @@ -589,9 +568,6 @@ class PickleCoder(_PickleCoderBase): def as_deterministic_coder(self, step_label, error_message=None): return DeterministicFastPrimitivesCoder(self, step_label) - def to_type_hint(self): - return typehints.Any - class DillCoder(_PickleCoderBase): """Coder using dill's pickle functionality.""" @@ -623,9 +599,6 @@ class DeterministicFastPrimitivesCoder(FastCoder): def value_coder(self): return self - def to_type_hint(self): - return typehints.Any - class FastPrimitivesCoder(FastCoder): """Encodes simple primitives (e.g. str, int) efficiently. @@ -648,9 +621,6 @@ class FastPrimitivesCoder(FastCoder): else: return DeterministicFastPrimitivesCoder(self, step_label) - def to_type_hint(self): - return typehints.Any - def as_cloud_object(self, coders_context=None, is_pair_like=True): value = super(FastCoder, self).as_cloud_object(coders_context) # We currently use this coder in places where we cannot infer the coder to @@ -776,9 +746,6 @@ class TupleCoder(FastCoder): return TupleCoder([c.as_deterministic_coder(step_label, error_message) for c in self._coders]) - def to_type_hint(self): - return typehints.Tuple[tuple(c.to_type_hint() for c in self._coders)] - @staticmethod def from_type_hint(typehint, registry): return TupleCoder([registry.get_coder(t) for t in typehint.tuple_types]) @@ -911,9 +878,6 @@ class IterableCoder(FastCoder): def value_coder(self): return self._elem_coder - def to_type_hint(self): - return typehints.Iterable[self._elem_coder.to_type_hint()] - @staticmethod def from_type_hint(typehint, registry): return IterableCoder(registry.get_coder(typehint.inner_type)) diff --git a/sdks/python/apache_beam/pipeline.py b/sdks/python/apache_beam/pipeline.py index 8f0fa10..20ac5f0 100644 --- a/sdks/python/apache_beam/pipeline.py +++ b/sdks/python/apache_beam/pipeline.py @@ -72,7 +72,6 @@ from apache_beam.pvalue import PDone from apache_beam.runners import PipelineRunner from apache_beam.runners import create_runner from apache_beam.transforms import ptransform -#from apache_beam.transforms import external from apache_beam.typehints import TypeCheckError from apache_beam.typehints import typehints from apache_beam.utils.annotations import deprecated @@ -791,23 +790,23 @@ class AppliedPTransform(object): for pval in self.inputs: if pval not in visited and not isinstance(pval, pvalue.PBegin): - if pval.producer is not None: - pval.producer.visit(visitor, pipeline, visited) - # The value should be visited now since we visit outputs too. - assert pval in visited, pval + assert pval.producer is not None + pval.producer.visit(visitor, pipeline, visited) + # The value should be visited now since we visit outputs too. + assert pval in visited, pval # Visit side inputs. for pval in self.side_inputs: if isinstance(pval, pvalue.AsSideInput) and pval.pvalue not in visited: pval = pval.pvalue # Unpack marker-object-wrapped pvalue. - if pval.producer is not None: - pval.producer.visit(visitor, pipeline, visited) - # The value should be visited now since we visit outputs too. - assert pval in visited - # TODO(silviuc): Is there a way to signal that we are visiting a side - # value? The issue is that the same PValue can be reachable through - # multiple paths and therefore it is not guaranteed that the value - # will be visited as a side value. + assert pval.producer is not None + pval.producer.visit(visitor, pipeline, visited) + # The value should be visited now since we visit outputs too. + assert pval in visited + # TODO(silviuc): Is there a way to signal that we are visiting a side + # value? The issue is that the same PValue can be reachable through + # multiple paths and therefore it is not guaranteed that the value + # will be visited as a side value. # Visit a composite or primitive transform. if self.is_composite(): @@ -848,11 +847,6 @@ class AppliedPTransform(object): if isinstance(output, pvalue.PCollection)} def to_runner_api(self, context): - # External tranforms require more splicing than just setting the spec. - from apache_beam.transforms import external - if isinstance(self.transform, external.ExternalTransform): - return self.transform.to_runner_api_transform(context, self.full_label) - from apache_beam.portability.api import beam_runner_api_pb2 def transform_to_runner_api(transform, context): diff --git a/sdks/python/apache_beam/pvalue.py b/sdks/python/apache_beam/pvalue.py index 34cafd0..6a5e42a 100644 --- a/sdks/python/apache_beam/pvalue.py +++ b/sdks/python/apache_beam/pvalue.py @@ -145,26 +145,20 @@ class PCollection(PValue, typing.Generic[typing.TypeVar('T')]): def to_runner_api(self, context): return beam_runner_api_pb2.PCollection( - unique_name=self._unique_name(), + unique_name='%d%s.%s' % ( + len(self.producer.full_label), self.producer.full_label, self.tag), coder_id=context.coder_id_from_element_type(self.element_type), is_bounded=beam_runner_api_pb2.IsBounded.BOUNDED, windowing_strategy_id=context.windowing_strategies.get_id( self.windowing)) - def _unique_name(self): - if self.producer: - return '%d%s.%s' % ( - len(self.producer.full_label), self.producer.full_label, self.tag) - else: - return 'PCollection%s' % id(self) - @staticmethod def from_runner_api(proto, context): # Producer and tag will be filled in later, the key point is that the # same object is returned for the same pcollection id. return PCollection( None, - element_type=context.element_type_from_coder_id(proto.coder_id), + element_type=pickler.loads(proto.coder_id), windowing=context.windowing_strategies.get_by_id( proto.windowing_strategy_id)) diff --git a/sdks/python/apache_beam/runners/pipeline_context.py b/sdks/python/apache_beam/runners/pipeline_context.py index e6685d9..74156a1 100644 --- a/sdks/python/apache_beam/runners/pipeline_context.py +++ b/sdks/python/apache_beam/runners/pipeline_context.py @@ -55,10 +55,9 @@ class _PipelineContextMap(object): Under the hood it encodes and decodes these objects into runner API representations. """ - def __init__(self, context, obj_type, namespace, proto_map=None): + def __init__(self, context, obj_type, proto_map=None): self._pipeline_context = context self._obj_type = obj_type - self._namespace = namespace self._obj_to_id = {} self._id_to_obj = {} self._id_to_proto = dict(proto_map) if proto_map else {} @@ -66,11 +65,8 @@ class _PipelineContextMap(object): def _unique_ref(self, obj=None, label=None): self._counter += 1 - return "%s_%s_%s_%d" % ( - self._namespace, - self._obj_type.__name__, - label or type(obj).__name__, - self._counter) + return "ref_%s_%s_%s" % ( + self._obj_type.__name__, label or type(obj).__name__, self._counter) def populate_map(self, proto_map): for id, proto in self._id_to_proto.items(): @@ -93,19 +89,6 @@ class _PipelineContextMap(object): self._id_to_proto[id], self._pipeline_context) return self._id_to_obj[id] - def get_by_proto(self, maybe_new_proto, label=None, deduplicate=False): - if deduplicate: - for id, proto in self._id_to_proto.items(): - if proto == maybe_new_proto: - return id - return self.put_proto(self._unique_ref(label), maybe_new_proto) - - def put_proto(self, id, proto): - if id in self._id_to_proto: - raise ValueError("Id '%s' is already taken." % id) - self._id_to_proto[id] = proto - return id - def __getitem__(self, id): return self.get_by_id(id) @@ -129,8 +112,7 @@ class PipelineContext(object): def __init__( self, proto=None, default_environment=None, use_fake_coders=False, - iterable_state_read=None, iterable_state_write=None, - namespace='ref'): + iterable_state_read=None, iterable_state_write=None): if isinstance(proto, beam_fn_api_pb2.ProcessBundleDescriptor): proto = beam_runner_api_pb2.Components( coders=dict(proto.coders.items()), @@ -139,7 +121,7 @@ class PipelineContext(object): for name, cls in self._COMPONENT_TYPES.items(): setattr( self, name, _PipelineContextMap( - self, cls, namespace, getattr(proto, name, None))) + self, cls, getattr(proto, name, None))) if default_environment: self._default_environment_id = self.environments.get_id( Environment(default_environment), label='default_environment') @@ -159,12 +141,6 @@ class PipelineContext(object): else: return self.coders.get_id(coders.registry.get_coder(element_type)) - def element_type_from_coder_id(self, coder_id): - if self.use_fake_coders or coder_id not in self.coders: - return pickler.loads(coder_id) - else: - return self.coders[coder_id].to_type_hint() - @staticmethod def from_runner_api(proto): return PipelineContext(proto) diff --git a/sdks/python/apache_beam/runners/pipeline_context_test.py b/sdks/python/apache_beam/runners/pipeline_context_test.py index 6f1ec74..1e9456a 100644 --- a/sdks/python/apache_beam/runners/pipeline_context_test.py +++ b/sdks/python/apache_beam/runners/pipeline_context_test.py @@ -33,14 +33,6 @@ class PipelineContextTest(unittest.TestCase): bytes_coder_ref2 = context.coders.get_id(coders.BytesCoder()) self.assertEqual(bytes_coder_ref, bytes_coder_ref2) - def test_deduplication_by_proto(self): - context = pipeline_context.PipelineContext() - bytes_coder_proto = coders.BytesCoder().to_runner_api(None) - bytes_coder_ref = context.coders.get_by_proto(bytes_coder_proto) - bytes_coder_ref2 = context.coders.get_by_proto( - bytes_coder_proto, deduplicate=True) - self.assertEqual(bytes_coder_ref, bytes_coder_ref2) - def test_serialization(self): context = pipeline_context.PipelineContext() float_coder_ref = context.coders.get_id(coders.FloatCoder()) diff --git a/sdks/python/apache_beam/runners/portability/expansion_service.py b/sdks/python/apache_beam/runners/portability/expansion_service.py deleted file mode 100644 index e407892..0000000 --- a/sdks/python/apache_beam/runners/portability/expansion_service.py +++ /dev/null @@ -1,118 +0,0 @@ -# -# Licensed to the Apache Software Foundation (ASF) under one or more -# contributor license agreements. See the NOTICE file distributed with -# this work for additional information regarding copyright ownership. -# The ASF licenses this file to You under the Apache License, Version 2.0 -# (the "License"); you may not use this file except in compliance with -# the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -# - -"""A PipelineExpansion service. -""" -from __future__ import absolute_import -from __future__ import print_function - -import argparse -import logging -import sys -import time -import traceback - -from apache_beam import pipeline as beam_pipeline -from apache_beam.portability import python_urns -from apache_beam.portability.api import beam_expansion_api_pb2 -from apache_beam.portability.api import beam_expansion_api_pb2_grpc -from apache_beam.runners import pipeline_context -from apache_beam.runners.portability import portable_runner -from apache_beam.transforms import external -from apache_beam.transforms import ptransform - - -class ExpansionServiceServicer( - beam_expansion_api_pb2_grpc.ExpansionServiceServicer): - - def __init__(self, options=None): - self._options = options or beam_pipeline.PipelineOptions( - environment_type=python_urns.EMBEDDED_PYTHON) - - def Expand(self, request): - try: - pipeline = beam_pipeline.Pipeline(options=self._options) - - def with_pipeline(component, pcoll_id=None): - component.pipeline = pipeline - if pcoll_id: - component.producer, component.tag = producers[pcoll_id] - # We need the lookup to resolve back to this id. - context.pcollections._obj_to_id[component] = pcoll_id - return component - - context = pipeline_context.PipelineContext( - request.components, - default_environment= - portable_runner.PortableRunner._create_environment( - self._options), - namespace=request.namespace) - producers = { - pcoll_id: (context.transforms.get_by_id(t_id), pcoll_tag) - for t_id, t_proto in request.components.transforms.items() - for pcoll_tag, pcoll_id in t_proto.outputs.items() - } - transform = with_pipeline( - ptransform.PTransform.from_runner_api( - request.transform.spec, context)) - inputs = transform._pvaluish_from_dict({ - tag: with_pipeline(context.pcollections.get_by_id(pcoll_id), pcoll_id) - for tag, pcoll_id in request.transform.inputs.items() - }) - if not inputs: - inputs = pipeline - with external.ExternalTransform.outer_namespace(request.namespace): - result = pipeline.apply( - transform, inputs, request.transform.unique_name) - expanded_transform = pipeline._root_transform().parts[-1] - # TODO(BEAM-1833): Use named outputs internally. - if isinstance(result, dict): - expanded_transform.outputs = result - pipeline_proto = pipeline.to_runner_api(context=context) - # TODO(BEAM-1833): Use named inputs internally. - expanded_transform_id = context.transforms.get_id(expanded_transform) - expanded_transform_proto = pipeline_proto.components.transforms.pop( - expanded_transform_id) - expanded_transform_proto.inputs.clear() - expanded_transform_proto.inputs.update(request.transform.inputs) - for transform_id in pipeline_proto.root_transform_ids: - del pipeline_proto.components.transforms[transform_id] - return beam_expansion_api_pb2.ExpansionResponse( - components=pipeline_proto.components, - transform=expanded_transform_proto) - - except Exception: # pylint: disable=broad-except - return beam_expansion_api_pb2.ExpansionResponse( - error=traceback.format_exc()) - - -def main(unused_argv): - parser = argparse.ArgumentParser() - parser.add_argument('-p', '--port', - type=int, - help='port on which to serve the job api') - options = parser.parse_args() - expansion_servicer = ExpansionServiceServicer() - port = expansion_servicer.start_grpc_server(options.port) - while True: - logging.info('Listening for expansion requests at %d', port) - time.sleep(300) - - -if __name__ == '__main__': - logging.getLogger().setLevel(logging.INFO) - main(sys.argv) diff --git a/sdks/python/apache_beam/transforms/__init__.py b/sdks/python/apache_beam/transforms/__init__.py index 41cfcf6..a207009 100644 --- a/sdks/python/apache_beam/transforms/__init__.py +++ b/sdks/python/apache_beam/transforms/__init__.py @@ -22,7 +22,6 @@ from __future__ import absolute_import from apache_beam.transforms import combiners from apache_beam.transforms.core import * -from apache_beam.transforms.external import * from apache_beam.transforms.ptransform import * from apache_beam.transforms.timeutil import TimeDomain from apache_beam.transforms.util import * diff --git a/sdks/python/apache_beam/transforms/external.py b/sdks/python/apache_beam/transforms/external.py deleted file mode 100644 index c9f8ba4..0000000 --- a/sdks/python/apache_beam/transforms/external.py +++ /dev/null @@ -1,208 +0,0 @@ -# -# Licensed to the Apache Software Foundation (ASF) under one or more -# contributor license agreements. See the NOTICE file distributed with -# this work for additional information regarding copyright ownership. -# The ASF licenses this file to You under the Apache License, Version 2.0 -# (the "License"); you may not use this file except in compliance with -# the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -# - -"""Defines Transform whose expansion is implemented elsewhere. -""" -from __future__ import absolute_import -from __future__ import print_function - -import contextlib -import copy -import threading - -import grpc - -from apache_beam import pvalue -from apache_beam.portability import common_urns -from apache_beam.portability.api import beam_expansion_api_pb2 -from apache_beam.portability.api import beam_expansion_api_pb2_grpc -from apache_beam.portability.api import beam_runner_api_pb2 -from apache_beam.runners import pipeline_context -from apache_beam.transforms import ptransform - - -class ExternalTransform(ptransform.PTransform): - - _namespace_counter = 0 - _namespace = threading.local() - _namespace.value = 'external' - - _EXPANDED_TRANSFORM_UNIQUE_NAME = 'root' - _IMPULSE_PREFIX = 'impulse' - - def __init__(self, urn, payload, endpoint): - # TODO: Start an endpoint given an environment? - self._urn = urn - self._payload = payload - self._endpoint = endpoint - self._namespace = self._fresh_namespace() - - def default_label(self): - return '%s(%s)' % (self.__class__.__name__, self._urn) - - @classmethod - @contextlib.contextmanager - def outer_namespace(cls, namespace): - prev = cls._namespace.value - cls._namespace.value = namespace - yield - cls._namespace.value = prev - - @classmethod - def _fresh_namespace(cls): - ExternalTransform._namespace_counter += 1 - return '%s_%d' % (cls._namespace.value, cls._namespace_counter) - - def expand(self, pvalueish): - if isinstance(pvalueish, pvalue.PBegin): - self._inputs = {} - elif isinstance(pvalueish, (list, tuple)): - self._inputs = {str(ix): pvalue for ix, pvalue in enumerate(pvalueish)} - elif isinstance(pvalueish, dict): - self._inputs = pvalueish - else: - self._inputs = {'input': pvalueish} - pipeline = ( - next(iter(self._inputs.values())).pipeline - if self._inputs - else pvalueish.pipeline) - context = pipeline_context.PipelineContext() - transform_proto = beam_runner_api_pb2.PTransform( - unique_name=self._EXPANDED_TRANSFORM_UNIQUE_NAME, - spec=beam_runner_api_pb2.FunctionSpec( - urn=self._urn, payload=self._payload)) - for tag, pcoll in self._inputs.items(): - transform_proto.inputs[tag] = context.pcollections.get_id(pcoll) - # Conversion to/from proto assumes producers. - # TODO: Possibly loosen this. - context.transforms.put_proto( - '%s_%s' % (self._IMPULSE_PREFIX, tag), - beam_runner_api_pb2.PTransform( - unique_name='%s_%s' % (self._IMPULSE_PREFIX, tag), - spec=beam_runner_api_pb2.FunctionSpec( - urn=common_urns.primitives.IMPULSE.urn), - outputs={'out': transform_proto.inputs[tag]})) - components = context.to_runner_api() - request = beam_expansion_api_pb2.ExpansionRequest( - components=components, - namespace=self._namespace, - transform=transform_proto) - - if isinstance(self._endpoint, str): - with grpc.insecure_channel(self._endpoint) as channel: - response = beam_expansion_api_pb2_grpc.ExpansionServiceStub( - channel).Expand(request) - else: - response = self._endpoint.Expand(request) - - if response.error: - raise RuntimeError(response.error) - self._expanded_components = response.components - self._expanded_transform = response.transform - result_context = pipeline_context.PipelineContext(response.components) - - def fix_output(pcoll, tag): - pcoll.pipeline = pipeline - pcoll.tag = tag - return pcoll - self._outputs = { - tag: fix_output(result_context.pcollections.get_by_id(pcoll_id), tag) - for tag, pcoll_id in self._expanded_transform.outputs.items() - } - - return self._output_to_pvalueish(self._outputs) - - def _output_to_pvalueish(self, output_dict): - if len(output_dict) == 1: - return next(iter(output_dict.values())) - else: - return output_dict - - def to_runner_api_transform(self, context, full_label): - pcoll_renames = {} - renamed_tag_seen = False - for tag, pcoll in self._inputs.items(): - if tag not in self._expanded_transform.inputs: - if renamed_tag_seen: - raise RuntimeError( - 'Ambiguity due to non-preserved tags: %s vs %s' % ( - sorted(self._expanded_transform.inputs.keys()), - sorted(self._inputs.keys()))) - else: - renamed_tag_seen = True - tag, = self._expanded_transform.inputs.keys() - pcoll_renames[self._expanded_transform.inputs[tag]] = ( - context.pcollections.get_id(pcoll)) - for tag, pcoll in self._outputs.items(): - pcoll_renames[self._expanded_transform.outputs[tag]] = ( - context.pcollections.get_id(pcoll)) - - def _equivalent(coder1, coder2): - return coder1 == coder2 or _normalize(coder1) == _normalize(coder2) - - def _normalize(coder_proto): - normalized = copy.copy(coder_proto) - normalized.spec.environment_id = '' - # TODO(robertwb): Normalize components as well. - return normalized - - for id, proto in self._expanded_components.coders.items(): - if id.startswith(self._namespace): - context.coders.put_proto(id, proto) - elif id in context.coders: - if not _equivalent(context.coders._id_to_proto[id], proto): - raise RuntimeError('Re-used coder id: %s\n%s\n%s' % ( - id, context.coders._id_to_proto[id], proto)) - else: - context.coders.put_proto(id, proto) - for id, proto in self._expanded_components.windowing_strategies.items(): - if id.startswith(self._namespace): - context.windowing_strategies.put_proto(id, proto) - for id, proto in self._expanded_components.environments.items(): - if id.startswith(self._namespace): - context.environments.put_proto(id, proto) - for id, proto in self._expanded_components.pcollections.items(): - if id not in pcoll_renames: - context.pcollections.put_proto(id, proto) - - for id, proto in self._expanded_components.transforms.items(): - if id.startswith(self._IMPULSE_PREFIX): - # Our fake inputs. - continue - assert id.startswith(self._namespace), (id, self._namespace) - new_proto = beam_runner_api_pb2.PTransform( - unique_name=full_label + proto.unique_name[ - len(self._EXPANDED_TRANSFORM_UNIQUE_NAME):], - spec=proto.spec, - subtransforms=proto.subtransforms, - inputs={tag: pcoll_renames.get(pcoll, pcoll) - for tag, pcoll in proto.inputs.items()}, - outputs={tag: pcoll_renames.get(pcoll, pcoll) - for tag, pcoll in proto.outputs.items()}) - context.transforms.put_proto(id, new_proto) - - return self._expanded_transform - - -def memoize(func): - cache = {} - - def wrapper(*args): - if args not in cache: - cache[args] = func(*args) - return cache[args] - return wrapper diff --git a/sdks/python/apache_beam/transforms/external_test.py b/sdks/python/apache_beam/transforms/external_test.py deleted file mode 100644 index e66cf21..0000000 --- a/sdks/python/apache_beam/transforms/external_test.py +++ /dev/null @@ -1,205 +0,0 @@ -# -# Licensed to the Apache Software Foundation (ASF) under one or more -# contributor license agreements. See the NOTICE file distributed with -# this work for additional information regarding copyright ownership. -# The ASF licenses this file to You under the Apache License, Version 2.0 -# (the "License"); you may not use this file except in compliance with -# the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -# - -"""Unit tests for the transform.util classes.""" - -from __future__ import absolute_import - -import argparse -import subprocess -import sys -import unittest - -import grpc -from past.builtins import unicode - -import apache_beam as beam -from apache_beam.portability import python_urns -from apache_beam.runners.portability import expansion_service -from apache_beam.testing.util import assert_that -from apache_beam.testing.util import equal_to -from apache_beam.transforms import ptransform - - -class ExternalTransformTest(unittest.TestCase): - - # This will be overwritten if set via a flag. - expansion_service_jar = None - - def test_simple(self): - - @ptransform.PTransform.register_urn('simple', None) - class SimpleTransform(ptransform.PTransform): - def expand(self, pcoll): - return pcoll | beam.Map(lambda x: 'Simple(%s)' % x) - - def to_runner_api_parameter(self, unused_context): - return 'simple', None - - @staticmethod - def from_runner_api_parameter(unused_parameter, unused_context): - return SimpleTransform() - - with beam.Pipeline() as p: - res = ( - p - | beam.Create(['a', 'b']) - | beam.ExternalTransform( - 'simple', - None, - expansion_service.ExpansionServiceServicer())) - assert_that(res, equal_to(['Simple(a)', 'Simple(b)'])) - - def test_multi(self): - - @ptransform.PTransform.register_urn('multi', None) - class MutltiTransform(ptransform.PTransform): - def expand(self, pcolls): - return { - 'main': - (pcolls['main1'], pcolls['main2']) - | beam.Flatten() - | beam.Map(lambda x, s: x + s, - beam.pvalue.AsSingleton(pcolls['side'])), - 'side': pcolls['side'] | beam.Map(lambda x: x + x), - } - - def to_runner_api_parameter(self, unused_context): - return 'multi', None - - @staticmethod - def from_runner_api_parameter(unused_parameter, unused_context): - return MutltiTransform() - - with beam.Pipeline() as p: - main1 = p | 'Main1' >> beam.Create(['a', 'bb'], reshuffle=False) - main2 = p | 'Main2' >> beam.Create(['x', 'yy', 'zzz'], reshuffle=False) - side = p | 'Side' >> beam.Create(['s']) - res = dict(main1=main1, main2=main2, side=side) | beam.ExternalTransform( - 'multi', None, expansion_service.ExpansionServiceServicer()) - assert_that(res['main'], equal_to(['as', 'bbs', 'xs', 'yys', 'zzzs'])) - assert_that(res['side'], equal_to(['ss']), label='CheckSide') - - def test_payload(self): - - @ptransform.PTransform.register_urn('payload', bytes) - class PayloadTransform(ptransform.PTransform): - def __init__(self, payload): - self._payload = payload - - def expand(self, pcoll): - return pcoll | beam.Map(lambda x, s: x + s, self._payload) - - def to_runner_api_parameter(self, unused_context): - return b'payload', self._payload.encode('ascii') - - @staticmethod - def from_runner_api_parameter(payload, unused_context): - return PayloadTransform(payload.decode('ascii')) - - with beam.Pipeline() as p: - res = ( - p - | beam.Create(['a', 'bb'], reshuffle=False) - | beam.ExternalTransform( - 'payload', b's', - expansion_service.ExpansionServiceServicer())) - assert_that(res, equal_to(['as', 'bbs'])) - - def test_nested(self): - @ptransform.PTransform.register_urn('fib', bytes) - class FibTransform(ptransform.PTransform): - def __init__(self, level): - self._level = level - - def expand(self, p): - if self._level <= 2: - return p | beam.Create([1]) - else: - a = p | 'A' >> beam.ExternalTransform( - 'fib', str(self._level - 1).encode('ascii'), - expansion_service.ExpansionServiceServicer()) - b = p | 'B' >> beam.ExternalTransform( - 'fib', str(self._level - 2).encode('ascii'), - expansion_service.ExpansionServiceServicer()) - return ( - (a, b) - | beam.Flatten() - | beam.CombineGlobally(sum).without_defaults()) - - def to_runner_api_parameter(self, unused_context): - return 'fib', str(self._level).encode('ascii') - - @staticmethod - def from_runner_api_parameter(level, unused_context): - return FibTransform(int(level.decode('ascii'))) - - with beam.Pipeline() as p: - assert_that(p | FibTransform(6), equal_to([8])) - - def test_java_expansion(self): - if not self.expansion_service_jar: - raise unittest.SkipTest('No expansion service jar provided.') - - # The actual definitions of these transforms is in - # org.apache.beam.runners.core.construction.TestExpansionService. - TEST_COUNT_URN = "pytest:beam:transforms:count" - TEST_FILTER_URN = "pytest:beam:transforms:filter_less_than" - - # Run as cheaply as possible on the portable runner. - # TODO(robertwb): Support this directly in the direct runner. - options = beam.options.pipeline_options.PipelineOptions( - runner='PortableRunner', - experiments=['beam_fn_api'], - environment_type=python_urns.EMBEDDED_PYTHON, - job_endpoint='embed') - - try: - # Start the java server and wait for it to be ready. - port = '8091' - address = 'localhost:%s' % port - server = subprocess.Popen( - ['java', '-jar', self.expansion_service_jar, port]) - with grpc.insecure_channel(address) as channel: - grpc.channel_ready_future(channel).result() - - # Run a simple count-filtered-letters pipeline. - with beam.Pipeline(options=options) as p: - res = ( - p - | beam.Create(list('aaabccxyyzzz')) - | beam.Map(unicode) - | beam.ExternalTransform(TEST_FILTER_URN, 'middle', address) - | beam.ExternalTransform(TEST_COUNT_URN, None, address) - | beam.Map(lambda kv: '%s: %s' % kv)) - - assert_that(res, equal_to(['a: 3', 'b: 1', 'c: 2'])) - - finally: - server.kill() - - -if __name__ == '__main__': - parser = argparse.ArgumentParser() - parser.add_argument('--expansion_service_jar') - known_args, sys.argv = parser.parse_known_args(sys.argv) - - if known_args.expansion_service_jar: - ExternalTransformTest.expansion_service_jar = ( - known_args.expansion_service_jar) - - unittest.main() diff --git a/sdks/python/apache_beam/transforms/ptransform.py b/sdks/python/apache_beam/transforms/ptransform.py index 2ebc93b..c512d9f 100644 --- a/sdks/python/apache_beam/transforms/ptransform.py +++ b/sdks/python/apache_beam/transforms/ptransform.py @@ -36,7 +36,6 @@ FlatMap processing functions. from __future__ import absolute_import -import contextlib import copy import itertools import operator @@ -231,23 +230,6 @@ def get_nested_pvalues(pvalueish): return pvalues -def get_nested_pvalues0(pvalueish): - if isinstance(pvalueish, (tuple, list)): - tagged_values = enumerate(pvalueish) - if isinstance(pvalueish, dict): - tagged_values = pvalueish.items() - else: - yield None, pvalueish - return - - for tag, subvalue in tagged_values: - for subtag, subsubvalue in get_nested_pvalues(subvalue): - if subtag is None: - yield tag, subsubvalue - else: - yield '%s.%s' % (tag, subsubvalue), subsubvalue - - class _ZipPValues(object): """Pairs each PValue in a pvalueish with a value in a parallel out sibling. @@ -544,37 +526,13 @@ class PTransform(WithTypeHints, HasDisplayData): yield pvalueish return pvalueish, tuple(_dict_tuple_leaves(pvalueish)) - def _pvaluish_from_dict(self, input_dict): - if len(input_dict) == 1: - return next(iter(input_dict.values())) - else: - return input_dict - _known_urns = {} @classmethod def register_urn(cls, urn, parameter_type, constructor=None): def register(constructor): - if isinstance(constructor, type): - constructor.from_runner_api_parameter = register( - constructor.from_runner_api_parameter) - # pylint isn't smart enough to recognize when this is used - # on a class or a method, and will emit a no-self-warning - # in the latter case. Rather than suppressing this at each - # use, we fool it here through some dynamic patching that - # pylint will also not understand. - - @contextlib.contextmanager - def fake_static_method(): - actual_static_method = staticmethod - globals()['staticmethod'] = lambda x: x - yield - globals()['staticmethod'] = actual_static_method - with fake_static_method(): - return staticmethod(constructor) - else: - cls._known_urns[urn] = parameter_type, constructor - return staticmethod(constructor) + cls._known_urns[urn] = parameter_type, constructor + return staticmethod(constructor) if constructor: # Used as a statement. register(constructor) diff --git a/sdks/python/build.gradle b/sdks/python/build.gradle index 18bd161..fc2372e 100644 --- a/sdks/python/build.gradle +++ b/sdks/python/build.gradle @@ -430,15 +430,3 @@ project.task('createProcessWorker') { } } } - -project.task('crossLanguagePythonJava') { - dependsOn 'setupVirtualenv' - dependsOn ':beam-sdks-java-container:docker' - dependsOn ':beam-runners-core-construction-java:testExpansionService' - doLast { - exec { - executable 'sh' - args '-c', ". ${project.ext.envdir}/bin/activate && pip install -e .[test] && python -m apache_beam.transforms.external_test --expansion_service_jar=${project(":beam-runners-core-construction-java:").testExpansionService.archivePath}" - } - } -}