chamikaramj commented on code in PR #28210: URL: https://github.com/apache/beam/pull/28210#discussion_r1332251109
########## runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/CreateTranslation.java: ########## @@ -0,0 +1,128 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.core.construction; + +import com.google.auto.service.AutoService; +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.ObjectInputStream; +import java.io.ObjectOutputStream; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; +import org.apache.beam.model.pipeline.v1.RunnerApi.FunctionSpec; +import org.apache.beam.runners.core.construction.PTransformTranslation.TransformPayloadTranslator; +import org.apache.beam.sdk.coders.Coder; +import org.apache.beam.sdk.runners.AppliedPTransform; +import org.apache.beam.sdk.schemas.Schema; +import org.apache.beam.sdk.schemas.Schema.FieldType; +import org.apache.beam.sdk.transforms.Create; +import org.apache.beam.sdk.transforms.Create.Values; +import org.apache.beam.sdk.transforms.PTransform; +import org.apache.beam.sdk.values.Row; +import 
org.checkerframework.checker.nullness.qual.Nullable; + +@SuppressWarnings({ + "rawtypes", // TODO(https://github.com/apache/beam/issues/20447) + "nullness" +}) +public class CreateTranslation implements TransformPayloadTranslator<Create.Values<?>> { + + Schema createConfigSchema = + Schema.builder() + .addArrayField("values", FieldType.BYTES) + .addByteArrayField("serialized_coder") + .build(); + + @Override + public String getUrn() { + return PTransformTranslation.CREATE_TRANSFORM_URN; + } + + @Override + public @Nullable FunctionSpec translate( + AppliedPTransform<?, ?, Values<?>> application, SdkComponents components) throws IOException { + // Currently just returns an empty payload. + // We can implement an actual payload of runners start using this transform. + return FunctionSpec.newBuilder().setUrn(getUrn(application.getTransform())).build(); + } + + private byte[] toByteArray(Object object) { + try (ByteArrayOutputStream bos = new ByteArrayOutputStream(); + ObjectOutputStream out = new ObjectOutputStream(bos)) { + out.writeObject(object); + return bos.toByteArray(); + } catch (Exception e) { + throw new RuntimeException(e); + } + } + + private Object fromByteArray(byte[] bytes) { + try (ByteArrayInputStream bis = new ByteArrayInputStream(bytes); + ObjectInputStream in = new ObjectInputStream(bis)) { + return in.readObject(); + } catch (Exception e) { + throw new RuntimeException(e); + } + } + + @Override + public @Nullable Row toConfigRow(Values<?> transform) { + List<byte[]> encodedElements = new ArrayList<>(); + transform + .getElements() + .forEach( + object -> { + encodedElements.add(toByteArray(object)); + }); + + byte[] serializedCoder = + transform.getCoder() != null ? 
toByteArray(transform.getCoder()) : new byte[] {}; + return Row.withSchema(createConfigSchema) + .withFieldValue("values", encodedElements) + .withFieldValue("serialized_coder", serializedCoder) + .build(); + } + + @Override + public Create.@Nullable Values<?> fromConfigRow(Row configRow) { Review Comment: This class was reverted. ########## runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/CreateTranslation.java: ########## @@ -0,0 +1,128 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.beam.runners.core.construction; + +import com.google.auto.service.AutoService; +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.ObjectInputStream; +import java.io.ObjectOutputStream; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; +import org.apache.beam.model.pipeline.v1.RunnerApi.FunctionSpec; +import org.apache.beam.runners.core.construction.PTransformTranslation.TransformPayloadTranslator; +import org.apache.beam.sdk.coders.Coder; +import org.apache.beam.sdk.runners.AppliedPTransform; +import org.apache.beam.sdk.schemas.Schema; +import org.apache.beam.sdk.schemas.Schema.FieldType; +import org.apache.beam.sdk.transforms.Create; +import org.apache.beam.sdk.transforms.Create.Values; +import org.apache.beam.sdk.transforms.PTransform; +import org.apache.beam.sdk.values.Row; +import org.checkerframework.checker.nullness.qual.Nullable; + +@SuppressWarnings({ + "rawtypes", // TODO(https://github.com/apache/beam/issues/20447) Review Comment: Removed the suppression. ########## runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/CreateTranslation.java: ########## @@ -0,0 +1,128 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.core.construction; + +import com.google.auto.service.AutoService; +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.ObjectInputStream; +import java.io.ObjectOutputStream; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; +import org.apache.beam.model.pipeline.v1.RunnerApi.FunctionSpec; +import org.apache.beam.runners.core.construction.PTransformTranslation.TransformPayloadTranslator; +import org.apache.beam.sdk.coders.Coder; +import org.apache.beam.sdk.runners.AppliedPTransform; +import org.apache.beam.sdk.schemas.Schema; +import org.apache.beam.sdk.schemas.Schema.FieldType; +import org.apache.beam.sdk.transforms.Create; +import org.apache.beam.sdk.transforms.Create.Values; +import org.apache.beam.sdk.transforms.PTransform; +import org.apache.beam.sdk.values.Row; +import org.checkerframework.checker.nullness.qual.Nullable; + +@SuppressWarnings({ + "rawtypes", // TODO(https://github.com/apache/beam/issues/20447) + "nullness" +}) +public class CreateTranslation implements TransformPayloadTranslator<Create.Values<?>> { + + Schema createConfigSchema = + Schema.builder() + .addArrayField("values", FieldType.BYTES) + .addByteArrayField("serialized_coder") + .build(); + + @Override + public String getUrn() { + return PTransformTranslation.CREATE_TRANSFORM_URN; + } + + @Override + public @Nullable FunctionSpec translate( + AppliedPTransform<?, ?, Values<?>> 
application, SdkComponents components) throws IOException { + // Currently just returns an empty payload. + // We can implement an actual payload of runners start using this transform. + return FunctionSpec.newBuilder().setUrn(getUrn(application.getTransform())).build(); + } + + private byte[] toByteArray(Object object) { + try (ByteArrayOutputStream bos = new ByteArrayOutputStream(); + ObjectOutputStream out = new ObjectOutputStream(bos)) { + out.writeObject(object); + return bos.toByteArray(); + } catch (Exception e) { + throw new RuntimeException(e); + } + } + + private Object fromByteArray(byte[] bytes) { + try (ByteArrayInputStream bis = new ByteArrayInputStream(bytes); + ObjectInputStream in = new ObjectInputStream(bis)) { + return in.readObject(); + } catch (Exception e) { + throw new RuntimeException(e); + } + } + + @Override + public @Nullable Row toConfigRow(Values<?> transform) { + List<byte[]> encodedElements = new ArrayList<>(); + transform + .getElements() + .forEach( + object -> { + encodedElements.add(toByteArray(object)); + }); + + byte[] serializedCoder = + transform.getCoder() != null ? toByteArray(transform.getCoder()) : new byte[] {}; + return Row.withSchema(createConfigSchema) + .withFieldValue("values", encodedElements) + .withFieldValue("serialized_coder", serializedCoder) + .build(); + } + + @Override + public Create.@Nullable Values<?> fromConfigRow(Row configRow) { + Values transform = + Create.of( + configRow.getArray("values").stream() + .map(bytesValue -> fromByteArray((byte[]) bytesValue)) + .collect(Collectors.toList())); + byte[] serializedCoder = configRow.getBytes("serialized_coder"); Review Comment: This class was reverted. 
########## runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PTransformTranslation.java: ########## @@ -386,9 +388,9 @@ public RunnerApi.PTransform translate( * Translates a set of registered transforms whose content only differs based by differences in * their {@link FunctionSpec}s and URNs. */ - private static class KnownTransformPayloadTranslator<T extends PTransform<?, ?>> + public static class KnownTransformPayloadTranslator<T extends PTransform<?, ?>> Review Comment: Reverted. ########## runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/CreateTranslation.java: ########## @@ -0,0 +1,128 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.beam.runners.core.construction; + +import com.google.auto.service.AutoService; +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.ObjectInputStream; +import java.io.ObjectOutputStream; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; +import org.apache.beam.model.pipeline.v1.RunnerApi.FunctionSpec; +import org.apache.beam.runners.core.construction.PTransformTranslation.TransformPayloadTranslator; +import org.apache.beam.sdk.coders.Coder; +import org.apache.beam.sdk.runners.AppliedPTransform; +import org.apache.beam.sdk.schemas.Schema; +import org.apache.beam.sdk.schemas.Schema.FieldType; +import org.apache.beam.sdk.transforms.Create; +import org.apache.beam.sdk.transforms.Create.Values; +import org.apache.beam.sdk.transforms.PTransform; +import org.apache.beam.sdk.values.Row; +import org.checkerframework.checker.nullness.qual.Nullable; + +@SuppressWarnings({ + "rawtypes", // TODO(https://github.com/apache/beam/issues/20447) + "nullness" +}) +public class CreateTranslation implements TransformPayloadTranslator<Create.Values<?>> { + + Schema createConfigSchema = + Schema.builder() + .addArrayField("values", FieldType.BYTES) + .addByteArrayField("serialized_coder") + .build(); + + @Override + public String getUrn() { + return PTransformTranslation.CREATE_TRANSFORM_URN; + } + + @Override + public @Nullable FunctionSpec translate( + AppliedPTransform<?, ?, Values<?>> application, SdkComponents components) throws IOException { + // Currently just returns an empty payload. + // We can implement an actual payload of runners start using this transform. 
+ return FunctionSpec.newBuilder().setUrn(getUrn(application.getTransform())).build(); + } + + private byte[] toByteArray(Object object) { + try (ByteArrayOutputStream bos = new ByteArrayOutputStream(); + ObjectOutputStream out = new ObjectOutputStream(bos)) { + out.writeObject(object); + return bos.toByteArray(); + } catch (Exception e) { + throw new RuntimeException(e); + } + } + + private Object fromByteArray(byte[] bytes) { + try (ByteArrayInputStream bis = new ByteArrayInputStream(bytes); + ObjectInputStream in = new ObjectInputStream(bis)) { + return in.readObject(); + } catch (Exception e) { + throw new RuntimeException(e); + } + } + + @Override + public @Nullable Row toConfigRow(Values<?> transform) { + List<byte[]> encodedElements = new ArrayList<>(); + transform + .getElements() + .forEach( + object -> { + encodedElements.add(toByteArray(object)); + }); + + byte[] serializedCoder = Review Comment: This class was reverted. ########## runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PipelineTranslation.java: ########## @@ -102,11 +121,370 @@ public void visitPrimitiveTransform(Node node) { // TODO(JIRA-5649): Don't even emit these transforms in the generated protos. res = elideDeprecatedViews(res); } + + ExternalTranslationOptions externalTranslationOptions = + pipeline.getOptions().as(ExternalTranslationOptions.class); + List<String> urnsToOverride = externalTranslationOptions.getTransformsToOverride(); + if (urnsToOverride.size() > 0) { + // We use PTransformPayloadTranslators and the Transform Service to re-generate the pipeline + // proto components for the updated transform and update the pipeline proto. 
+ Map<String, PTransform> transforms = res.getComponents().getTransformsMap(); + List<String> alreadyCheckedURns = new ArrayList<>(); + for (Entry<String, PTransform> entry : transforms.entrySet()) { + String urn = entry.getValue().getSpec().getUrn(); + if (!alreadyCheckedURns.contains(urn) && urnsToOverride.contains(urn)) { + alreadyCheckedURns.add(urn); Review Comment: Done. ########## runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PipelineTranslation.java: ########## @@ -102,11 +121,370 @@ public void visitPrimitiveTransform(Node node) { // TODO(JIRA-5649): Don't even emit these transforms in the generated protos. res = elideDeprecatedViews(res); } + + ExternalTranslationOptions externalTranslationOptions = + pipeline.getOptions().as(ExternalTranslationOptions.class); + List<String> urnsToOverride = externalTranslationOptions.getTransformsToOverride(); + if (urnsToOverride.size() > 0) { + // We use PTransformPayloadTranslators and the Transform Service to re-generate the pipeline + // proto components for the updated transform and update the pipeline proto. + Map<String, PTransform> transforms = res.getComponents().getTransformsMap(); + List<String> alreadyCheckedURns = new ArrayList<>(); + for (Entry<String, PTransform> entry : transforms.entrySet()) { + String urn = entry.getValue().getSpec().getUrn(); + if (!alreadyCheckedURns.contains(urn) && urnsToOverride.contains(urn)) { + alreadyCheckedURns.add(urn); + // All transforms in the pipeline with the given urns have to be overridden. + List< + AppliedPTransform< + PInput, + POutput, + org.apache.beam.sdk.transforms.PTransform<? super PInput, POutput>>> + appliedPTransforms = + findAppliedPTransforms( + urn, pipeline, KnownTransformPayloadTranslator.KNOWN_PAYLOAD_TRANSLATORS); + for (AppliedPTransform< + PInput, + POutput, + org.apache.beam.sdk.transforms.PTransform<? 
super PInput, POutput>> + appliedPTransform : appliedPTransforms) { + TransformPayloadTranslator< + org.apache.beam.sdk.transforms.PTransform<? super PInput, POutput>> + payloadTranslator = + KnownTransformPayloadTranslator.KNOWN_PAYLOAD_TRANSLATORS.get( + appliedPTransform.getTransform().getClass()); + try { + // Override the transform using the transform service. + res = + updateTransformViaTransformService( + urn, appliedPTransform, payloadTranslator, pipeline, res); + } catch (IOException e) { + throw new RuntimeException(e); + } + } + } + } + } + // Validate that translation didn't produce an invalid pipeline. PipelineValidator.validate(res); return res; } + private static int findAvailablePort() throws IOException { + ServerSocket s = new ServerSocket(0); + try { + return s.getLocalPort(); + } finally { + s.close(); + try { + // Some systems don't free the port for future use immediately. + Thread.sleep(100); + } catch (InterruptedException exn) { + // ignore + } + } + } + + // Override the given transform to the version available in a new transform service. + private static < + InputT extends PInput, + OutputT extends POutput, + TransformT extends org.apache.beam.sdk.transforms.PTransform<InputT, OutputT>> + RunnerApi.Pipeline updateTransformViaTransformService( + String urn, + AppliedPTransform< + PInput, + POutput, + org.apache.beam.sdk.transforms.PTransform<? super PInput, POutput>> + appliedPTransform, + TransformPayloadTranslator< + org.apache.beam.sdk.transforms.PTransform<? super PInput, POutput>> + originalPayloadTranslator, + Pipeline pipeline, + RunnerApi.Pipeline runnerAPIpipeline) + throws IOException { + ExternalTranslationOptions externalTranslationOptions = + pipeline.getOptions().as(ExternalTranslationOptions.class); + + // Config row to re-construct the transform within the transform service. 
+ Row configRow = originalPayloadTranslator.toConfigRow(appliedPTransform.getTransform()); + ByteStringOutputStream outputStream = new ByteStringOutputStream(); + try { + RowCoder.of(configRow.getSchema()).encode(configRow, outputStream); + } catch (IOException e) { + throw new RuntimeException(e); + } + + // Java expansion serivice able to identify and expand transforms that includes the construction + // config provided here. + ExternalTransforms.ExternalConfigurationPayload payload = + ExternalTransforms.ExternalConfigurationPayload.newBuilder() + .setSchema(SchemaTranslation.schemaToProto(configRow.getSchema(), true)) + .setPayload(outputStream.toByteString()) + .build(); + + String serviceAddress = null; + TransformServiceLauncher service = null; + try { + if (externalTranslationOptions.getTransformServiceAddress() != null) { + serviceAddress = externalTranslationOptions.getTransformServiceAddress(); + } else if (externalTranslationOptions.getTransformServiceBeamVersion() != null) { + String projectName = UUID.randomUUID().toString(); + service = TransformServiceLauncher.forProject(projectName, findAvailablePort()); + service.setBeamVersion(externalTranslationOptions.getTransformServiceBeamVersion()); + + // Starting the transform service. + service.start(); + // Waiting the service to be ready. + service.waitTillUp(15000); + } else { + throw new IllegalArgumentException( + "Either option TransformServiceAddress or option TransformServiceBeamVersion should be provided to override a transform using the transform service"); + } + + if (serviceAddress == null) { + throw new IllegalArgumentException( + "Cannot override the transform " + + urn + + " since a valid transform service address could not be determined"); + } + + // Creating an ExternalTransform and expanding it using the transform service. + // Input will be the same input provided to the transform bing overridden. 
+ ExpandableTransform<InputT, OutputT> externalTransform = + (ExpandableTransform<InputT, OutputT>) + External.of(urn, payload.toByteArray(), serviceAddress); + + PCollectionTuple input = PCollectionTuple.empty(pipeline); + for (TupleTag<?> tag : (Set<TupleTag<?>>) appliedPTransform.getInputs().keySet()) { + PCollection<?> pc = appliedPTransform.getInputs().get(tag); + if (pc == null) { + throw new IllegalArgumentException( + "Input of transform " + appliedPTransform + " with tag " + tag + " was null."); + } + input = input.and(tag, (PCollection) pc); + } + POutput output = externalTransform.expand((InputT) input); + + // Outputs of the transform being overridden. + Map<TupleTag<?>, PCollection<?>> originalOutputs = appliedPTransform.getOutputs(); + + // After expansion some transforms might still refer to the output of the already overridden + // transform as their input. + // Such inputs have to be overridden to use the output of the new upgraded transform. + Map<String, String> inputReplacements = new HashMap<>(); + + // Will contain the outputs of the upgraded transform. + Map<TupleTag<?>, PCollection<?>> newOutputs = new HashMap<>(); + + if (output instanceof PCollectionTuple) { Review Comment: Makes sense. Updated code (now in TransformUpgrader) to directly update the proto and directly invoke the TransformService using protos instead of going through External.java. ########## runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PipelineTranslation.java: ########## @@ -102,11 +121,370 @@ public void visitPrimitiveTransform(Node node) { // TODO(JIRA-5649): Don't even emit these transforms in the generated protos. 
res = elideDeprecatedViews(res); } + + ExternalTranslationOptions externalTranslationOptions = + pipeline.getOptions().as(ExternalTranslationOptions.class); + List<String> urnsToOverride = externalTranslationOptions.getTransformsToOverride(); + if (urnsToOverride.size() > 0) { + // We use PTransformPayloadTranslators and the Transform Service to re-generate the pipeline + // proto components for the updated transform and update the pipeline proto. + Map<String, PTransform> transforms = res.getComponents().getTransformsMap(); + List<String> alreadyCheckedURns = new ArrayList<>(); + for (Entry<String, PTransform> entry : transforms.entrySet()) { + String urn = entry.getValue().getSpec().getUrn(); + if (!alreadyCheckedURns.contains(urn) && urnsToOverride.contains(urn)) { + alreadyCheckedURns.add(urn); + // All transforms in the pipeline with the given urns have to be overridden. + List< + AppliedPTransform< + PInput, + POutput, + org.apache.beam.sdk.transforms.PTransform<? super PInput, POutput>>> + appliedPTransforms = + findAppliedPTransforms( + urn, pipeline, KnownTransformPayloadTranslator.KNOWN_PAYLOAD_TRANSLATORS); + for (AppliedPTransform< + PInput, + POutput, + org.apache.beam.sdk.transforms.PTransform<? super PInput, POutput>> + appliedPTransform : appliedPTransforms) { + TransformPayloadTranslator< + org.apache.beam.sdk.transforms.PTransform<? super PInput, POutput>> + payloadTranslator = + KnownTransformPayloadTranslator.KNOWN_PAYLOAD_TRANSLATORS.get( + appliedPTransform.getTransform().getClass()); + try { + // Override the transform using the transform service. + res = + updateTransformViaTransformService( + urn, appliedPTransform, payloadTranslator, pipeline, res); + } catch (IOException e) { + throw new RuntimeException(e); + } + } + } + } + } + // Validate that translation didn't produce an invalid pipeline. 
PipelineValidator.validate(res); return res; } + private static int findAvailablePort() throws IOException { + ServerSocket s = new ServerSocket(0); + try { + return s.getLocalPort(); + } finally { + s.close(); + try { + // Some systems don't free the port for future use immediately. + Thread.sleep(100); + } catch (InterruptedException exn) { + // ignore + } + } + } + + // Override the given transform to the version available in a new transform service. + private static < + InputT extends PInput, + OutputT extends POutput, + TransformT extends org.apache.beam.sdk.transforms.PTransform<InputT, OutputT>> + RunnerApi.Pipeline updateTransformViaTransformService( + String urn, + AppliedPTransform< + PInput, + POutput, + org.apache.beam.sdk.transforms.PTransform<? super PInput, POutput>> + appliedPTransform, + TransformPayloadTranslator< + org.apache.beam.sdk.transforms.PTransform<? super PInput, POutput>> + originalPayloadTranslator, + Pipeline pipeline, + RunnerApi.Pipeline runnerAPIpipeline) + throws IOException { + ExternalTranslationOptions externalTranslationOptions = + pipeline.getOptions().as(ExternalTranslationOptions.class); + + // Config row to re-construct the transform within the transform service. + Row configRow = originalPayloadTranslator.toConfigRow(appliedPTransform.getTransform()); + ByteStringOutputStream outputStream = new ByteStringOutputStream(); + try { + RowCoder.of(configRow.getSchema()).encode(configRow, outputStream); + } catch (IOException e) { + throw new RuntimeException(e); + } + + // Java expansion serivice able to identify and expand transforms that includes the construction + // config provided here. 
+ ExternalTransforms.ExternalConfigurationPayload payload = + ExternalTransforms.ExternalConfigurationPayload.newBuilder() + .setSchema(SchemaTranslation.schemaToProto(configRow.getSchema(), true)) + .setPayload(outputStream.toByteString()) + .build(); + + String serviceAddress = null; + TransformServiceLauncher service = null; + try { + if (externalTranslationOptions.getTransformServiceAddress() != null) { + serviceAddress = externalTranslationOptions.getTransformServiceAddress(); + } else if (externalTranslationOptions.getTransformServiceBeamVersion() != null) { + String projectName = UUID.randomUUID().toString(); + service = TransformServiceLauncher.forProject(projectName, findAvailablePort()); + service.setBeamVersion(externalTranslationOptions.getTransformServiceBeamVersion()); + + // Starting the transform service. + service.start(); + // Waiting the service to be ready. + service.waitTillUp(15000); + } else { + throw new IllegalArgumentException( + "Either option TransformServiceAddress or option TransformServiceBeamVersion should be provided to override a transform using the transform service"); + } + + if (serviceAddress == null) { + throw new IllegalArgumentException( + "Cannot override the transform " + + urn + + " since a valid transform service address could not be determined"); + } + + // Creating an ExternalTransform and expanding it using the transform service. + // Input will be the same input provided to the transform bing overridden. 
+ ExpandableTransform<InputT, OutputT> externalTransform = + (ExpandableTransform<InputT, OutputT>) + External.of(urn, payload.toByteArray(), serviceAddress); + + PCollectionTuple input = PCollectionTuple.empty(pipeline); + for (TupleTag<?> tag : (Set<TupleTag<?>>) appliedPTransform.getInputs().keySet()) { + PCollection<?> pc = appliedPTransform.getInputs().get(tag); + if (pc == null) { + throw new IllegalArgumentException( + "Input of transform " + appliedPTransform + " with tag " + tag + " was null."); + } + input = input.and(tag, (PCollection) pc); + } + POutput output = externalTransform.expand((InputT) input); + + // Outputs of the transform being overridden. + Map<TupleTag<?>, PCollection<?>> originalOutputs = appliedPTransform.getOutputs(); + + // After expansion some transforms might still refer to the output of the already overridden + // transform as their input. + // Such inputs have to be overridden to use the output of the new upgraded transform. + Map<String, String> inputReplacements = new HashMap<>(); + + // Will contain the outputs of the upgraded transform. 
+ Map<TupleTag<?>, PCollection<?>> newOutputs = new HashMap<>(); + + if (output instanceof PCollectionTuple) { + newOutputs.putAll(((PCollectionTuple) output).getAll()); + for (Map.Entry<TupleTag<?>, PCollection<?>> entry : newOutputs.entrySet()) { + if (entry == null) { + throw new IllegalArgumentException( + "Found unexpected null entry when iterating the outputs of expanded " + + "ExpandableTransform " + + externalTransform); + } + if (!appliedPTransform.getOutputs().containsKey(entry.getKey())) { + throw new RuntimeException( + "Could not find the tag " + entry.getKey() + " in the original set of outputs"); + } + PCollection<?> originalOutputPc = originalOutputs.get(entry.getKey()); + if (originalOutputPc == null) { + throw new IllegalArgumentException( + "Original output of transform " + + appliedPTransform + + " with tag " + + entry.getKey() + + " was null"); + } + inputReplacements.put(originalOutputPc.getName(), entry.getValue().getName()); + } + } else if (output instanceof PCollection) { + newOutputs.put(new TupleTag<>("temp_main_tag"), (PCollection) output); + inputReplacements.put( + originalOutputs.get(originalOutputs.keySet().iterator().next()).getName(), + ((PCollection) output).getName()); + } else { + throw new RuntimeException("Unexpected output type"); + } + + // We create a new AppliedPTransform to represent the upgraded transform and register it in an + // SdkComponents object. 
+ AppliedPTransform<?, ?, ?> updatedAppliedPTransform = + AppliedPTransform.of( + appliedPTransform.getFullName() + "_external", + appliedPTransform.getInputs(), + newOutputs, + externalTransform, + externalTransform.getResourceHints(), + appliedPTransform.getPipeline()); + SdkComponents updatedComponents = + SdkComponents.create( + runnerAPIpipeline.getComponents(), runnerAPIpipeline.getRequirementsList()); + String updatedTransformId = + updatedComponents.registerPTransform(updatedAppliedPTransform, Collections.emptyList()); + RunnerApi.Components updatedRunnerApiComponents = updatedComponents.toComponents(); + + // Recording input updates to the transforms to refer to the upgraded transform instead of the + // old one. + // Also recording the newly generated id of the old (overridden) transform in the + // updatedRunnerApiComponents. + Map<String, Map<String, String>> transformInputUpdates = new HashMap<>(); Review Comment: Reverted. ########## sdks/java/expansion-service/src/main/java/org/apache/beam/sdk/expansion/service/ExpansionService.java: ########## @@ -178,6 +184,65 @@ public List<String> getDependencies( } } + List<String> deprecatedTransformURNs = ImmutableList.of(READ_TRANSFORM_URN); + for (TransformPayloadTranslatorRegistrar registrar : + ServiceLoader.load(TransformPayloadTranslatorRegistrar.class)) { + for (Map.Entry<? extends Class<? extends PTransform>, ? extends TransformPayloadTranslator> + entry : registrar.getTransformPayloadTranslators().entrySet()) { + @Initialized TransformPayloadTranslator translator = entry.getValue(); + if (translator == null) { + continue; + } + + String urn = null; Review Comment: Should be done now. ########## runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/CreateTranslation.java: ########## @@ -0,0 +1,128 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.core.construction; + +import com.google.auto.service.AutoService; +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.ObjectInputStream; +import java.io.ObjectOutputStream; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; +import org.apache.beam.model.pipeline.v1.RunnerApi.FunctionSpec; +import org.apache.beam.runners.core.construction.PTransformTranslation.TransformPayloadTranslator; +import org.apache.beam.sdk.coders.Coder; +import org.apache.beam.sdk.runners.AppliedPTransform; +import org.apache.beam.sdk.schemas.Schema; +import org.apache.beam.sdk.schemas.Schema.FieldType; +import org.apache.beam.sdk.transforms.Create; +import org.apache.beam.sdk.transforms.Create.Values; +import org.apache.beam.sdk.transforms.PTransform; +import org.apache.beam.sdk.values.Row; +import org.checkerframework.checker.nullness.qual.Nullable; + +@SuppressWarnings({ + "rawtypes", // TODO(https://github.com/apache/beam/issues/20447) + "nullness" +}) +public class CreateTranslation implements TransformPayloadTranslator<Create.Values<?>> { + + Schema createConfigSchema = + Schema.builder() + 
.addArrayField("values", FieldType.BYTES) + .addByteArrayField("serialized_coder") + .build(); + + @Override + public String getUrn() { + return PTransformTranslation.CREATE_TRANSFORM_URN; + } + + @Override + public @Nullable FunctionSpec translate( + AppliedPTransform<?, ?, Values<?>> application, SdkComponents components) throws IOException { + // Currently just returns an empty payload. + // We can implement an actual payload of runners start using this transform. + return FunctionSpec.newBuilder().setUrn(getUrn(application.getTransform())).build(); + } + + private byte[] toByteArray(Object object) { + try (ByteArrayOutputStream bos = new ByteArrayOutputStream(); + ObjectOutputStream out = new ObjectOutputStream(bos)) { + out.writeObject(object); + return bos.toByteArray(); + } catch (Exception e) { + throw new RuntimeException(e); + } + } + + private Object fromByteArray(byte[] bytes) { + try (ByteArrayInputStream bis = new ByteArrayInputStream(bytes); + ObjectInputStream in = new ObjectInputStream(bis)) { + return in.readObject(); + } catch (Exception e) { + throw new RuntimeException(e); + } + } + + @Override + public @Nullable Row toConfigRow(Values<?> transform) { Review Comment: This class was reverted. ########## runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/CreateTranslation.java: ########## @@ -0,0 +1,128 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.core.construction; + +import com.google.auto.service.AutoService; +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.ObjectInputStream; +import java.io.ObjectOutputStream; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; +import org.apache.beam.model.pipeline.v1.RunnerApi.FunctionSpec; +import org.apache.beam.runners.core.construction.PTransformTranslation.TransformPayloadTranslator; +import org.apache.beam.sdk.coders.Coder; +import org.apache.beam.sdk.runners.AppliedPTransform; +import org.apache.beam.sdk.schemas.Schema; +import org.apache.beam.sdk.schemas.Schema.FieldType; +import org.apache.beam.sdk.transforms.Create; +import org.apache.beam.sdk.transforms.Create.Values; +import org.apache.beam.sdk.transforms.PTransform; +import org.apache.beam.sdk.values.Row; +import org.checkerframework.checker.nullness.qual.Nullable; + +@SuppressWarnings({ + "rawtypes", // TODO(https://github.com/apache/beam/issues/20447) + "nullness" +}) +public class CreateTranslation implements TransformPayloadTranslator<Create.Values<?>> { + + Schema createConfigSchema = + Schema.builder() + .addArrayField("values", FieldType.BYTES) + .addByteArrayField("serialized_coder") + .build(); + + @Override + public String getUrn() { + return PTransformTranslation.CREATE_TRANSFORM_URN; + } + + @Override + public @Nullable FunctionSpec translate( + AppliedPTransform<?, ?, Values<?>> 
application, SdkComponents components) throws IOException { + // Currently just returns an empty payload. + // We can implement an actual payload of runners start using this transform. + return FunctionSpec.newBuilder().setUrn(getUrn(application.getTransform())).build(); + } + + private byte[] toByteArray(Object object) { + try (ByteArrayOutputStream bos = new ByteArrayOutputStream(); + ObjectOutputStream out = new ObjectOutputStream(bos)) { + out.writeObject(object); + return bos.toByteArray(); + } catch (Exception e) { + throw new RuntimeException(e); + } + } + + private Object fromByteArray(byte[] bytes) { + try (ByteArrayInputStream bis = new ByteArrayInputStream(bytes); + ObjectInputStream in = new ObjectInputStream(bis)) { + return in.readObject(); + } catch (Exception e) { + throw new RuntimeException(e); + } + } + + @Override + public @Nullable Row toConfigRow(Values<?> transform) { + List<byte[]> encodedElements = new ArrayList<>(); + transform + .getElements() + .forEach( + object -> { + encodedElements.add(toByteArray(object)); + }); Review Comment: This class was reverted. ########## runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java: ########## @@ -2572,6 +2572,11 @@ public String getUrn(PTransform transform) { return "dataflow_stub:" + transform.getClass().getName(); } + @Override + public String getUrn() { Review Comment: The access modifier of an interface method is public, and Java doesn't allow reducing access in sub-classes. With the updated interface, callers should be able to invoke getUrn() directly unless the sub-class cannot determine the URN without looking at the transform (which should be rare; we should not use the same translator for multiple transform classes).
########## runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/CreateTranslation.java: ########## @@ -0,0 +1,128 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.core.construction; + +import com.google.auto.service.AutoService; +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.ObjectInputStream; +import java.io.ObjectOutputStream; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; +import org.apache.beam.model.pipeline.v1.RunnerApi.FunctionSpec; +import org.apache.beam.runners.core.construction.PTransformTranslation.TransformPayloadTranslator; +import org.apache.beam.sdk.coders.Coder; +import org.apache.beam.sdk.runners.AppliedPTransform; +import org.apache.beam.sdk.schemas.Schema; +import org.apache.beam.sdk.schemas.Schema.FieldType; +import org.apache.beam.sdk.transforms.Create; +import org.apache.beam.sdk.transforms.Create.Values; +import org.apache.beam.sdk.transforms.PTransform; +import org.apache.beam.sdk.values.Row; +import 
org.checkerframework.checker.nullness.qual.Nullable; + +@SuppressWarnings({ + "rawtypes", // TODO(https://github.com/apache/beam/issues/20447) + "nullness" +}) +public class CreateTranslation implements TransformPayloadTranslator<Create.Values<?>> { + + Schema createConfigSchema = + Schema.builder() + .addArrayField("values", FieldType.BYTES) + .addByteArrayField("serialized_coder") + .build(); + + @Override + public String getUrn() { + return PTransformTranslation.CREATE_TRANSFORM_URN; + } + + @Override + public @Nullable FunctionSpec translate( + AppliedPTransform<?, ?, Values<?>> application, SdkComponents components) throws IOException { + // Currently just returns an empty payload. + // We can implement an actual payload of runners start using this transform. + return FunctionSpec.newBuilder().setUrn(getUrn(application.getTransform())).build(); + } + + private byte[] toByteArray(Object object) { + try (ByteArrayOutputStream bos = new ByteArrayOutputStream(); + ObjectOutputStream out = new ObjectOutputStream(bos)) { + out.writeObject(object); + return bos.toByteArray(); + } catch (Exception e) { + throw new RuntimeException(e); + } + } + + private Object fromByteArray(byte[] bytes) { + try (ByteArrayInputStream bis = new ByteArrayInputStream(bytes); + ObjectInputStream in = new ObjectInputStream(bis)) { + return in.readObject(); + } catch (Exception e) { + throw new RuntimeException(e); + } + } + + @Override + public @Nullable Row toConfigRow(Values<?> transform) { + List<byte[]> encodedElements = new ArrayList<>(); + transform + .getElements() + .forEach( + object -> { + encodedElements.add(toByteArray(object)); + }); + + byte[] serializedCoder = + transform.getCoder() != null ? 
toByteArray(transform.getCoder()) : new byte[] {}; + return Row.withSchema(createConfigSchema) + .withFieldValue("values", encodedElements) + .withFieldValue("serialized_coder", serializedCoder) + .build(); + } + + @Override + public Create.@Nullable Values<?> fromConfigRow(Row configRow) { + Values transform = + Create.of( Review Comment: This class was reverted. ########## runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/CreateTranslation.java: ########## @@ -0,0 +1,128 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.beam.runners.core.construction; + +import com.google.auto.service.AutoService; +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.ObjectInputStream; +import java.io.ObjectOutputStream; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; +import org.apache.beam.model.pipeline.v1.RunnerApi.FunctionSpec; +import org.apache.beam.runners.core.construction.PTransformTranslation.TransformPayloadTranslator; +import org.apache.beam.sdk.coders.Coder; +import org.apache.beam.sdk.runners.AppliedPTransform; +import org.apache.beam.sdk.schemas.Schema; +import org.apache.beam.sdk.schemas.Schema.FieldType; +import org.apache.beam.sdk.transforms.Create; +import org.apache.beam.sdk.transforms.Create.Values; +import org.apache.beam.sdk.transforms.PTransform; +import org.apache.beam.sdk.values.Row; +import org.checkerframework.checker.nullness.qual.Nullable; + +@SuppressWarnings({ + "rawtypes", // TODO(https://github.com/apache/beam/issues/20447) + "nullness" +}) +public class CreateTranslation implements TransformPayloadTranslator<Create.Values<?>> { + + Schema createConfigSchema = + Schema.builder() + .addArrayField("values", FieldType.BYTES) + .addByteArrayField("serialized_coder") + .build(); + + @Override + public String getUrn() { + return PTransformTranslation.CREATE_TRANSFORM_URN; + } + + @Override + public @Nullable FunctionSpec translate( + AppliedPTransform<?, ?, Values<?>> application, SdkComponents components) throws IOException { + // Currently just returns an empty payload. + // We can implement an actual payload of runners start using this transform. 
+ return FunctionSpec.newBuilder().setUrn(getUrn(application.getTransform())).build(); + } + + private byte[] toByteArray(Object object) { + try (ByteArrayOutputStream bos = new ByteArrayOutputStream(); + ObjectOutputStream out = new ObjectOutputStream(bos)) { + out.writeObject(object); + return bos.toByteArray(); + } catch (Exception e) { + throw new RuntimeException(e); + } + } + + private Object fromByteArray(byte[] bytes) { + try (ByteArrayInputStream bis = new ByteArrayInputStream(bytes); + ObjectInputStream in = new ObjectInputStream(bis)) { + return in.readObject(); + } catch (Exception e) { + throw new RuntimeException(e); + } + } + + @Override + public @Nullable Row toConfigRow(Values<?> transform) { + List<byte[]> encodedElements = new ArrayList<>(); + transform + .getElements() + .forEach( + object -> { + encodedElements.add(toByteArray(object)); + }); + + byte[] serializedCoder = + transform.getCoder() != null ? toByteArray(transform.getCoder()) : new byte[] {}; + return Row.withSchema(createConfigSchema) + .withFieldValue("values", encodedElements) + .withFieldValue("serialized_coder", serializedCoder) + .build(); + } + + @Override + public Create.@Nullable Values<?> fromConfigRow(Row configRow) { + Values transform = + Create.of( + configRow.getArray("values").stream() + .map(bytesValue -> fromByteArray((byte[]) bytesValue)) + .collect(Collectors.toList())); + byte[] serializedCoder = configRow.getBytes("serialized_coder"); + if (serializedCoder.length > 0) { + Coder coder = (Coder) fromByteArray(serializedCoder); + transform = transform.withCoder(coder); + } + return transform; + } + + @AutoService(TransformPayloadTranslatorRegistrar.class) + public static class Registrar implements TransformPayloadTranslatorRegistrar { + @Override + public Map<? extends Class<? extends PTransform>, ? 
extends TransformPayloadTranslator> + getTransformPayloadTranslators() { + return Collections.singletonMap(Create.Values.class, new CreateTranslation()); + } + } +} Review Comment: This class was reverted. ########## runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PTransformTranslation.java: ########## @@ -508,14 +510,56 @@ static RunnerApi.PTransform.Builder translateAppliedPTransform( * * <p>When going to a protocol buffer message, the translator produces a payload corresponding to * the Java representation while registering components that payload references. + * + * <p>Also, provides methods for generating a Row-based constructor config for the transform that + * can be later used to re-construct the transform. */ public interface TransformPayloadTranslator<T extends PTransform<?, ?>> { - String getUrn(T transform); + /** + * Provides a unique URN for transforms represented by this {@code TransformPayloadTranslator}. + */ + String getUrn(); + + /** + * Same as {@link #getUrn()} but the returned URN may depend on the transform provided. + * + * <p>Only override this if the same {@code TransformPayloadTranslator} used for multiple + * transforms. Otherwise, use {@link #getUrn()}. + */ + default String getUrn(T transform) { + return getUrn(); + } + + /** + * Translates the given transform represented by the provided {@code AppliedPTransform} to a + * {@code FunctionSpec} with a URN and a payload. + */ @Nullable Review Comment: Done. ########## runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PipelineTranslation.java: ########## @@ -102,11 +121,370 @@ public void visitPrimitiveTransform(Node node) { // TODO(JIRA-5649): Don't even emit these transforms in the generated protos. 
res = elideDeprecatedViews(res); } + + ExternalTranslationOptions externalTranslationOptions = + pipeline.getOptions().as(ExternalTranslationOptions.class); + List<String> urnsToOverride = externalTranslationOptions.getTransformsToOverride(); + if (urnsToOverride.size() > 0) { + // We use PTransformPayloadTranslators and the Transform Service to re-generate the pipeline + // proto components for the updated transform and update the pipeline proto. + Map<String, PTransform> transforms = res.getComponents().getTransformsMap(); + List<String> alreadyCheckedURns = new ArrayList<>(); + for (Entry<String, PTransform> entry : transforms.entrySet()) { + String urn = entry.getValue().getSpec().getUrn(); + if (!alreadyCheckedURns.contains(urn) && urnsToOverride.contains(urn)) { + alreadyCheckedURns.add(urn); + // All transforms in the pipeline with the given urns have to be overridden. + List< + AppliedPTransform< + PInput, + POutput, + org.apache.beam.sdk.transforms.PTransform<? super PInput, POutput>>> + appliedPTransforms = + findAppliedPTransforms( + urn, pipeline, KnownTransformPayloadTranslator.KNOWN_PAYLOAD_TRANSLATORS); + for (AppliedPTransform< + PInput, + POutput, + org.apache.beam.sdk.transforms.PTransform<? super PInput, POutput>> + appliedPTransform : appliedPTransforms) { + TransformPayloadTranslator< Review Comment: Updated to attach the Row to PTransform annotations and look it up when upgrading. ########## runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PipelineTranslation.java: ########## @@ -102,11 +121,370 @@ public void visitPrimitiveTransform(Node node) { // TODO(JIRA-5649): Don't even emit these transforms in the generated protos. res = elideDeprecatedViews(res); } + + ExternalTranslationOptions externalTranslationOptions = Review Comment: Moved the logic to new class "TransformUpgrader" and added unit tests to "TransformUpgraderTest". 
########## runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PipelineTranslation.java: ########## @@ -102,11 +121,370 @@ public void visitPrimitiveTransform(Node node) { // TODO(JIRA-5649): Don't even emit these transforms in the generated protos. res = elideDeprecatedViews(res); } + + ExternalTranslationOptions externalTranslationOptions = + pipeline.getOptions().as(ExternalTranslationOptions.class); + List<String> urnsToOverride = externalTranslationOptions.getTransformsToOverride(); + if (urnsToOverride.size() > 0) { + // We use PTransformPayloadTranslators and the Transform Service to re-generate the pipeline + // proto components for the updated transform and update the pipeline proto. + Map<String, PTransform> transforms = res.getComponents().getTransformsMap(); + List<String> alreadyCheckedURns = new ArrayList<>(); + for (Entry<String, PTransform> entry : transforms.entrySet()) { + String urn = entry.getValue().getSpec().getUrn(); + if (!alreadyCheckedURns.contains(urn) && urnsToOverride.contains(urn)) { + alreadyCheckedURns.add(urn); + // All transforms in the pipeline with the given urns have to be overridden. + List< + AppliedPTransform< + PInput, + POutput, + org.apache.beam.sdk.transforms.PTransform<? super PInput, POutput>>> + appliedPTransforms = + findAppliedPTransforms( + urn, pipeline, KnownTransformPayloadTranslator.KNOWN_PAYLOAD_TRANSLATORS); + for (AppliedPTransform< + PInput, + POutput, + org.apache.beam.sdk.transforms.PTransform<? super PInput, POutput>> + appliedPTransform : appliedPTransforms) { + TransformPayloadTranslator< + org.apache.beam.sdk.transforms.PTransform<? super PInput, POutput>> + payloadTranslator = + KnownTransformPayloadTranslator.KNOWN_PAYLOAD_TRANSLATORS.get( + appliedPTransform.getTransform().getClass()); + try { + // Override the transform using the transform service. 
+ res = + updateTransformViaTransformService( + urn, appliedPTransform, payloadTranslator, pipeline, res); + } catch (IOException e) { + throw new RuntimeException(e); + } + } + } + } + } + // Validate that translation didn't produce an invalid pipeline. PipelineValidator.validate(res); return res; } + private static int findAvailablePort() throws IOException { + ServerSocket s = new ServerSocket(0); + try { + return s.getLocalPort(); + } finally { + s.close(); + try { + // Some systems don't free the port for future use immediately. + Thread.sleep(100); + } catch (InterruptedException exn) { + // ignore + } + } + } + + // Override the given transform to the version available in a new transform service. + private static < + InputT extends PInput, + OutputT extends POutput, + TransformT extends org.apache.beam.sdk.transforms.PTransform<InputT, OutputT>> + RunnerApi.Pipeline updateTransformViaTransformService( + String urn, + AppliedPTransform< + PInput, + POutput, + org.apache.beam.sdk.transforms.PTransform<? super PInput, POutput>> + appliedPTransform, + TransformPayloadTranslator< + org.apache.beam.sdk.transforms.PTransform<? super PInput, POutput>> + originalPayloadTranslator, + Pipeline pipeline, + RunnerApi.Pipeline runnerAPIpipeline) + throws IOException { + ExternalTranslationOptions externalTranslationOptions = + pipeline.getOptions().as(ExternalTranslationOptions.class); + + // Config row to re-construct the transform within the transform service. + Row configRow = originalPayloadTranslator.toConfigRow(appliedPTransform.getTransform()); + ByteStringOutputStream outputStream = new ByteStringOutputStream(); + try { + RowCoder.of(configRow.getSchema()).encode(configRow, outputStream); + } catch (IOException e) { + throw new RuntimeException(e); + } + + // Java expansion serivice able to identify and expand transforms that includes the construction + // config provided here. 
+ ExternalTransforms.ExternalConfigurationPayload payload = + ExternalTransforms.ExternalConfigurationPayload.newBuilder() + .setSchema(SchemaTranslation.schemaToProto(configRow.getSchema(), true)) + .setPayload(outputStream.toByteString()) + .build(); + + String serviceAddress = null; Review Comment: Updated. ########## runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PipelineTranslation.java: ########## @@ -102,11 +121,370 @@ public void visitPrimitiveTransform(Node node) { // TODO(JIRA-5649): Don't even emit these transforms in the generated protos. res = elideDeprecatedViews(res); } + + ExternalTranslationOptions externalTranslationOptions = + pipeline.getOptions().as(ExternalTranslationOptions.class); + List<String> urnsToOverride = externalTranslationOptions.getTransformsToOverride(); + if (urnsToOverride.size() > 0) { + // We use PTransformPayloadTranslators and the Transform Service to re-generate the pipeline + // proto components for the updated transform and update the pipeline proto. + Map<String, PTransform> transforms = res.getComponents().getTransformsMap(); + List<String> alreadyCheckedURns = new ArrayList<>(); + for (Entry<String, PTransform> entry : transforms.entrySet()) { + String urn = entry.getValue().getSpec().getUrn(); + if (!alreadyCheckedURns.contains(urn) && urnsToOverride.contains(urn)) { + alreadyCheckedURns.add(urn); + // All transforms in the pipeline with the given urns have to be overridden. + List< + AppliedPTransform< + PInput, + POutput, + org.apache.beam.sdk.transforms.PTransform<? super PInput, POutput>>> + appliedPTransforms = + findAppliedPTransforms( + urn, pipeline, KnownTransformPayloadTranslator.KNOWN_PAYLOAD_TRANSLATORS); + for (AppliedPTransform< + PInput, + POutput, + org.apache.beam.sdk.transforms.PTransform<? super PInput, POutput>> + appliedPTransform : appliedPTransforms) { + TransformPayloadTranslator< + org.apache.beam.sdk.transforms.PTransform<? 
super PInput, POutput>> + payloadTranslator = + KnownTransformPayloadTranslator.KNOWN_PAYLOAD_TRANSLATORS.get( + appliedPTransform.getTransform().getClass()); + try { + // Override the transform using the transform service. + res = + updateTransformViaTransformService( + urn, appliedPTransform, payloadTranslator, pipeline, res); + } catch (IOException e) { + throw new RuntimeException(e); + } + } + } + } + } + // Validate that translation didn't produce an invalid pipeline. PipelineValidator.validate(res); return res; } + private static int findAvailablePort() throws IOException { + ServerSocket s = new ServerSocket(0); + try { + return s.getLocalPort(); + } finally { + s.close(); + try { + // Some systems don't free the port for future use immediately. + Thread.sleep(100); + } catch (InterruptedException exn) { + // ignore + } + } + } + + // Override the given transform to the version available in a new transform service. + private static < + InputT extends PInput, + OutputT extends POutput, + TransformT extends org.apache.beam.sdk.transforms.PTransform<InputT, OutputT>> + RunnerApi.Pipeline updateTransformViaTransformService( + String urn, + AppliedPTransform< + PInput, + POutput, + org.apache.beam.sdk.transforms.PTransform<? super PInput, POutput>> + appliedPTransform, + TransformPayloadTranslator< + org.apache.beam.sdk.transforms.PTransform<? super PInput, POutput>> + originalPayloadTranslator, + Pipeline pipeline, + RunnerApi.Pipeline runnerAPIpipeline) + throws IOException { + ExternalTranslationOptions externalTranslationOptions = + pipeline.getOptions().as(ExternalTranslationOptions.class); + + // Config row to re-construct the transform within the transform service. 
+ Row configRow = originalPayloadTranslator.toConfigRow(appliedPTransform.getTransform()); + ByteStringOutputStream outputStream = new ByteStringOutputStream(); + try { + RowCoder.of(configRow.getSchema()).encode(configRow, outputStream); + } catch (IOException e) { + throw new RuntimeException(e); + } + + // Java expansion serivice able to identify and expand transforms that includes the construction + // config provided here. + ExternalTransforms.ExternalConfigurationPayload payload = + ExternalTransforms.ExternalConfigurationPayload.newBuilder() + .setSchema(SchemaTranslation.schemaToProto(configRow.getSchema(), true)) + .setPayload(outputStream.toByteString()) + .build(); + + String serviceAddress = null; + TransformServiceLauncher service = null; + try { + if (externalTranslationOptions.getTransformServiceAddress() != null) { + serviceAddress = externalTranslationOptions.getTransformServiceAddress(); + } else if (externalTranslationOptions.getTransformServiceBeamVersion() != null) { + String projectName = UUID.randomUUID().toString(); + service = TransformServiceLauncher.forProject(projectName, findAvailablePort()); + service.setBeamVersion(externalTranslationOptions.getTransformServiceBeamVersion()); + + // Starting the transform service. + service.start(); + // Waiting the service to be ready. + service.waitTillUp(15000); + } else { + throw new IllegalArgumentException( + "Either option TransformServiceAddress or option TransformServiceBeamVersion should be provided to override a transform using the transform service"); + } + + if (serviceAddress == null) { + throw new IllegalArgumentException( + "Cannot override the transform " + + urn + + " since a valid transform service address could not be determined"); + } + + // Creating an ExternalTransform and expanding it using the transform service. + // Input will be the same input provided to the transform bing overridden. 
+ ExpandableTransform<InputT, OutputT> externalTransform = + (ExpandableTransform<InputT, OutputT>) + External.of(urn, payload.toByteArray(), serviceAddress); + + PCollectionTuple input = PCollectionTuple.empty(pipeline); + for (TupleTag<?> tag : (Set<TupleTag<?>>) appliedPTransform.getInputs().keySet()) { + PCollection<?> pc = appliedPTransform.getInputs().get(tag); + if (pc == null) { + throw new IllegalArgumentException( + "Input of transform " + appliedPTransform + " with tag " + tag + " was null."); + } + input = input.and(tag, (PCollection) pc); + } + POutput output = externalTransform.expand((InputT) input); + + // Outputs of the transform being overridden. + Map<TupleTag<?>, PCollection<?>> originalOutputs = appliedPTransform.getOutputs(); + + // After expansion some transforms might still refer to the output of the already overridden + // transform as their input. + // Such inputs have to be overridden to use the output of the new upgraded transform. + Map<String, String> inputReplacements = new HashMap<>(); + + // Will contain the outputs of the upgraded transform. 
+ Map<TupleTag<?>, PCollection<?>> newOutputs = new HashMap<>(); + + if (output instanceof PCollectionTuple) { + newOutputs.putAll(((PCollectionTuple) output).getAll()); + for (Map.Entry<TupleTag<?>, PCollection<?>> entry : newOutputs.entrySet()) { + if (entry == null) { + throw new IllegalArgumentException( + "Found unexpected null entry when iterating the outputs of expanded " + + "ExpandableTransform " + + externalTransform); + } + if (!appliedPTransform.getOutputs().containsKey(entry.getKey())) { + throw new RuntimeException( + "Could not find the tag " + entry.getKey() + " in the original set of outputs"); + } + PCollection<?> originalOutputPc = originalOutputs.get(entry.getKey()); + if (originalOutputPc == null) { + throw new IllegalArgumentException( + "Original output of transform " + + appliedPTransform + + " with tag " + + entry.getKey() + + " was null"); + } + inputReplacements.put(originalOutputPc.getName(), entry.getValue().getName()); + } + } else if (output instanceof PCollection) { + newOutputs.put(new TupleTag<>("temp_main_tag"), (PCollection) output); + inputReplacements.put( + originalOutputs.get(originalOutputs.keySet().iterator().next()).getName(), + ((PCollection) output).getName()); + } else { + throw new RuntimeException("Unexpected output type"); + } + + // We create a new AppliedPTransform to represent the upgraded transform and register it in an + // SdkComponents object. 
+ AppliedPTransform<?, ?, ?> updatedAppliedPTransform = + AppliedPTransform.of( + appliedPTransform.getFullName() + "_external", + appliedPTransform.getInputs(), + newOutputs, + externalTransform, + externalTransform.getResourceHints(), + appliedPTransform.getPipeline()); + SdkComponents updatedComponents = + SdkComponents.create( + runnerAPIpipeline.getComponents(), runnerAPIpipeline.getRequirementsList()); + String updatedTransformId = + updatedComponents.registerPTransform(updatedAppliedPTransform, Collections.emptyList()); + RunnerApi.Components updatedRunnerApiComponents = updatedComponents.toComponents(); + + // Recording input updates to the transforms to refer to the upgraded transform instead of the + // old one. + // Also recording the newly generated id of the old (overridden) transform in the + // updatedRunnerApiComponents. + Map<String, Map<String, String>> transformInputUpdates = new HashMap<>(); + List<String> oldTransformIds = new ArrayList<>(); + updatedRunnerApiComponents + .getTransformsMap() + .forEach( + (transformId, transform) -> { + // Mapping from existing key to new value. + Map<String, String> updatedInputMap = new HashMap<>(); + for (Map.Entry<String, String> entry : transform.getInputsMap().entrySet()) { + if (inputReplacements.containsKey(entry.getValue())) { + updatedInputMap.put(entry.getKey(), inputReplacements.get(entry.getValue())); + } + } + for (Map.Entry<String, String> entry : transform.getOutputsMap().entrySet()) { + if (inputReplacements.containsKey(entry.getValue()) + && urn.equals(transform.getSpec().getUrn())) { + oldTransformIds.add(transformId); + } + } + if (updatedInputMap.size() > 0) { + transformInputUpdates.put(transformId, updatedInputMap); + } + }); + // There should be only one recorded old (upgraded) transform. 
+ if (oldTransformIds.size() != 1) { + throw new IOException( + "Expected exactly one transform to be updated by " + + oldTransformIds.size() + + " were updated."); + } + String oldTransformId = oldTransformIds.get(0); Review Comment: Reverted. ########## runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PipelineTranslation.java: ########## @@ -102,11 +121,370 @@ public void visitPrimitiveTransform(Node node) { // TODO(JIRA-5649): Don't even emit these transforms in the generated protos. res = elideDeprecatedViews(res); } + + ExternalTranslationOptions externalTranslationOptions = + pipeline.getOptions().as(ExternalTranslationOptions.class); + List<String> urnsToOverride = externalTranslationOptions.getTransformsToOverride(); + if (urnsToOverride.size() > 0) { + // We use PTransformPayloadTranslators and the Transform Service to re-generate the pipeline + // proto components for the updated transform and update the pipeline proto. + Map<String, PTransform> transforms = res.getComponents().getTransformsMap(); + List<String> alreadyCheckedURns = new ArrayList<>(); + for (Entry<String, PTransform> entry : transforms.entrySet()) { + String urn = entry.getValue().getSpec().getUrn(); + if (!alreadyCheckedURns.contains(urn) && urnsToOverride.contains(urn)) { + alreadyCheckedURns.add(urn); + // All transforms in the pipeline with the given urns have to be overridden. + List< + AppliedPTransform< + PInput, + POutput, + org.apache.beam.sdk.transforms.PTransform<? super PInput, POutput>>> + appliedPTransforms = + findAppliedPTransforms( + urn, pipeline, KnownTransformPayloadTranslator.KNOWN_PAYLOAD_TRANSLATORS); + for (AppliedPTransform< + PInput, + POutput, + org.apache.beam.sdk.transforms.PTransform<? super PInput, POutput>> + appliedPTransform : appliedPTransforms) { + TransformPayloadTranslator< + org.apache.beam.sdk.transforms.PTransform<? 
super PInput, POutput>> + payloadTranslator = + KnownTransformPayloadTranslator.KNOWN_PAYLOAD_TRANSLATORS.get( + appliedPTransform.getTransform().getClass()); + try { + // Override the transform using the transform service. + res = + updateTransformViaTransformService( + urn, appliedPTransform, payloadTranslator, pipeline, res); + } catch (IOException e) { + throw new RuntimeException(e); + } + } + } + } + } + // Validate that translation didn't produce an invalid pipeline. PipelineValidator.validate(res); return res; } + private static int findAvailablePort() throws IOException { + ServerSocket s = new ServerSocket(0); + try { + return s.getLocalPort(); + } finally { + s.close(); + try { + // Some systems don't free the port for future use immediately. + Thread.sleep(100); + } catch (InterruptedException exn) { + // ignore + } + } + } + + // Override the given transform to the version available in a new transform service. + private static < + InputT extends PInput, + OutputT extends POutput, + TransformT extends org.apache.beam.sdk.transforms.PTransform<InputT, OutputT>> + RunnerApi.Pipeline updateTransformViaTransformService( + String urn, + AppliedPTransform< + PInput, + POutput, + org.apache.beam.sdk.transforms.PTransform<? super PInput, POutput>> + appliedPTransform, + TransformPayloadTranslator< + org.apache.beam.sdk.transforms.PTransform<? super PInput, POutput>> + originalPayloadTranslator, + Pipeline pipeline, + RunnerApi.Pipeline runnerAPIpipeline) + throws IOException { + ExternalTranslationOptions externalTranslationOptions = + pipeline.getOptions().as(ExternalTranslationOptions.class); + + // Config row to re-construct the transform within the transform service. 
+ Row configRow = originalPayloadTranslator.toConfigRow(appliedPTransform.getTransform()); + ByteStringOutputStream outputStream = new ByteStringOutputStream(); + try { + RowCoder.of(configRow.getSchema()).encode(configRow, outputStream); + } catch (IOException e) { + throw new RuntimeException(e); + } + + // Java expansion serivice able to identify and expand transforms that includes the construction + // config provided here. + ExternalTransforms.ExternalConfigurationPayload payload = + ExternalTransforms.ExternalConfigurationPayload.newBuilder() + .setSchema(SchemaTranslation.schemaToProto(configRow.getSchema(), true)) + .setPayload(outputStream.toByteString()) + .build(); + + String serviceAddress = null; + TransformServiceLauncher service = null; + try { + if (externalTranslationOptions.getTransformServiceAddress() != null) { + serviceAddress = externalTranslationOptions.getTransformServiceAddress(); + } else if (externalTranslationOptions.getTransformServiceBeamVersion() != null) { + String projectName = UUID.randomUUID().toString(); + service = TransformServiceLauncher.forProject(projectName, findAvailablePort()); + service.setBeamVersion(externalTranslationOptions.getTransformServiceBeamVersion()); + + // Starting the transform service. + service.start(); + // Waiting the service to be ready. + service.waitTillUp(15000); + } else { + throw new IllegalArgumentException( + "Either option TransformServiceAddress or option TransformServiceBeamVersion should be provided to override a transform using the transform service"); + } + + if (serviceAddress == null) { + throw new IllegalArgumentException( + "Cannot override the transform " + + urn + + " since a valid transform service address could not be determined"); + } + + // Creating an ExternalTransform and expanding it using the transform service. + // Input will be the same input provided to the transform bing overridden. 
+ ExpandableTransform<InputT, OutputT> externalTransform = + (ExpandableTransform<InputT, OutputT>) + External.of(urn, payload.toByteArray(), serviceAddress); + + PCollectionTuple input = PCollectionTuple.empty(pipeline); + for (TupleTag<?> tag : (Set<TupleTag<?>>) appliedPTransform.getInputs().keySet()) { + PCollection<?> pc = appliedPTransform.getInputs().get(tag); + if (pc == null) { + throw new IllegalArgumentException( + "Input of transform " + appliedPTransform + " with tag " + tag + " was null."); + } + input = input.and(tag, (PCollection) pc); + } + POutput output = externalTransform.expand((InputT) input); + + // Outputs of the transform being overridden. + Map<TupleTag<?>, PCollection<?>> originalOutputs = appliedPTransform.getOutputs(); + + // After expansion some transforms might still refer to the output of the already overridden + // transform as their input. + // Such inputs have to be overridden to use the output of the new upgraded transform. + Map<String, String> inputReplacements = new HashMap<>(); + + // Will contain the outputs of the upgraded transform. 
+ Map<TupleTag<?>, PCollection<?>> newOutputs = new HashMap<>(); + + if (output instanceof PCollectionTuple) { + newOutputs.putAll(((PCollectionTuple) output).getAll()); + for (Map.Entry<TupleTag<?>, PCollection<?>> entry : newOutputs.entrySet()) { + if (entry == null) { + throw new IllegalArgumentException( + "Found unexpected null entry when iterating the outputs of expanded " + + "ExpandableTransform " + + externalTransform); + } + if (!appliedPTransform.getOutputs().containsKey(entry.getKey())) { + throw new RuntimeException( + "Could not find the tag " + entry.getKey() + " in the original set of outputs"); + } + PCollection<?> originalOutputPc = originalOutputs.get(entry.getKey()); + if (originalOutputPc == null) { + throw new IllegalArgumentException( + "Original output of transform " + + appliedPTransform + + " with tag " + + entry.getKey() + + " was null"); + } + inputReplacements.put(originalOutputPc.getName(), entry.getValue().getName()); + } + } else if (output instanceof PCollection) { + newOutputs.put(new TupleTag<>("temp_main_tag"), (PCollection) output); + inputReplacements.put( + originalOutputs.get(originalOutputs.keySet().iterator().next()).getName(), + ((PCollection) output).getName()); + } else { + throw new RuntimeException("Unexpected output type"); + } + + // We create a new AppliedPTransform to represent the upgraded transform and register it in an + // SdkComponents object. + AppliedPTransform<?, ?, ?> updatedAppliedPTransform = + AppliedPTransform.of( + appliedPTransform.getFullName() + "_external", + appliedPTransform.getInputs(), + newOutputs, + externalTransform, + externalTransform.getResourceHints(), + appliedPTransform.getPipeline()); + SdkComponents updatedComponents = + SdkComponents.create( + runnerAPIpipeline.getComponents(), runnerAPIpipeline.getRequirementsList()); + String updatedTransformId = + updatedComponents.registerPTransform(updatedAppliedPTransform, Collections.emptyList()); Review Comment: Reverted. 
########## runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PipelineTranslation.java: ########## @@ -102,11 +121,370 @@ public void visitPrimitiveTransform(Node node) { // TODO(JIRA-5649): Don't even emit these transforms in the generated protos. res = elideDeprecatedViews(res); } + + ExternalTranslationOptions externalTranslationOptions = + pipeline.getOptions().as(ExternalTranslationOptions.class); + List<String> urnsToOverride = externalTranslationOptions.getTransformsToOverride(); + if (urnsToOverride.size() > 0) { + // We use PTransformPayloadTranslators and the Transform Service to re-generate the pipeline + // proto components for the updated transform and update the pipeline proto. + Map<String, PTransform> transforms = res.getComponents().getTransformsMap(); + List<String> alreadyCheckedURns = new ArrayList<>(); + for (Entry<String, PTransform> entry : transforms.entrySet()) { + String urn = entry.getValue().getSpec().getUrn(); + if (!alreadyCheckedURns.contains(urn) && urnsToOverride.contains(urn)) { + alreadyCheckedURns.add(urn); + // All transforms in the pipeline with the given urns have to be overridden. + List< + AppliedPTransform< + PInput, + POutput, + org.apache.beam.sdk.transforms.PTransform<? super PInput, POutput>>> + appliedPTransforms = + findAppliedPTransforms( + urn, pipeline, KnownTransformPayloadTranslator.KNOWN_PAYLOAD_TRANSLATORS); + for (AppliedPTransform< + PInput, + POutput, + org.apache.beam.sdk.transforms.PTransform<? super PInput, POutput>> + appliedPTransform : appliedPTransforms) { + TransformPayloadTranslator< + org.apache.beam.sdk.transforms.PTransform<? super PInput, POutput>> + payloadTranslator = + KnownTransformPayloadTranslator.KNOWN_PAYLOAD_TRANSLATORS.get( + appliedPTransform.getTransform().getClass()); + try { + // Override the transform using the transform service. 
+ res = + updateTransformViaTransformService( + urn, appliedPTransform, payloadTranslator, pipeline, res); + } catch (IOException e) { + throw new RuntimeException(e); + } + } + } + } + } + // Validate that translation didn't produce an invalid pipeline. PipelineValidator.validate(res); return res; } + private static int findAvailablePort() throws IOException { + ServerSocket s = new ServerSocket(0); + try { + return s.getLocalPort(); + } finally { + s.close(); + try { + // Some systems don't free the port for future use immediately. + Thread.sleep(100); + } catch (InterruptedException exn) { + // ignore + } + } + } + + // Override the given transform to the version available in a new transform service. + private static < + InputT extends PInput, + OutputT extends POutput, + TransformT extends org.apache.beam.sdk.transforms.PTransform<InputT, OutputT>> + RunnerApi.Pipeline updateTransformViaTransformService( + String urn, + AppliedPTransform< + PInput, + POutput, + org.apache.beam.sdk.transforms.PTransform<? super PInput, POutput>> + appliedPTransform, + TransformPayloadTranslator< + org.apache.beam.sdk.transforms.PTransform<? super PInput, POutput>> + originalPayloadTranslator, + Pipeline pipeline, + RunnerApi.Pipeline runnerAPIpipeline) + throws IOException { + ExternalTranslationOptions externalTranslationOptions = + pipeline.getOptions().as(ExternalTranslationOptions.class); + + // Config row to re-construct the transform within the transform service. + Row configRow = originalPayloadTranslator.toConfigRow(appliedPTransform.getTransform()); + ByteStringOutputStream outputStream = new ByteStringOutputStream(); + try { + RowCoder.of(configRow.getSchema()).encode(configRow, outputStream); + } catch (IOException e) { + throw new RuntimeException(e); + } + + // Java expansion serivice able to identify and expand transforms that includes the construction + // config provided here. 
+ ExternalTransforms.ExternalConfigurationPayload payload = + ExternalTransforms.ExternalConfigurationPayload.newBuilder() + .setSchema(SchemaTranslation.schemaToProto(configRow.getSchema(), true)) + .setPayload(outputStream.toByteString()) + .build(); + + String serviceAddress = null; + TransformServiceLauncher service = null; + try { + if (externalTranslationOptions.getTransformServiceAddress() != null) { + serviceAddress = externalTranslationOptions.getTransformServiceAddress(); + } else if (externalTranslationOptions.getTransformServiceBeamVersion() != null) { + String projectName = UUID.randomUUID().toString(); + service = TransformServiceLauncher.forProject(projectName, findAvailablePort()); + service.setBeamVersion(externalTranslationOptions.getTransformServiceBeamVersion()); + + // Starting the transform service. + service.start(); + // Waiting the service to be ready. + service.waitTillUp(15000); + } else { + throw new IllegalArgumentException( + "Either option TransformServiceAddress or option TransformServiceBeamVersion should be provided to override a transform using the transform service"); + } + + if (serviceAddress == null) { + throw new IllegalArgumentException( + "Cannot override the transform " + + urn + + " since a valid transform service address could not be determined"); + } + + // Creating an ExternalTransform and expanding it using the transform service. + // Input will be the same input provided to the transform bing overridden. 
+ ExpandableTransform<InputT, OutputT> externalTransform = + (ExpandableTransform<InputT, OutputT>) + External.of(urn, payload.toByteArray(), serviceAddress); + + PCollectionTuple input = PCollectionTuple.empty(pipeline); + for (TupleTag<?> tag : (Set<TupleTag<?>>) appliedPTransform.getInputs().keySet()) { + PCollection<?> pc = appliedPTransform.getInputs().get(tag); + if (pc == null) { + throw new IllegalArgumentException( + "Input of transform " + appliedPTransform + " with tag " + tag + " was null."); + } + input = input.and(tag, (PCollection) pc); + } + POutput output = externalTransform.expand((InputT) input); + + // Outputs of the transform being overridden. + Map<TupleTag<?>, PCollection<?>> originalOutputs = appliedPTransform.getOutputs(); + + // After expansion some transforms might still refer to the output of the already overridden + // transform as their input. + // Such inputs have to be overridden to use the output of the new upgraded transform. + Map<String, String> inputReplacements = new HashMap<>(); + + // Will contain the outputs of the upgraded transform. 
+ Map<TupleTag<?>, PCollection<?>> newOutputs = new HashMap<>(); + + if (output instanceof PCollectionTuple) { + newOutputs.putAll(((PCollectionTuple) output).getAll()); + for (Map.Entry<TupleTag<?>, PCollection<?>> entry : newOutputs.entrySet()) { + if (entry == null) { + throw new IllegalArgumentException( + "Found unexpected null entry when iterating the outputs of expanded " + + "ExpandableTransform " + + externalTransform); + } + if (!appliedPTransform.getOutputs().containsKey(entry.getKey())) { + throw new RuntimeException( + "Could not find the tag " + entry.getKey() + " in the original set of outputs"); + } + PCollection<?> originalOutputPc = originalOutputs.get(entry.getKey()); + if (originalOutputPc == null) { + throw new IllegalArgumentException( + "Original output of transform " + + appliedPTransform + + " with tag " + + entry.getKey() + + " was null"); + } + inputReplacements.put(originalOutputPc.getName(), entry.getValue().getName()); + } + } else if (output instanceof PCollection) { + newOutputs.put(new TupleTag<>("temp_main_tag"), (PCollection) output); + inputReplacements.put( + originalOutputs.get(originalOutputs.keySet().iterator().next()).getName(), + ((PCollection) output).getName()); + } else { + throw new RuntimeException("Unexpected output type"); + } + + // We create a new AppliedPTransform to represent the upgraded transform and register it in an + // SdkComponents object. 
+ AppliedPTransform<?, ?, ?> updatedAppliedPTransform = + AppliedPTransform.of( + appliedPTransform.getFullName() + "_external", + appliedPTransform.getInputs(), + newOutputs, + externalTransform, + externalTransform.getResourceHints(), + appliedPTransform.getPipeline()); + SdkComponents updatedComponents = + SdkComponents.create( + runnerAPIpipeline.getComponents(), runnerAPIpipeline.getRequirementsList()); + String updatedTransformId = + updatedComponents.registerPTransform(updatedAppliedPTransform, Collections.emptyList()); + RunnerApi.Components updatedRunnerApiComponents = updatedComponents.toComponents(); + + // Recording input updates to the transforms to refer to the upgraded transform instead of the + // old one. + // Also recording the newly generated id of the old (overridden) transform in the + // updatedRunnerApiComponents. + Map<String, Map<String, String>> transformInputUpdates = new HashMap<>(); + List<String> oldTransformIds = new ArrayList<>(); + updatedRunnerApiComponents + .getTransformsMap() + .forEach( + (transformId, transform) -> { + // Mapping from existing key to new value. + Map<String, String> updatedInputMap = new HashMap<>(); + for (Map.Entry<String, String> entry : transform.getInputsMap().entrySet()) { + if (inputReplacements.containsKey(entry.getValue())) { + updatedInputMap.put(entry.getKey(), inputReplacements.get(entry.getValue())); + } + } + for (Map.Entry<String, String> entry : transform.getOutputsMap().entrySet()) { + if (inputReplacements.containsKey(entry.getValue()) + && urn.equals(transform.getSpec().getUrn())) { + oldTransformIds.add(transformId); + } + } + if (updatedInputMap.size() > 0) { + transformInputUpdates.put(transformId, updatedInputMap); + } + }); + // There should be only one recorded old (upgraded) transform. 
+ if (oldTransformIds.size() != 1) { + throw new IOException( + "Expected exactly one transform to be updated by " + + oldTransformIds.size() + + " were updated."); + } + String oldTransformId = oldTransformIds.get(0); + + // Updated list of root transforms (in case a root was upgraded). + List<String> updaterRootTransformIds = new ArrayList<>(); + updaterRootTransformIds.addAll(runnerAPIpipeline.getRootTransformIdsList()); + if (updaterRootTransformIds.contains(oldTransformId)) { + updaterRootTransformIds.remove(oldTransformId); + updaterRootTransformIds.add(updatedTransformId); Review Comment: Updated logic should preserve the order. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
