This is an automated email from the ASF dual-hosted git repository.
ahmedabualsaud pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new b50ad0fe8fc [ManagedIO] pass underlying transform URN as an annotation
(#31398)
b50ad0fe8fc is described below
commit b50ad0fe8fc168eaded62efb08f19cf2aea341e2
Author: Ahmed Abualsaud <[email protected]>
AuthorDate: Thu May 30 17:51:07 2024 -0400
[ManagedIO] pass underlying transform URN as an annotation (#31398)
* pass underlying transform URN to annotation
* move annotation keys to proto
* address comments: add descriptions for annotation enums; fail when
missing transform_identifier; add unit tests for annotations
---
.../model/pipeline/v1/external_transforms.proto | 13 +++++++
.../beam/sdk/util/construction/BeamUrns.java | 5 +++
.../util/construction/PTransformTranslation.java | 37 ++++++++++++++------
.../sdk/util/construction/TransformUpgrader.java | 6 ++--
.../ManagedSchemaTransformTranslationTest.java | 40 ++++++++++++++++++++--
5 files changed, 86 insertions(+), 15 deletions(-)
diff --git
a/model/pipeline/src/main/proto/org/apache/beam/model/pipeline/v1/external_transforms.proto
b/model/pipeline/src/main/proto/org/apache/beam/model/pipeline/v1/external_transforms.proto
index aa9e70c7a87..429371e1105 100644
---
a/model/pipeline/src/main/proto/org/apache/beam/model/pipeline/v1/external_transforms.proto
+++
b/model/pipeline/src/main/proto/org/apache/beam/model/pipeline/v1/external_transforms.proto
@@ -111,6 +111,19 @@ message BuilderMethod {
bytes payload = 3;
}
+message Annotations {
+ enum Enum {
+ // The annotation key for the encoded configuration Row used to build a
transform
+ CONFIG_ROW_KEY = 0 [(org.apache.beam.model.pipeline.v1.beam_constant) =
"config_row"];
+ // The annotation key for the configuration Schema used to decode the
configuration Row
+ CONFIG_ROW_SCHEMA_KEY = 1
[(org.apache.beam.model.pipeline.v1.beam_constant) = "config_row_schema"];
+ // If the transform is a SchemaTransform, this is the annotation key for
the SchemaTransform's URN
+ SCHEMATRANSFORM_URN_KEY = 2
[(org.apache.beam.model.pipeline.v1.beam_constant) = "schematransform_urn"];
+ // If the transform is a ManagedSchemaTransform, this is the annotation
key for the underlying SchemaTransform's URN
+ MANAGED_UNDERLYING_TRANSFORM_URN_KEY = 3
[(org.apache.beam.model.pipeline.v1.beam_constant) =
"managed_underlying_transform_urn"];
+ }
+}
+
// Payload for a Schema-aware PTransform.
// This is a transform that is aware of its input and output PCollection
schemas
// and is configured using Beam Schema-compatible parameters.
diff --git
a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/construction/BeamUrns.java
b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/construction/BeamUrns.java
index 05bb2b0e0a0..f0493de3696 100644
---
a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/construction/BeamUrns.java
+++
b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/construction/BeamUrns.java
@@ -26,4 +26,9 @@ public class BeamUrns {
public static String getUrn(ProtocolMessageEnum value) {
return
value.getValueDescriptor().getOptions().getExtension(RunnerApi.beamUrn);
}
+
+ /** Returns the constant value of a given enum annotated with
[(beam_constant)]. */
+ public static String getConstant(ProtocolMessageEnum value) {
+ return
value.getValueDescriptor().getOptions().getExtension(RunnerApi.beamConstant);
+ }
}
diff --git
a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/construction/PTransformTranslation.java
b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/construction/PTransformTranslation.java
index 5dc84897d38..e2b6d95057f 100644
---
a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/construction/PTransformTranslation.java
+++
b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/construction/PTransformTranslation.java
@@ -17,6 +17,7 @@
*/
package org.apache.beam.sdk.util.construction;
+import static org.apache.beam.model.pipeline.v1.ExternalTransforms.Annotations;
import static
org.apache.beam.model.pipeline.v1.ExternalTransforms.ExpansionMethods.Enum.SCHEMA_TRANSFORM;
import static org.apache.beam.sdk.util.construction.BeamUrns.getUrn;
import static
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkState;
@@ -43,6 +44,7 @@ import org.apache.beam.sdk.coders.RowCoder;
import org.apache.beam.sdk.io.Read;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.runners.AppliedPTransform;
+import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.schemas.SchemaTranslation;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.display.DisplayData;
@@ -94,16 +96,12 @@ public class PTransformTranslation {
public static final String MAP_WINDOWS_TRANSFORM_URN =
"beam:transform:map_windows:v1";
public static final String MERGE_WINDOWS_TRANSFORM_URN =
"beam:transform:merge_windows:v1";
public static final String TO_STRING_TRANSFORM_URN =
"beam:transform:to_string:v1";
+ public static final String MANAGED_TRANSFORM_URN =
"beam:transform:managed:v1";
// Required runner implemented transforms. These transforms should never
specify an environment.
public static final ImmutableSet<String> RUNNER_IMPLEMENTED_TRANSFORMS =
ImmutableSet.of(GROUP_BY_KEY_TRANSFORM_URN, IMPULSE_TRANSFORM_URN);
- public static final String CONFIG_ROW_KEY = "config_row";
-
- public static final String CONFIG_ROW_SCHEMA_KEY = "config_row_schema";
- public static final String SCHEMATRANSFORM_URN_KEY = "schematransform_urn";
-
// DeprecatedPrimitives
/**
* @deprecated SDKs should move away from creating `Read` transforms and
migrate to using Impulse
@@ -522,11 +520,28 @@ public class PTransformTranslation {
}
if (spec.getUrn().equals(BeamUrns.getUrn(SCHEMA_TRANSFORM))) {
+ ExternalTransforms.SchemaTransformPayload payload =
+
ExternalTransforms.SchemaTransformPayload.parseFrom(spec.getPayload());
+ String identifier = payload.getIdentifier();
transformBuilder.putAnnotations(
- SCHEMATRANSFORM_URN_KEY,
- ByteString.copyFromUtf8(
-
ExternalTransforms.SchemaTransformPayload.parseFrom(spec.getPayload())
- .getIdentifier()));
+ BeamUrns.getConstant(Annotations.Enum.SCHEMATRANSFORM_URN_KEY),
+ ByteString.copyFromUtf8(identifier));
+ if (identifier.equals(MANAGED_TRANSFORM_URN)) {
+ Schema configSchema =
+
SchemaTranslation.schemaFromProto(payload.getConfigurationSchema());
+ Row configRow =
+
RowCoder.of(configSchema).decode(payload.getConfigurationRow().newInput());
+ String underlyingIdentifier =
configRow.getString("transform_identifier");
+ if (underlyingIdentifier == null) {
+ throw new IllegalStateException(
+ String.format(
+ "Encountered a Managed Transform that has an empty
\"transform_identifier\": \n%s",
+ configRow));
+ }
+ transformBuilder.putAnnotations(
+
BeamUrns.getConstant(Annotations.Enum.MANAGED_UNDERLYING_TRANSFORM_URN_KEY),
+ ByteString.copyFromUtf8(underlyingIdentifier));
+ }
}
}
@@ -546,12 +561,12 @@ public class PTransformTranslation {
}
if (configRow != null) {
transformBuilder.putAnnotations(
- CONFIG_ROW_KEY,
+ BeamUrns.getConstant(Annotations.Enum.CONFIG_ROW_KEY),
ByteString.copyFrom(
CoderUtils.encodeToByteArray(RowCoder.of(configRow.getSchema()), configRow)));
transformBuilder.putAnnotations(
- CONFIG_ROW_SCHEMA_KEY,
+ BeamUrns.getConstant(Annotations.Enum.CONFIG_ROW_SCHEMA_KEY),
ByteString.copyFrom(
SchemaTranslation.schemaToProto(configRow.getSchema(),
true).toByteArray()));
}
diff --git
a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/construction/TransformUpgrader.java
b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/construction/TransformUpgrader.java
index deaa77d9b1b..941a5daf689 100644
---
a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/construction/TransformUpgrader.java
+++
b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/construction/TransformUpgrader.java
@@ -185,9 +185,11 @@ public class TransformUpgrader implements AutoCloseable {
throw new IllegalArgumentException("Could not find a transform with the
ID " + transformId);
}
ByteString configRowBytes =
-
transformToUpgrade.getAnnotationsOrThrow(PTransformTranslation.CONFIG_ROW_KEY);
+ transformToUpgrade.getAnnotationsOrThrow(
+
BeamUrns.getConstant(ExternalTransforms.Annotations.Enum.CONFIG_ROW_KEY));
ByteString configRowSchemaBytes =
-
transformToUpgrade.getAnnotationsOrThrow(PTransformTranslation.CONFIG_ROW_SCHEMA_KEY);
+ transformToUpgrade.getAnnotationsOrThrow(
+
BeamUrns.getConstant(ExternalTransforms.Annotations.Enum.CONFIG_ROW_SCHEMA_KEY));
SchemaApi.Schema configRowSchemaProto =
SchemaApi.Schema.parseFrom(configRowSchemaBytes.toByteArray());
diff --git
a/sdks/java/managed/src/test/java/org/apache/beam/sdk/managed/ManagedSchemaTransformTranslationTest.java
b/sdks/java/managed/src/test/java/org/apache/beam/sdk/managed/ManagedSchemaTransformTranslationTest.java
index 7a418976079..0b0ad532dbd 100644
---
a/sdks/java/managed/src/test/java/org/apache/beam/sdk/managed/ManagedSchemaTransformTranslationTest.java
+++
b/sdks/java/managed/src/test/java/org/apache/beam/sdk/managed/ManagedSchemaTransformTranslationTest.java
@@ -17,11 +17,16 @@
*/
package org.apache.beam.sdk.managed;
+import static
org.apache.beam.model.pipeline.v1.ExternalTransforms.Annotations.Enum.CONFIG_ROW_KEY;
+import static
org.apache.beam.model.pipeline.v1.ExternalTransforms.Annotations.Enum.CONFIG_ROW_SCHEMA_KEY;
+import static
org.apache.beam.model.pipeline.v1.ExternalTransforms.Annotations.Enum.MANAGED_UNDERLYING_TRANSFORM_URN_KEY;
+import static
org.apache.beam.model.pipeline.v1.ExternalTransforms.Annotations.Enum.SCHEMATRANSFORM_URN_KEY;
import static
org.apache.beam.model.pipeline.v1.ExternalTransforms.ExpansionMethods.Enum.SCHEMA_TRANSFORM;
import static
org.apache.beam.model.pipeline.v1.ExternalTransforms.SchemaTransformPayload;
import static
org.apache.beam.sdk.managed.ManagedSchemaTransformProvider.ManagedConfig;
import static
org.apache.beam.sdk.managed.ManagedSchemaTransformProvider.ManagedSchemaTransform;
import static
org.apache.beam.sdk.managed.ManagedSchemaTransformTranslation.ManagedSchemaTransformTranslator;
+import static
org.apache.beam.sdk.util.construction.PTransformTranslation.MANAGED_TRANSFORM_URN;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
@@ -41,11 +46,13 @@ import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.schemas.SchemaTranslation;
import org.apache.beam.sdk.schemas.utils.YamlUtils;
import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.util.CoderUtils;
import org.apache.beam.sdk.util.construction.BeamUrns;
import org.apache.beam.sdk.util.construction.PipelineTranslation;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionRowTuple;
import org.apache.beam.sdk.values.Row;
+import org.apache.beam.vendor.grpc.v1p60p1.com.google.protobuf.ByteString;
import
org.apache.beam.vendor.grpc.v1p60p1.com.google.protobuf.InvalidProtocolBufferException;
import
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap;
import org.junit.Test;
@@ -154,9 +161,38 @@ public class ManagedSchemaTransformTranslationTest {
})
.collect(Collectors.toList());
assertEquals(1, managedTransformProto.size());
- RunnerApi.FunctionSpec spec = managedTransformProto.get(0).getSpec();
+ RunnerApi.PTransform convertedTransform = managedTransformProto.get(0);
- // Check that the proto contains correct values
+ // Check the transform proto contains the correct annotations.
+ // These annotations can be accessed and used by the runner to make
decisions
+ Row managedConfigRow =
+ Row.withSchema(PROVIDER.configurationSchema())
+ .withFieldValue("transform_identifier",
TestSchemaTransformProvider.IDENTIFIER)
+ .withFieldValue("config", yamlStringConfig)
+ .build();
+ Map<String, ByteString> expectedAnnotations =
+ ImmutableMap.<String, ByteString>builder()
+ .put(
+ BeamUrns.getConstant(SCHEMATRANSFORM_URN_KEY),
+ ByteString.copyFromUtf8(MANAGED_TRANSFORM_URN))
+ .put(
+ BeamUrns.getConstant(MANAGED_UNDERLYING_TRANSFORM_URN_KEY),
+
ByteString.copyFromUtf8(TestSchemaTransformProvider.IDENTIFIER))
+ .put(
+ BeamUrns.getConstant(CONFIG_ROW_KEY),
+ ByteString.copyFrom(
+ CoderUtils.encodeToByteArray(
+ RowCoder.of(PROVIDER.configurationSchema()),
managedConfigRow)))
+ .put(
+ BeamUrns.getConstant(CONFIG_ROW_SCHEMA_KEY),
+ ByteString.copyFrom(
+
SchemaTranslation.schemaToProto(PROVIDER.configurationSchema(), true)
+ .toByteArray()))
+ .build();
+ assertEquals(expectedAnnotations, convertedTransform.getAnnotationsMap());
+
+ // Check that the spec proto contains correct values
+ RunnerApi.FunctionSpec spec = convertedTransform.getSpec();
SchemaTransformPayload payload =
SchemaTransformPayload.parseFrom(spec.getPayload());
assertEquals(PROVIDER.identifier(), payload.getIdentifier());
Schema schemaFromSpec =
SchemaTranslation.schemaFromProto(payload.getConfigurationSchema());