This is an automated email from the ASF dual-hosted git repository.
ahmedabualsaud pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 54673996c9b [Java] ManagedIO (#30808)
54673996c9b is described below
commit 54673996c9bf2ee076b04833bbae2729d6cebbaf
Author: Ahmed Abualsaud <[email protected]>
AuthorDate: Mon Apr 8 06:55:17 2024 -0400
[Java] ManagedIO (#30808)
* managed api for java
* yaml utils
---
build.gradle.kts | 1 +
sdks/java/core/build.gradle | 1 +
.../apache/beam/sdk/schemas/utils/YamlUtils.java | 171 ++++++++++++++++
.../org/apache/beam/sdk/util/YamlUtilsTest.java | 228 +++++++++++++++++++++
sdks/java/managed/build.gradle | 37 ++++
.../java/org/apache/beam/sdk/managed/Managed.java | 195 ++++++++++++++++++
.../managed/ManagedSchemaTransformProvider.java | 183 +++++++++++++++++
.../org/apache/beam/sdk/managed/package-info.java | 20 ++
.../ManagedSchemaTransformProviderTest.java | 103 ++++++++++
.../org/apache/beam/sdk/managed/ManagedTest.java | 114 +++++++++++
.../sdk/managed/TestSchemaTransformProvider.java | 98 +++++++++
.../managed/src/test/resources/test_config.yaml | 21 ++
settings.gradle.kts | 2 +
13 files changed, 1174 insertions(+)
diff --git a/build.gradle.kts b/build.gradle.kts
index ded692677b5..9c42ffdc8ce 100644
--- a/build.gradle.kts
+++ b/build.gradle.kts
@@ -303,6 +303,7 @@ tasks.register("javaPreCommit") {
dependsOn(":sdks:java:io:synthetic:build")
dependsOn(":sdks:java:io:xml:build")
dependsOn(":sdks:java:javadoc:allJavadoc")
+ dependsOn(":sdks:java:managed:build")
dependsOn(":sdks:java:testing:expansion-service:build")
dependsOn(":sdks:java:testing:jpms-tests:build")
dependsOn(":sdks:java:testing:load-tests:build")
diff --git a/sdks/java/core/build.gradle b/sdks/java/core/build.gradle
index 438a3fb1806..5a47cb5237e 100644
--- a/sdks/java/core/build.gradle
+++ b/sdks/java/core/build.gradle
@@ -98,6 +98,7 @@ dependencies {
permitUnusedDeclared
enforcedPlatform(library.java.google_cloud_platform_libraries_bom)
provided library.java.json_org
implementation library.java.everit_json_schema
+ implementation "org.yaml:snakeyaml:2.0"
shadowTest library.java.everit_json_schema
provided library.java.junit
testImplementation "com.github.stefanbirkner:system-rules:1.19.0"
diff --git
a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/YamlUtils.java
b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/YamlUtils.java
new file mode 100644
index 00000000000..5c05b2bed39
--- /dev/null
+++
b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/YamlUtils.java
@@ -0,0 +1,171 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.schemas.utils;
+
+import static org.apache.beam.sdk.values.Row.toRow;
+
+import java.math.BigDecimal;
+import java.util.List;
+import java.util.Map;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.schemas.Schema.Field;
+import org.apache.beam.sdk.schemas.Schema.FieldType;
+import org.apache.beam.sdk.values.Row;
+import
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.CaseFormat;
+import
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions;
+import
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap;
+import
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.io.BaseEncoding;
+import org.checkerframework.checker.nullness.qual.Nullable;
+import org.yaml.snakeyaml.Yaml;
+
+public class YamlUtils {
+ private static final Map<Schema.TypeName, Function<String, @Nullable
Object>> YAML_VALUE_PARSERS =
+ ImmutableMap
+ .<Schema.TypeName,
+ Function<String,
@org.checkerframework.checker.nullness.qual.Nullable Object>>
+ builder()
+ .put(Schema.TypeName.BYTE, Byte::valueOf)
+ .put(Schema.TypeName.INT16, Short::valueOf)
+ .put(Schema.TypeName.INT32, Integer::valueOf)
+ .put(Schema.TypeName.INT64, Long::valueOf)
+ .put(Schema.TypeName.FLOAT, Float::valueOf)
+ .put(Schema.TypeName.DOUBLE, Double::valueOf)
+ .put(Schema.TypeName.DECIMAL, BigDecimal::new)
+ .put(Schema.TypeName.BOOLEAN, Boolean::valueOf)
+ .put(Schema.TypeName.STRING, str -> str)
+ .put(Schema.TypeName.BYTES, str -> BaseEncoding.base64().decode(str))
+ .build();
+
+ public static Row toBeamRow(@Nullable String yamlString, Schema schema) {
+ return toBeamRow(yamlString, schema, false);
+ }
+
+ public static Row toBeamRow(
+ @Nullable String yamlString, Schema schema, boolean
convertNamesToCamelCase) {
+ if (yamlString == null || yamlString.isEmpty()) {
+ List<Field> requiredFields =
+ schema.getFields().stream()
+ .filter(field -> !field.getType().getNullable())
+ .collect(Collectors.toList());
+ if (requiredFields.isEmpty()) {
+ return Row.nullRow(schema);
+ } else {
+ throw new IllegalArgumentException(
+ String.format(
+ "Received an empty YAML string, but output schema contains
required fields: %s",
+ requiredFields));
+ }
+ }
+ Yaml yaml = new Yaml();
+ Object yamlMap = yaml.load(yamlString);
+
+ Preconditions.checkArgument(
+ yamlMap instanceof Map,
+ "Expected a YAML mapping but got type '%s' instead.",
+ Preconditions.checkNotNull(yamlMap).getClass());
+
+ return toBeamRow(
+ (Map<String, Object>) Preconditions.checkNotNull(yamlMap), schema,
convertNamesToCamelCase);
+ }
+
+ private static @Nullable Object toBeamValue(
+ Field field, @Nullable Object yamlValue, boolean
convertNamesToCamelCase) {
+ FieldType fieldType = field.getType();
+
+ if (yamlValue == null) {
+ if (fieldType.getNullable()) {
+ return null;
+ } else {
+ throw new IllegalArgumentException(
+ "Received null value for non-nullable field \"" + field.getName()
+ "\"");
+ }
+ }
+
+ if (yamlValue instanceof String
+ || yamlValue instanceof Number
+ || yamlValue instanceof Boolean) {
+ String yamlStringValue = yamlValue.toString();
+ if (YAML_VALUE_PARSERS.containsKey(fieldType.getTypeName())) {
+ return
YAML_VALUE_PARSERS.get(fieldType.getTypeName()).apply(yamlStringValue);
+ }
+ }
+
+ if (yamlValue instanceof byte[] && fieldType.getTypeName() ==
Schema.TypeName.BYTES) {
+ return yamlValue;
+ }
+
+ if (yamlValue instanceof List) {
+ FieldType innerType =
+ Preconditions.checkNotNull(
+ fieldType.getCollectionElementType(),
+ "Cannot convert YAML type '%s` to `%s` because the YAML value is
a List, but the output schema field does not define a collection type.",
+ yamlValue.getClass(),
+ fieldType);
+ return ((List<Object>) yamlValue)
+ .stream()
+ .map(
+ v ->
+ Preconditions.checkNotNull(
+ toBeamValue(field.withType(innerType), v,
convertNamesToCamelCase)))
+ .collect(Collectors.toList());
+ }
+
+ if (yamlValue instanceof Map) {
+ if (fieldType.getTypeName() == Schema.TypeName.ROW) {
+ Schema nestedSchema =
+ Preconditions.checkNotNull(
+ fieldType.getRowSchema(),
+ "Received a YAML '%s' type, but output schema field '%s' does
not define a Row Schema",
+ yamlValue.getClass(),
+ fieldType);
+ return toBeamRow((Map<String, Object>) yamlValue, nestedSchema,
convertNamesToCamelCase);
+ } else if (fieldType.getTypeName() == Schema.TypeName.MAP) {
+ return yamlValue;
+ }
+ }
+
+ throw new UnsupportedOperationException(
+ String.format(
+ "Converting YAML type '%s' to '%s' is not supported",
yamlValue.getClass(), fieldType));
+ }
+
+ @SuppressWarnings("nullness")
+ public static Row toBeamRow(Map<String, Object> yamlMap, Schema rowSchema,
boolean toCamelCase) {
+ return rowSchema.getFields().stream()
+ .map(
+ field ->
+ toBeamValue(
+ field,
+ yamlMap.get(maybeGetSnakeCase(field.getName(),
toCamelCase)),
+ toCamelCase))
+ .collect(toRow(rowSchema));
+ }
+
+ private static String maybeGetSnakeCase(String str, boolean getSnakeCase) {
+ return getSnakeCase ?
CaseFormat.LOWER_CAMEL.to(CaseFormat.LOWER_UNDERSCORE, str) : str;
+ }
+
+ public static String yamlStringFromMap(@Nullable Map<String, Object> map) {
+ if (map == null || map.isEmpty()) {
+ return "";
+ }
+ return new Yaml().dumpAsMap(map);
+ }
+}
diff --git
a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/YamlUtilsTest.java
b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/YamlUtilsTest.java
new file mode 100644
index 00000000000..6e6984dde3a
--- /dev/null
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/YamlUtilsTest.java
@@ -0,0 +1,228 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.util;
+
+import static org.junit.Assert.assertEquals;
+
+import java.math.BigDecimal;
+import java.util.Arrays;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.schemas.utils.YamlUtils;
+import org.apache.beam.sdk.values.Row;
+import
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.io.BaseEncoding;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+@RunWith(JUnit4.class)
+public class YamlUtilsTest {
+ @Rule public transient ExpectedException thrown = ExpectedException.none();
+
+ public String makeNested(String input) {
+ return Arrays.stream(input.split("\n"))
+ .map(str -> " " + str)
+ .collect(Collectors.joining("\n"));
+ }
+
+ @Test
+ public void testEmptyYamlString() {
+ Schema schema = Schema.builder().build();
+
+ assertEquals(Row.nullRow(schema), YamlUtils.toBeamRow("", schema));
+ }
+
+ @Test
+ public void testInvalidEmptyYamlWithNonEmptySchema() {
+ Schema schema = Schema.builder().addStringField("dummy").build();
+
+ thrown.expect(IllegalArgumentException.class);
+ thrown.expectMessage(
+ "Received an empty YAML string, but output schema contains required
fields");
+ thrown.expectMessage("dummy");
+
+ YamlUtils.toBeamRow("", schema);
+ }
+
+ @Test
+ public void testNullableValues() {
+ String yamlString = "nullable_string:\n" + "nullable_integer:\n" +
"nullable_boolean:\n";
+ Schema schema =
+ Schema.builder()
+ .addNullableStringField("nullable_string")
+ .addNullableInt32Field("nullable_integer")
+ .addNullableBooleanField("nullable_boolean")
+ .build();
+
+ assertEquals(Row.nullRow(schema), YamlUtils.toBeamRow(yamlString, schema));
+ }
+
+ @Test
+ public void testMissingNullableValues() {
+ String yamlString = "nullable_string:";
+ Schema schema =
+ Schema.builder()
+ .addNullableStringField("nullable_string")
+ .addNullableInt32Field("nullable_integer")
+ .addNullableBooleanField("nullable_boolean")
+ .build();
+
+ assertEquals(Row.nullRow(schema), YamlUtils.toBeamRow(yamlString, schema));
+ }
+
+ @Test
+ public void testInvalidNullableValues() {
+ String yamlString = "nullable_string:\n" + "integer:";
+ Schema schema =
+
Schema.builder().addNullableStringField("nullable_string").addInt32Field("integer").build();
+
+ thrown.expect(IllegalArgumentException.class);
+ thrown.expectMessage("Received null value for non-nullable field
\"integer\"");
+ YamlUtils.toBeamRow(yamlString, schema);
+ }
+
+ @Test
+ public void testInvalidMissingRequiredValues() {
+ String yamlString = "nullable_string:";
+ Schema schema =
+
Schema.builder().addNullableStringField("nullable_string").addInt32Field("integer").build();
+ thrown.expect(IllegalArgumentException.class);
+ thrown.expectMessage("Received null value for non-nullable field
\"integer\"");
+
+ YamlUtils.toBeamRow(yamlString, schema);
+ }
+
+ @Test
+ public void testExtraFieldsAreIgnored() {
+ String yamlString = "field1: val1\n" + "field2: val2";
+ Schema schema = Schema.builder().addStringField("field1").build();
+ Row expectedRow = Row.withSchema(schema).withFieldValue("field1",
"val1").build();
+
+ assertEquals(expectedRow, YamlUtils.toBeamRow(yamlString, schema));
+ }
+
+ @Test
+ public void testInvalidTopLevelArray() {
+ String invalidYaml = "- top_level_list" + "- another_list";
+ Schema schema = Schema.builder().build();
+
+ thrown.expect(IllegalArgumentException.class);
+ thrown.expectMessage("Expected a YAML mapping");
+ YamlUtils.toBeamRow(invalidYaml, schema);
+ }
+
+ private static final Schema FLAT_SCHEMA =
+ Schema.builder()
+ .addByteField("byte_field")
+ .addInt16Field("int16_field")
+ .addInt32Field("int32_field")
+ .addInt64Field("int64_field")
+ .addFloatField("float_field")
+ .addDoubleField("double_field")
+ .addDecimalField("decimal_field")
+ .addBooleanField("boolean_field")
+ .addStringField("string_field")
+ .addByteArrayField("bytes_field")
+ .build();
+
+ private static final Row FLAT_ROW =
+ Row.withSchema(FLAT_SCHEMA)
+ .withFieldValue("byte_field", Byte.valueOf("123"))
+ .withFieldValue("int16_field", Short.valueOf("16"))
+ .withFieldValue("int32_field", 32)
+ .withFieldValue("int64_field", 64L)
+ .withFieldValue("float_field", 123.456F)
+ .withFieldValue("double_field", 456.789)
+ .withFieldValue("decimal_field", BigDecimal.valueOf(789.123))
+ .withFieldValue("boolean_field", true)
+ .withFieldValue("string_field", "some string")
+ .withFieldValue("bytes_field", BaseEncoding.base64().decode("abc"))
+ .build();
+
+ private static final String FLAT_YAML =
+ "byte_field: 123\n"
+ + "int16_field: 16\n"
+ + "int32_field: 32\n"
+ + "int64_field: 64\n"
+ + "float_field: 123.456\n"
+ + "double_field: 456.789\n"
+ + "decimal_field: 789.123\n"
+ + "boolean_field: true\n"
+ + "string_field: some string\n"
+ + "bytes_field: abc";
+
+ @Test
+ public void testAllTypesFlat() {
+ assertEquals(FLAT_ROW, YamlUtils.toBeamRow(FLAT_YAML, FLAT_SCHEMA));
+ }
+
+ @Test
+ public void testAllTypesNested() {
+ String nestedFlatTypes = makeNested(FLAT_YAML);
+ String topLevelYaml = "top_string: abc\n" + "nested: \n" + nestedFlatTypes;
+
+ Schema schema =
+ Schema.builder().addStringField("top_string").addRowField("nested",
FLAT_SCHEMA).build();
+ Row expectedRow =
+ Row.withSchema(schema)
+ .withFieldValue("top_string", "abc")
+ .withFieldValue("nested", FLAT_ROW)
+ .build();
+
+ assertEquals(expectedRow, YamlUtils.toBeamRow(topLevelYaml, schema));
+ }
+
+ private static final String INT_ARRAY_YAML =
+ "arr:\n" + " - 1\n" + " - 2\n" + " - 3\n" + " - 4\n" + " - 5\n";
+
+ private static final Schema INT_ARRAY_SCHEMA =
+ Schema.builder().addArrayField("arr", Schema.FieldType.INT32).build();
+
+ private static final Row INT_ARRAY_ROW =
+ Row.withSchema(INT_ARRAY_SCHEMA)
+ .withFieldValue("arr", IntStream.range(1,
6).boxed().collect(Collectors.toList()))
+ .build();
+
+ @Test
+ public void testArray() {
+ assertEquals(INT_ARRAY_ROW, YamlUtils.toBeamRow(INT_ARRAY_YAML,
INT_ARRAY_SCHEMA));
+ }
+
+ @Test
+ public void testNestedArray() {
+ String nestedArray = makeNested(INT_ARRAY_YAML);
+ String yamlString = "str_field: some string\n" + "nested: \n" +
nestedArray;
+
+ Schema schema =
+ Schema.builder()
+ .addStringField("str_field")
+ .addRowField("nested", INT_ARRAY_SCHEMA)
+ .build();
+
+ Row expectedRow =
+ Row.withSchema(schema)
+ .withFieldValue("str_field", "some string")
+ .withFieldValue("nested", INT_ARRAY_ROW)
+ .build();
+
+ assertEquals(expectedRow, YamlUtils.toBeamRow(yamlString, schema));
+ }
+}
diff --git a/sdks/java/managed/build.gradle b/sdks/java/managed/build.gradle
new file mode 100644
index 00000000000..88e537d66f8
--- /dev/null
+++ b/sdks/java/managed/build.gradle
@@ -0,0 +1,37 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

apply plugin: 'org.apache.beam.module'
applyJavaNature(
    automaticModuleName: 'org.apache.beam.sdk.managed',
)

description = "Apache Beam :: SDKs :: Java :: Managed"
ext.summary = """Library that provides managed IOs."""

dependencies {
  implementation project(path: ":sdks:java:core", configuration: "shadow")
  implementation library.java.vendored_guava_32_1_2_jre

  // snakeyaml is needed at test runtime because YamlUtils (from :sdks:java:core)
  // parses the YAML configs used by these tests.
  testImplementation library.java.junit
  testRuntimeOnly "org.yaml:snakeyaml:2.0"
  testRuntimeOnly project(path: ":runners:direct-java", configuration: "shadow")
}
diff --git
a/sdks/java/managed/src/main/java/org/apache/beam/sdk/managed/Managed.java
b/sdks/java/managed/src/main/java/org/apache/beam/sdk/managed/Managed.java
new file mode 100644
index 00000000000..b2b010b1e43
--- /dev/null
+++ b/sdks/java/managed/src/main/java/org/apache/beam/sdk/managed/Managed.java
@@ -0,0 +1,195 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.managed;
+
+import com.google.auto.value.AutoValue;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import javax.annotation.Nullable;
+import org.apache.beam.sdk.schemas.transforms.SchemaTransform;
+import org.apache.beam.sdk.schemas.transforms.SchemaTransformProvider;
+import org.apache.beam.sdk.schemas.utils.YamlUtils;
+import org.apache.beam.sdk.values.PCollectionRowTuple;
+import
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting;
+import
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions;
+import
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap;
+
+/**
+ * Top-level {@link org.apache.beam.sdk.transforms.PTransform}s that build and
instantiate turnkey
+ * transforms.
+ *
+ * <h3>Available transforms</h3>
+ *
+ * <p>This API currently supports two operations: {@link Read} and {@link
Write}. Each one
+ * enumerates the available transforms in a {@code TRANSFORMS} map.
+ *
+ * <h3>Building a Managed turnkey transform</h3>
+ *
+ * <p>Turnkey transforms are represented as {@link SchemaTransform}s, which
means each one has a
+ * defined configuration. A given transform can be built with a {@code
Map<String, Object>} that
+ * specifies arguments using like so:
+ *
+ * <pre>{@code
+ * PCollectionRowTuple output = PCollectionRowTuple.empty(pipeline).apply(
+ * Managed.read(ICEBERG)
+ * .withConfig(ImmutableMap.<String, Map>.builder()
+ * .put("foo", "abc")
+ * .put("bar", 123)
+ * .build()));
+ * }</pre>
+ *
+ * <p>Instead of specifying configuration arguments directly in the code, one
can provide the
+ * location to a YAML file that contains this information. Say we have the
following YAML file:
+ *
+ * <pre>{@code
+ * foo: "abc"
+ * bar: 123
+ * }</pre>
+ *
+ * <p>The file's path can be passed in to the Managed API like so:
+ *
+ * <pre>{@code
+ * PCollectionRowTuple input = PCollectionRowTuple.of("input",
pipeline.apply(Create.of(...)))
+ *
+ * PCollectionRowTuple output = input.apply(
+ * Managed.write(ICEBERG)
+ * .withConfigUrl(<config path>));
+ * }</pre>
+ */
+public class Managed {
+
+ // TODO: Dynamically generate a list of supported transforms
+ public static final String ICEBERG = "iceberg";
+
+ public static final Map<String, String> READ_TRANSFORMS =
+ ImmutableMap.<String, String>builder()
+ .put(ICEBERG, "beam:schematransform:org.apache.beam:iceberg_read:v1")
+ .build();
+ public static final Map<String, String> WRITE_TRANSFORMS =
+ ImmutableMap.<String, String>builder()
+ .put(ICEBERG,
"beam:schematransform:org.apache.beam:iceberg_write:v1")
+ .build();
+
+ /**
+ * Instantiates a {@link Managed.Read} transform for the specified source.
The supported managed
+ * sources are:
+ *
+ * <ul>
+ * <li>{@link Managed#ICEBERG} : Read from Apache Iceberg
+ * </ul>
+ */
+ public static ManagedTransform read(String source) {
+
+ return new AutoValue_Managed_ManagedTransform.Builder()
+ .setIdentifier(
+ Preconditions.checkNotNull(
+ READ_TRANSFORMS.get(source.toLowerCase()),
+ "An unsupported source was specified: '%s'. Please specify one
of the following sources: %s",
+ source,
+ READ_TRANSFORMS.keySet()))
+ .setSupportedIdentifiers(new ArrayList<>(READ_TRANSFORMS.values()))
+ .build();
+ }
+
+ /**
+ * Instantiates a {@link Managed.Write} transform for the specified sink.
The supported managed
+ * sinks are:
+ *
+ * <ul>
+ * <li>{@link Managed#ICEBERG} : Write to Apache Iceberg
+ * </ul>
+ */
+ public static ManagedTransform write(String sink) {
+ return new AutoValue_Managed_ManagedTransform.Builder()
+ .setIdentifier(
+ Preconditions.checkNotNull(
+ WRITE_TRANSFORMS.get(sink.toLowerCase()),
+ "An unsupported sink was specified: '%s'. Please specify one
of the following sinks: %s",
+ sink,
+ WRITE_TRANSFORMS.keySet()))
+ .setSupportedIdentifiers(new ArrayList<>(WRITE_TRANSFORMS.values()))
+ .build();
+ }
+
+ @AutoValue
+ public abstract static class ManagedTransform extends SchemaTransform {
+ abstract String getIdentifier();
+
+ abstract @Nullable String getConfig();
+
+ abstract @Nullable String getConfigUrl();
+
+ @VisibleForTesting
+ abstract List<String> getSupportedIdentifiers();
+
+ abstract Builder toBuilder();
+
+ @AutoValue.Builder
+ abstract static class Builder {
+ abstract Builder setIdentifier(String identifier);
+
+ abstract Builder setConfig(@Nullable String config);
+
+ abstract Builder setConfigUrl(@Nullable String configUrl);
+
+ @VisibleForTesting
+ abstract Builder setSupportedIdentifiers(List<String>
supportedIdentifiers);
+
+ abstract ManagedTransform build();
+ }
+
+ /**
+ * Use the input Map of configuration arguments to build and instantiate
the underlying
+ * transform. The map can ignore nullable parameters, but needs to include
all required
+ * parameters. Check the underlying transform's schema ({@link
+ * SchemaTransformProvider#configurationSchema()}) to see which parameters
are available.
+ */
+ public ManagedTransform withConfig(Map<String, Object> config) {
+ return
toBuilder().setConfig(YamlUtils.yamlStringFromMap(config)).build();
+ }
+
+ /**
+ * Like {@link #withConfig(Map)}, but instead extracts the configuration
arguments from a
+ * specified YAML file location.
+ */
+ public ManagedTransform withConfigUrl(String configUrl) {
+ return toBuilder().setConfigUrl(configUrl).build();
+ }
+
+ @VisibleForTesting
+ ManagedTransform withSupportedIdentifiers(List<String>
supportedIdentifiers) {
+ return toBuilder().setSupportedIdentifiers(supportedIdentifiers).build();
+ }
+
+ @Override
+ public PCollectionRowTuple expand(PCollectionRowTuple input) {
+ ManagedSchemaTransformProvider.ManagedConfig managedConfig =
+ ManagedSchemaTransformProvider.ManagedConfig.builder()
+ .setTransformIdentifier(getIdentifier())
+ .setConfig(getConfig())
+ .setConfigUrl(getConfigUrl())
+ .build();
+
+ SchemaTransform underlyingTransform =
+ new
ManagedSchemaTransformProvider(getSupportedIdentifiers()).from(managedConfig);
+
+ return input.apply(underlyingTransform);
+ }
+ }
+}
diff --git
a/sdks/java/managed/src/main/java/org/apache/beam/sdk/managed/ManagedSchemaTransformProvider.java
b/sdks/java/managed/src/main/java/org/apache/beam/sdk/managed/ManagedSchemaTransformProvider.java
new file mode 100644
index 00000000000..1ee2b11a90f
--- /dev/null
+++
b/sdks/java/managed/src/main/java/org/apache/beam/sdk/managed/ManagedSchemaTransformProvider.java
@@ -0,0 +1,183 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.managed;
+
+import static
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkArgument;
+
+import com.google.auto.service.AutoService;
+import com.google.auto.value.AutoValue;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.ServiceLoader;
+import javax.annotation.Nullable;
+import org.apache.beam.sdk.io.FileSystems;
+import org.apache.beam.sdk.io.fs.MatchResult;
+import org.apache.beam.sdk.schemas.AutoValueSchema;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.schemas.annotations.DefaultSchema;
+import org.apache.beam.sdk.schemas.annotations.SchemaFieldDescription;
+import org.apache.beam.sdk.schemas.transforms.SchemaTransform;
+import org.apache.beam.sdk.schemas.transforms.SchemaTransformProvider;
+import org.apache.beam.sdk.schemas.transforms.TypedSchemaTransformProvider;
+import org.apache.beam.sdk.schemas.utils.YamlUtils;
+import org.apache.beam.sdk.values.PCollectionRowTuple;
+import org.apache.beam.sdk.values.Row;
+import
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting;
+import
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Strings;
+
+@AutoService(SchemaTransformProvider.class)
+public class ManagedSchemaTransformProvider
+ extends
TypedSchemaTransformProvider<ManagedSchemaTransformProvider.ManagedConfig> {
+
+ @Override
+ public String identifier() {
+ return "beam:schematransform:org.apache.beam:managed:v1";
+ }
+
+ private final Map<String, SchemaTransformProvider> schemaTransformProviders
= new HashMap<>();
+
+ public ManagedSchemaTransformProvider() {}
+
+ ManagedSchemaTransformProvider(Collection<String> supportedIdentifiers) {
+ try {
+ for (SchemaTransformProvider schemaTransformProvider :
+ ServiceLoader.load(SchemaTransformProvider.class)) {
+ if
(schemaTransformProviders.containsKey(schemaTransformProvider.identifier())) {
+ throw new IllegalArgumentException(
+ "Found multiple SchemaTransformProvider implementations with the
same identifier "
+ + schemaTransformProvider.identifier());
+ }
+ schemaTransformProviders.put(schemaTransformProvider.identifier(),
schemaTransformProvider);
+ }
+ } catch (Exception e) {
+ throw new RuntimeException(e.getMessage());
+ }
+
+ schemaTransformProviders.entrySet().removeIf(e ->
!supportedIdentifiers.contains(e.getKey()));
+ }
+
+ @DefaultSchema(AutoValueSchema.class)
+ @AutoValue
+ @VisibleForTesting
+ abstract static class ManagedConfig {
+ public static Builder builder() {
+ return new
AutoValue_ManagedSchemaTransformProvider_ManagedConfig.Builder();
+ }
+
+ @SchemaFieldDescription("Identifier of the underlying IO to instantiate.")
+ public abstract String getTransformIdentifier();
+
+ @SchemaFieldDescription("URL path to the YAML config file used to build
the underlying IO.")
+ public abstract @Nullable String getConfigUrl();
+
+ @SchemaFieldDescription("YAML string config used to build the underlying
IO.")
+ public abstract @Nullable String getConfig();
+
+ @AutoValue.Builder
+ public abstract static class Builder {
+ public abstract Builder setTransformIdentifier(String identifier);
+
+ public abstract Builder setConfigUrl(@Nullable String configUrl);
+
+ public abstract Builder setConfig(@Nullable String config);
+
+ public abstract ManagedConfig build();
+ }
+
+ protected void validate() {
+ boolean configExists = !Strings.isNullOrEmpty(getConfig());
+ boolean configUrlExists = !Strings.isNullOrEmpty(getConfigUrl());
+ checkArgument(
+ !(configExists && configUrlExists) && (configExists ||
configUrlExists),
+ "Please specify a config or a config URL, but not both.");
+ }
+ }
+
+ @Override
+ protected SchemaTransform from(ManagedConfig managedConfig) {
+ managedConfig.validate();
+ SchemaTransformProvider schemaTransformProvider =
+ Preconditions.checkNotNull(
+
schemaTransformProviders.get(managedConfig.getTransformIdentifier()),
+ "Could not find transform with identifier %s, or it may not be
supported",
+ managedConfig.getTransformIdentifier());
+
+ // parse config before expansion to check if it matches underlying
transform's config schema
+ Schema transformConfigSchema =
schemaTransformProvider.configurationSchema();
+ Row transformConfig;
+ try {
+ transformConfig = getRowConfig(managedConfig, transformConfigSchema);
+ } catch (Exception e) {
+ throw new IllegalArgumentException(
+ String.format(
+ "Specified configuration does not align with the underlying
transform's configuration schema [%s].",
+ transformConfigSchema),
+ e);
+ }
+
+ return new ManagedSchemaTransform(transformConfig,
schemaTransformProvider);
+ }
+
+ private static class ManagedSchemaTransform extends SchemaTransform {
+ private final Row transformConfig;
+ private final SchemaTransformProvider underlyingTransformProvider;
+
+ ManagedSchemaTransform(
+ Row transformConfig, SchemaTransformProvider
underlyingTransformProvider) {
+ this.transformConfig = transformConfig;
+ this.underlyingTransformProvider = underlyingTransformProvider;
+ }
+
+ @Override
+ public PCollectionRowTuple expand(PCollectionRowTuple input) {
+ SchemaTransform underlyingTransform =
underlyingTransformProvider.from(transformConfig);
+
+ return input.apply(underlyingTransform);
+ }
+ }
+
+ @VisibleForTesting
+ static Row getRowConfig(ManagedConfig config, Schema transformSchema) {
+ String transformYamlConfig;
+ if (!Strings.isNullOrEmpty(config.getConfigUrl())) {
+ try {
+ MatchResult.Metadata fileMetaData =
+
FileSystems.matchSingleFileSpec(Preconditions.checkNotNull(config.getConfigUrl()));
+ ByteBuffer buffer = ByteBuffer.allocate((int)
fileMetaData.sizeBytes());
+ FileSystems.open(fileMetaData.resourceId()).read(buffer);
+ transformYamlConfig = new String(buffer.array(),
StandardCharsets.UTF_8);
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ } else {
+ transformYamlConfig = config.getConfig();
+ }
+
+ return YamlUtils.toBeamRow(transformYamlConfig, transformSchema, true);
+ }
+
+ @VisibleForTesting
+ Map<String, SchemaTransformProvider> getAllProviders() {
+ return schemaTransformProviders;
+ }
+}
diff --git
a/sdks/java/managed/src/main/java/org/apache/beam/sdk/managed/package-info.java
b/sdks/java/managed/src/main/java/org/apache/beam/sdk/managed/package-info.java
new file mode 100644
index 00000000000..d129e4a7a22
--- /dev/null
+++
b/sdks/java/managed/src/main/java/org/apache/beam/sdk/managed/package-info.java
@@ -0,0 +1,20 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/** Managed reads and writes. */
+package org.apache.beam.sdk.managed;
diff --git
a/sdks/java/managed/src/test/java/org/apache/beam/sdk/managed/ManagedSchemaTransformProviderTest.java
b/sdks/java/managed/src/test/java/org/apache/beam/sdk/managed/ManagedSchemaTransformProviderTest.java
new file mode 100644
index 00000000000..0c495d0d2c5
--- /dev/null
+++
b/sdks/java/managed/src/test/java/org/apache/beam/sdk/managed/ManagedSchemaTransformProviderTest.java
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.managed;
+
+import static
org.apache.beam.sdk.managed.ManagedSchemaTransformProvider.ManagedConfig;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import java.net.URISyntaxException;
+import java.nio.file.Paths;
+import java.util.Arrays;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.values.Row;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+@RunWith(JUnit4.class)
+public class ManagedSchemaTransformProviderTest {
+ @Rule public transient ExpectedException thrown = ExpectedException.none();
+
+ @Test
+ public void testFailWhenNoConfigSpecified() {
+ ManagedSchemaTransformProvider.ManagedConfig config =
+ ManagedSchemaTransformProvider.ManagedConfig.builder()
+ .setTransformIdentifier("some identifier")
+ .build();
+
+ thrown.expect(IllegalArgumentException.class);
+ thrown.expectMessage("Please specify a config or a config URL, but not
both");
+ config.validate();
+ }
+
+ @Test
+ public void testGetRowFromYamlConfig() {
+ String yamlString = "extra_string: abc\n" + "extra_integer: 123";
+ ManagedConfig config =
+ ManagedConfig.builder()
+ .setTransformIdentifier(TestSchemaTransformProvider.IDENTIFIER)
+ .setConfig(yamlString)
+ .build();
+ Schema configSchema = new
TestSchemaTransformProvider().configurationSchema();
+ Row expectedRow =
+ Row.withSchema(configSchema)
+ .withFieldValue("extraString", "abc")
+ .withFieldValue("extraInteger", 123)
+ .build();
+ Row configRow =
+ ManagedSchemaTransformProvider.getRowConfig(
+ config, new TestSchemaTransformProvider().configurationSchema());
+
+ assertEquals(expectedRow, configRow);
+ }
+
+ @Test
+ public void testGetRowFromConfigUrl() throws URISyntaxException {
+ String yamlConfigPath =
+
Paths.get(getClass().getClassLoader().getResource("test_config.yaml").toURI())
+ .toFile()
+ .getAbsolutePath();
+ ManagedConfig config =
+ ManagedConfig.builder()
+ .setTransformIdentifier(TestSchemaTransformProvider.IDENTIFIER)
+ .setConfigUrl(yamlConfigPath)
+ .build();
+ Schema configSchema = new
TestSchemaTransformProvider().configurationSchema();
+ Row expectedRow =
+ Row.withSchema(configSchema)
+ .withFieldValue("extraString", "abc")
+ .withFieldValue("extraInteger", 123)
+ .build();
+ Row configRow =
+ ManagedSchemaTransformProvider.getRowConfig(
+ config, new TestSchemaTransformProvider().configurationSchema());
+
+ assertEquals(expectedRow, configRow);
+ }
+
+ @Test
+ public void testDiscoverTestProvider() {
+ ManagedSchemaTransformProvider provider =
+ new
ManagedSchemaTransformProvider(Arrays.asList(TestSchemaTransformProvider.IDENTIFIER));
+
+
assertTrue(provider.getAllProviders().containsKey(TestSchemaTransformProvider.IDENTIFIER));
+ }
+}
diff --git
a/sdks/java/managed/src/test/java/org/apache/beam/sdk/managed/ManagedTest.java
b/sdks/java/managed/src/test/java/org/apache/beam/sdk/managed/ManagedTest.java
new file mode 100644
index 00000000000..ceb71a06f33
--- /dev/null
+++
b/sdks/java/managed/src/test/java/org/apache/beam/sdk/managed/ManagedTest.java
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.managed;
+
+import java.nio.file.Paths;
+import java.util.Arrays;
+import java.util.List;
+import java.util.stream.Collectors;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.testing.PAssert;
+import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionRowTuple;
+import org.apache.beam.sdk.values.Row;
+import
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+@RunWith(JUnit4.class)
+public class ManagedTest {
+ @Rule public transient ExpectedException thrown = ExpectedException.none();
+
+ @Test
+ public void testInvalidTransform() {
+ thrown.expect(NullPointerException.class);
+ thrown.expectMessage("An unsupported source was specified");
+ Managed.read("nonexistent-source");
+
+ thrown.expect(NullPointerException.class);
+ thrown.expectMessage("An unsupported sink was specified");
+ Managed.write("nonexistent-sink");
+ }
+
+ @Rule public TestPipeline pipeline = TestPipeline.create();
+
+ private static final Schema SCHEMA =
+ Schema.builder().addStringField("str").addInt32Field("int").build();
+ private static final List<Row> ROWS =
+ Arrays.asList(
+ Row.withSchema(SCHEMA).withFieldValue("str",
"a").withFieldValue("int", 1).build(),
+ Row.withSchema(SCHEMA).withFieldValue("str",
"b").withFieldValue("int", 2).build(),
+ Row.withSchema(SCHEMA).withFieldValue("str",
"c").withFieldValue("int", 3).build());
+
+ public void runTestProviderTest(Managed.ManagedTransform writeOp) {
+ PCollection<Row> rows =
+ PCollectionRowTuple.of("input",
pipeline.apply(Create.of(ROWS)).setRowSchema(SCHEMA))
+ .apply(writeOp)
+ .get("output");
+
+ Schema outputSchema = rows.getSchema();
+ PAssert.that(rows)
+ .containsInAnyOrder(
+ ROWS.stream()
+ .map(
+ row ->
+ Row.withSchema(outputSchema)
+ .addValues(row.getValues())
+ .addValue("abc")
+ .addValue(123)
+ .build())
+ .collect(Collectors.toList()));
+ pipeline.run();
+ }
+
+ @Test
+ public void testManagedTestProviderWithConfigMap() {
+ Managed.ManagedTransform writeOp =
+ Managed.write(Managed.ICEBERG)
+ .toBuilder()
+ .setIdentifier(TestSchemaTransformProvider.IDENTIFIER)
+ .build()
+
.withSupportedIdentifiers(Arrays.asList(TestSchemaTransformProvider.IDENTIFIER))
+ .withConfig(ImmutableMap.of("extra_string", "abc",
"extra_integer", 123));
+
+ runTestProviderTest(writeOp);
+ }
+
+ @Test
+ public void testManagedTestProviderWithConfigFile() throws Exception {
+ String yamlConfigPath =
+
Paths.get(getClass().getClassLoader().getResource("test_config.yaml").toURI())
+ .toFile()
+ .getAbsolutePath();
+
+ Managed.ManagedTransform writeOp =
+ Managed.write(Managed.ICEBERG)
+ .toBuilder()
+ .setIdentifier(TestSchemaTransformProvider.IDENTIFIER)
+ .build()
+
.withSupportedIdentifiers(Arrays.asList(TestSchemaTransformProvider.IDENTIFIER))
+ .withConfigUrl(yamlConfigPath);
+
+ runTestProviderTest(writeOp);
+ }
+}
diff --git
a/sdks/java/managed/src/test/java/org/apache/beam/sdk/managed/TestSchemaTransformProvider.java
b/sdks/java/managed/src/test/java/org/apache/beam/sdk/managed/TestSchemaTransformProvider.java
new file mode 100644
index 00000000000..136d98d468d
--- /dev/null
+++
b/sdks/java/managed/src/test/java/org/apache/beam/sdk/managed/TestSchemaTransformProvider.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.managed;
+
+import com.google.auto.service.AutoService;
+import com.google.auto.value.AutoValue;
+import org.apache.beam.sdk.schemas.AutoValueSchema;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.schemas.annotations.DefaultSchema;
+import org.apache.beam.sdk.schemas.annotations.SchemaFieldDescription;
+import org.apache.beam.sdk.schemas.transforms.SchemaTransform;
+import org.apache.beam.sdk.schemas.transforms.SchemaTransformProvider;
+import org.apache.beam.sdk.schemas.transforms.TypedSchemaTransformProvider;
+import org.apache.beam.sdk.transforms.MapElements;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionRowTuple;
+import org.apache.beam.sdk.values.Row;
+import org.apache.beam.sdk.values.TypeDescriptors;
+
+@AutoService(SchemaTransformProvider.class)
+public class TestSchemaTransformProvider
+ extends TypedSchemaTransformProvider<TestSchemaTransformProvider.Config> {
+ static final String IDENTIFIER =
"beam:schematransform:org.apache.beam:test_transform:v1";
+
+ @DefaultSchema(AutoValueSchema.class)
+ @AutoValue
+ public abstract static class Config {
+ public static Builder builder() {
+ return new AutoValue_TestSchemaTransformProvider_Config.Builder();
+ }
+
+ @SchemaFieldDescription("String to add to each row element.")
+ public abstract String getExtraString();
+
+ @SchemaFieldDescription("Integer to add to each row element.")
+ public abstract Integer getExtraInteger();
+
+ @AutoValue.Builder
+ public abstract static class Builder {
+ public abstract Builder setExtraString(String extraString);
+
+ public abstract Builder setExtraInteger(Integer extraInteger);
+
+ public abstract Config build();
+ }
+ }
+
+ @Override
+ public SchemaTransform from(Config config) {
+ String extraString = config.getExtraString();
+ Integer extraInteger = config.getExtraInteger();
+ return new SchemaTransform() {
+ @Override
+ public PCollectionRowTuple expand(PCollectionRowTuple input) {
+ Schema schema =
+ Schema.builder()
+ .addFields(input.get("input").getSchema().getFields())
+ .addStringField("extra_string")
+ .addInt32Field("extra_integer")
+ .build();
+ PCollection<Row> rows =
+ input
+ .get("input")
+ .apply(
+ MapElements.into(TypeDescriptors.rows())
+ .via(
+ row ->
+ Row.withSchema(schema)
+ .addValues(row.getValues())
+ .addValue(extraString)
+ .addValue(extraInteger)
+ .build()))
+ .setRowSchema(schema);
+ return PCollectionRowTuple.of("output", rows);
+ }
+ };
+ }
+
+ @Override
+ public String identifier() {
+ return IDENTIFIER;
+ }
+}
diff --git a/sdks/java/managed/src/test/resources/test_config.yaml
b/sdks/java/managed/src/test/resources/test_config.yaml
new file mode 100644
index 00000000000..7725c32b348
--- /dev/null
+++ b/sdks/java/managed/src/test/resources/test_config.yaml
@@ -0,0 +1,21 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+extra_string: "abc"
+extra_integer: 123
\ No newline at end of file
diff --git a/settings.gradle.kts b/settings.gradle.kts
index ec11fd32fdd..1e52e425b21 100644
--- a/settings.gradle.kts
+++ b/settings.gradle.kts
@@ -353,3 +353,5 @@ include("sdks:java:io:kafka:kafka-100")
findProject(":sdks:java:io:kafka:kafka-100")?.name = "kafka-100"
include("sdks:java:io:kafka:kafka-01103")
findProject(":sdks:java:io:kafka:kafka-01103")?.name = "kafka-01103"
+include("sdks:java:managed")
+findProject(":sdks:java:managed")?.name = "managed"