chamikaramj commented on code in PR #30910: URL: https://github.com/apache/beam/pull/30910#discussion_r1569225112
########## model/pipeline/src/main/proto/org/apache/beam/model/pipeline/v1/schema_aware_transforms.proto: ########## @@ -0,0 +1,73 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + +// Protocol Buffers describing available Schema-Aware transforms. +// These are transforms that can be configured using Beam Schema-compatible parameters. +// Runners can override these transforms with a native implementation. 
+ +syntax = "proto3"; + +package org.apache.beam.model.pipeline.v1; + +option go_package = "github.com/apache/beam/sdks/v2/go/pkg/beam/model/pipeline_v1;pipeline_v1"; +option java_package = "org.apache.beam.model.pipeline.v1"; +option java_outer_classname = "SchemaAwareTransforms"; + +import "org/apache/beam/model/pipeline/v1/beam_runner_api.proto"; +import "org/apache/beam/model/pipeline/v1/schema.proto"; + + +message StandardSchemaAwareTransforms { + // Payload for all of these: SchemaAwareTransformPayload + enum IOs { + ICEBERG_READ = 0 [(beam_urn) = "beam:transform:iceberg_read:v1"]; + ICEBERG_WRITE = 1 [(beam_urn) = "beam:transform:iceberg_write:v1"]; + } + enum Managed { + // Payload: ManagedTransformPayload + MANAGED = 0 [(beam_urn) = "beam:transform:managed:v1"]; + } +} + + +// Payload for a Schema-aware PTransform. +// This is a transform that is aware of its input and output PCollection schemas +// and is configured using Beam Schema-compatible parameters. +message SchemaAwareTransformPayload { + // The schema of the configuration row used to upgrade the transform + Schema expansion_schema = 1; + // The configuration used to build this transform. + // Must be compatible with the configuration schema, and decodable via beam:coder:row:v1. + bytes expansion_payload = 2; +} + +// Payload for a Managed transform +// This can be used by runners that wish to override an underlying transform Review Comment: // The information available in the payload can be used by runners to override the `ManagedSchemaTransform`. 
########## .github/workflows/beam_PreCommit_Xlang_Generated_Transforms.yml: ########## @@ -111,4 +111,4 @@ jobs: - name: run Cross-Language Wrapper Validation script uses: ./.github/actions/gradle-command-self-hosted-action with: - gradle-command: :sdks:python:test-suites:direct:crossLanguageWrapperValidationPreCommit \ No newline at end of file + gradle-command: :sdks:python:test-suites:direct:crossLanguageWrapperValidationPreCommit --info Review Comment: Nit: missing newline ########## sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/DynamicDestinations.java: ########## @@ -15,7 +15,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.beam.io.iceberg; +package org.apache.beam.sdk.io.iceberg; Review Comment: Update to use Externalizable ? ########## sdks/java/managed/src/main/java/org/apache/beam/sdk/managed/ManagedSchemaTransformTranslation.java: ########## @@ -0,0 +1,143 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.beam.sdk.managed; + +import static org.apache.beam.model.pipeline.v1.RunnerApi.FunctionSpec; +import static org.apache.beam.sdk.managed.ManagedSchemaTransformProvider.ManagedConfig; +import static org.apache.beam.sdk.managed.ManagedSchemaTransformProvider.ManagedSchemaTransform; +import static org.apache.beam.sdk.util.construction.PTransformTranslation.TransformPayloadTranslator; + +import com.google.auto.service.AutoService; +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.util.HashMap; +import java.util.Map; +import org.apache.beam.model.pipeline.v1.SchemaApi; +import org.apache.beam.model.pipeline.v1.SchemaAwareTransforms.ManagedTransformPayload; +import org.apache.beam.sdk.coders.RowCoder; +import org.apache.beam.sdk.options.PipelineOptions; +import org.apache.beam.sdk.runners.AppliedPTransform; +import org.apache.beam.sdk.schemas.Schema; +import org.apache.beam.sdk.schemas.SchemaTranslation; +import org.apache.beam.sdk.transforms.PTransform; +import org.apache.beam.sdk.util.construction.SdkComponents; +import org.apache.beam.sdk.util.construction.TransformPayloadTranslatorRegistrar; +import org.apache.beam.sdk.values.Row; +import org.apache.beam.vendor.grpc.v1p60p1.com.google.protobuf.ByteString; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap; +import org.checkerframework.checker.nullness.qual.Nullable; + +public class ManagedSchemaTransformTranslation { + static class ManagedSchemaTransformTranslator + implements TransformPayloadTranslator<ManagedSchemaTransform> { + private final ManagedSchemaTransformProvider provider; + static final Schema SCHEMA = + Schema.builder() + .addStringField("transform_identifier") + .addNullableStringField("config") + .addNullableStringField("config_url") + .build(); + + public ManagedSchemaTransformTranslator() { + provider = new ManagedSchemaTransformProvider(null); + } + + @Override + public String getUrn() { + return 
provider.identifier(); + } + + @Override + @SuppressWarnings("argument") + public @Nullable FunctionSpec translate( + AppliedPTransform<?, ?, ManagedSchemaTransform> application, SdkComponents components) + throws IOException { + SchemaApi.Schema expansionSchema = SchemaTranslation.schemaToProto(SCHEMA, true); + ManagedConfig managedConfig = application.getTransform().getManagedConfig(); + Row configRow = toConfigRow(application.getTransform()); + ByteArrayOutputStream os = new ByteArrayOutputStream(); + RowCoder.of(SCHEMA).encode(configRow, os); + + return FunctionSpec.newBuilder() + .setUrn(getUrn()) + .setPayload( + ManagedTransformPayload.newBuilder() + .setUnderlyingTransformUrn(managedConfig.getTransformIdentifier()) Review Comment: We want this to be the URN of the underlying transform not the schema-transform identifier. Maintain a mapping somewhere and add unit test for consistency ? ########## sdks/java/managed/src/main/java/org/apache/beam/sdk/managed/Managed.java: ########## @@ -108,8 +107,8 @@ public static ManagedTransform read(String source) { } /** - * Instantiates a {@link Managed.Write} transform for the specified sink. The supported managed - * sinks are: + * Instantiates a {@link Managed.ManagedTransform} transform for the specified sink. The supported Review Comment: We updated `ManagedTransform` to directly implement `PTransform` to avoid confusion. ########## model/pipeline/src/main/proto/org/apache/beam/model/pipeline/v1/schema_aware_transforms.proto: ########## @@ -0,0 +1,73 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + +// Protocol Buffers describing available Schema-Aware transforms. +// These are transforms that can be configured using Beam Schema-compatible parameters. +// Runners can override these transforms with a native implementation. + +syntax = "proto3"; + +package org.apache.beam.model.pipeline.v1; + +option go_package = "github.com/apache/beam/sdks/v2/go/pkg/beam/model/pipeline_v1;pipeline_v1"; +option java_package = "org.apache.beam.model.pipeline.v1"; +option java_outer_classname = "SchemaAwareTransforms"; + +import "org/apache/beam/model/pipeline/v1/beam_runner_api.proto"; +import "org/apache/beam/model/pipeline/v1/schema.proto"; + + +message StandardSchemaAwareTransforms { + // Payload for all of these: SchemaAwareTransformPayload + enum IOs { + ICEBERG_READ = 0 [(beam_urn) = "beam:transform:iceberg_read:v1"]; + ICEBERG_WRITE = 1 [(beam_urn) = "beam:transform:iceberg_write:v1"]; + } + enum Managed { + // Payload: ManagedTransformPayload + MANAGED = 0 [(beam_urn) = "beam:transform:managed:v1"]; + } +} + + +// Payload for a Schema-aware PTransform. +// This is a transform that is aware of its input and output PCollection schemas +// and is configured using Beam Schema-compatible parameters. +message SchemaAwareTransformPayload { + // The schema of the configuration row used to upgrade the transform + Schema expansion_schema = 1; + // The configuration used to build this transform. 
Review Comment: nit: s/build/upgrade ########## model/pipeline/src/main/proto/org/apache/beam/model/pipeline/v1/schema_aware_transforms.proto: ########## @@ -0,0 +1,73 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + +// Protocol Buffers describing available Schema-Aware transforms. +// These are transforms that can be configured using Beam Schema-compatible parameters. +// Runners can override these transforms with a native implementation. 
+ +syntax = "proto3"; + +package org.apache.beam.model.pipeline.v1; + +option go_package = "github.com/apache/beam/sdks/v2/go/pkg/beam/model/pipeline_v1;pipeline_v1"; +option java_package = "org.apache.beam.model.pipeline.v1"; +option java_outer_classname = "SchemaAwareTransforms"; + +import "org/apache/beam/model/pipeline/v1/beam_runner_api.proto"; +import "org/apache/beam/model/pipeline/v1/schema.proto"; + + +message StandardSchemaAwareTransforms { + // Payload for all of these: SchemaAwareTransformPayload + enum IOs { + ICEBERG_READ = 0 [(beam_urn) = "beam:transform:iceberg_read:v1"]; + ICEBERG_WRITE = 1 [(beam_urn) = "beam:transform:iceberg_write:v1"]; + } + enum Managed { + // Payload: ManagedTransformPayload + MANAGED = 0 [(beam_urn) = "beam:transform:managed:v1"]; + } +} + + +// Payload for a Schema-aware PTransform. +// This is a transform that is aware of its input and output PCollection schemas +// and is configured using Beam Schema-compatible parameters. +message SchemaAwareTransformPayload { + // The schema of the configuration row used to upgrade the transform + Schema expansion_schema = 1; + // The configuration used to build this transform. + // Must be compatible with the configuration schema, and decodable via beam:coder:row:v1. + bytes expansion_payload = 2; +} + +// Payload for a Managed transform Review Comment: for a `ManagedSchemaTransform`. ########## model/pipeline/src/main/proto/org/apache/beam/model/pipeline/v1/schema_aware_transforms.proto: ########## @@ -0,0 +1,73 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + +// Protocol Buffers describing available Schema-Aware transforms. +// These are transforms that can be configured using Beam Schema-compatible parameters. +// Runners can override these transforms with a native implementation. + +syntax = "proto3"; + +package org.apache.beam.model.pipeline.v1; + +option go_package = "github.com/apache/beam/sdks/v2/go/pkg/beam/model/pipeline_v1;pipeline_v1"; +option java_package = "org.apache.beam.model.pipeline.v1"; +option java_outer_classname = "SchemaAwareTransforms"; + +import "org/apache/beam/model/pipeline/v1/beam_runner_api.proto"; +import "org/apache/beam/model/pipeline/v1/schema.proto"; + + +message StandardSchemaAwareTransforms { + // Payload for all of these: SchemaAwareTransformPayload + enum IOs { + ICEBERG_READ = 0 [(beam_urn) = "beam:transform:iceberg_read:v1"]; + ICEBERG_WRITE = 1 [(beam_urn) = "beam:transform:iceberg_write:v1"]; + } + enum Managed { + // Payload: ManagedTransformPayload + MANAGED = 0 [(beam_urn) = "beam:transform:managed:v1"]; + } +} + + +// Payload for a Schema-aware PTransform. +// This is a transform that is aware of its input and output PCollection schemas +// and is configured using Beam Schema-compatible parameters. +message SchemaAwareTransformPayload { + // The schema of the configuration row used to upgrade the transform + Schema expansion_schema = 1; + // The configuration used to build this transform. + // Must be compatible with the configuration schema, and decodable via beam:coder:row:v1. 
+ bytes expansion_payload = 2; +} + +// Payload for a Managed transform +// This can be used by runners that wish to override an underlying transform +// with a different implementation. +message ManagedTransformPayload { + // The underlying transform's URN. + string underlying_transform_urn = 1; + // The managed transform configuration Schema. + Schema expansion_schema = 2; + // The configuration used to build the managed transform. Review Comment: s/build/override ########## sdks/java/managed/src/main/java/org/apache/beam/sdk/managed/ManagedSchemaTransformProvider.java: ########## @@ -51,14 +54,14 @@ public class ManagedSchemaTransformProvider @Override public String identifier() { - return "beam:schematransform:org.apache.beam:managed:v1"; + return "beam:transform:managed:v1"; Review Comment: Change to "beam:schematransform:org.apache.beam:managed:v1" to conform to the convention ? ########## model/pipeline/src/main/proto/org/apache/beam/model/pipeline/v1/schema_aware_transforms.proto: ########## @@ -0,0 +1,73 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + +// Protocol Buffers describing available Schema-Aware transforms. 
+// These are transforms that can be configured using Beam Schema-compatible parameters. +// Runners can override these transforms with a native implementation. + +syntax = "proto3"; + +package org.apache.beam.model.pipeline.v1; + +option go_package = "github.com/apache/beam/sdks/v2/go/pkg/beam/model/pipeline_v1;pipeline_v1"; +option java_package = "org.apache.beam.model.pipeline.v1"; +option java_outer_classname = "SchemaAwareTransforms"; + +import "org/apache/beam/model/pipeline/v1/beam_runner_api.proto"; +import "org/apache/beam/model/pipeline/v1/schema.proto"; + + +message StandardSchemaAwareTransforms { + // Payload for all of these: SchemaAwareTransformPayload + enum IOs { + ICEBERG_READ = 0 [(beam_urn) = "beam:transform:iceberg_read:v1"]; + ICEBERG_WRITE = 1 [(beam_urn) = "beam:transform:iceberg_write:v1"]; + } + enum Managed { + // Payload: ManagedTransformPayload + MANAGED = 0 [(beam_urn) = "beam:transform:managed:v1"]; + } +} + + +// Payload for a Schema-aware PTransform. +// This is a transform that is aware of its input and output PCollection schemas +// and is configured using Beam Schema-compatible parameters. Review Comment: Add: // The information available in the payload can be used by runners to override the corresponding schema-aware transform. 
########## sdks/java/managed/src/main/java/org/apache/beam/sdk/managed/ManagedSchemaTransformProvider.java: ########## @@ -67,19 +70,21 @@ public ManagedSchemaTransformProvider() {} "Found multiple SchemaTransformProvider implementations with the same identifier " + schemaTransformProvider.identifier()); } - schemaTransformProviders.put(schemaTransformProvider.identifier(), schemaTransformProvider); + if (supportedIdentifiers == null + || supportedIdentifiers.contains(schemaTransformProvider.identifier())) { + schemaTransformProviders.put( + schemaTransformProvider.identifier(), schemaTransformProvider); + } } } catch (Exception e) { throw new RuntimeException(e.getMessage()); } - - schemaTransformProviders.entrySet().removeIf(e -> !supportedIdentifiers.contains(e.getKey())); } @DefaultSchema(AutoValueSchema.class) @AutoValue @VisibleForTesting - abstract static class ManagedConfig { + public abstract static class ManagedConfig { Review Comment: Can this and other classes defined in this file be package private ? ########## sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergCatalogConfig.java: ########## @@ -15,7 +15,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.beam.io.iceberg; Review Comment: Update to use Externalizable ? ########## sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergIOTranslation.java: ########## @@ -0,0 +1,241 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.io.iceberg; + +import static org.apache.beam.sdk.io.iceberg.IcebergIO.ReadRows; +import static org.apache.beam.sdk.io.iceberg.IcebergIO.WriteRows; +import static org.apache.beam.sdk.util.construction.TransformUpgrader.fromByteArray; +import static org.apache.beam.sdk.util.construction.TransformUpgrader.toByteArray; + +import com.google.auto.service.AutoService; +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.InvalidClassException; +import java.util.Arrays; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; +import org.apache.beam.model.pipeline.v1.RunnerApi.FunctionSpec; +import org.apache.beam.model.pipeline.v1.SchemaApi; +import org.apache.beam.model.pipeline.v1.SchemaAwareTransforms.SchemaAwareTransformPayload; +import org.apache.beam.sdk.coders.RowCoder; +import org.apache.beam.sdk.options.PipelineOptions; +import org.apache.beam.sdk.runners.AppliedPTransform; +import org.apache.beam.sdk.schemas.Schema; +import org.apache.beam.sdk.schemas.SchemaTranslation; +import org.apache.beam.sdk.transforms.PTransform; +import org.apache.beam.sdk.util.construction.PTransformTranslation.TransformPayloadTranslator; +import org.apache.beam.sdk.util.construction.SdkComponents; +import org.apache.beam.sdk.util.construction.TransformPayloadTranslatorRegistrar; +import org.apache.beam.sdk.values.Row; +import org.apache.beam.vendor.grpc.v1p60p1.com.google.protobuf.ByteString; +import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap; +import org.apache.iceberg.catalog.TableIdentifier; +import org.checkerframework.checker.nullness.qual.Nullable; + +@SuppressWarnings({"rawtypes", "nullness"}) +public class IcebergIOTranslation { + static class IcebergIOReadTranslator implements TransformPayloadTranslator<ReadRows> { + + static final Schema READ_SCHEMA = + Schema.builder() + .addByteArrayField("catalog_config") + .addNullableStringField("table_identifier") + .build(); + + public static final String ICEBERG_READ_TRANSFORM_URN = + "beam:transform:org.apache.beam:iceberg_read:v1"; + + @Override + public String getUrn() { + return ICEBERG_READ_TRANSFORM_URN; + } + + @Override + public @Nullable FunctionSpec translate( + AppliedPTransform<?, ?, ReadRows> application, SdkComponents components) + throws IOException { + SchemaApi.Schema expansionSchema = SchemaTranslation.schemaToProto(READ_SCHEMA, true); + Row configRow = toConfigRow(application.getTransform()); + ByteArrayOutputStream os = new ByteArrayOutputStream(); + RowCoder.of(READ_SCHEMA).encode(configRow, os); + + return FunctionSpec.newBuilder() + .setUrn(getUrn()) + .setPayload( + SchemaAwareTransformPayload.newBuilder() + .setExpansionSchema(expansionSchema) + .setExpansionPayload(ByteString.copyFrom(os.toByteArray())) + .build() + .toByteString()) + .build(); + } + + @Override + public Row toConfigRow(ReadRows transform) { + + Map<String, Object> fieldValues = new HashMap<>(); + + if (transform.getCatalogConfig() != null) { + fieldValues.put("catalog_config", toByteArray(transform.getCatalogConfig())); + } + if (transform.getTableIdentifier() != null) { + TableIdentifier identifier = transform.getTableIdentifier(); + List<String> identifierParts = + Arrays.stream(identifier.namespace().levels()).collect(Collectors.toList()); + identifierParts.add(identifier.name()); + fieldValues.put("table_identifier", String.join(".", identifierParts)); + } + + return 
Row.withSchema(READ_SCHEMA).withFieldValues(fieldValues).build(); + } + + @Override + public ReadRows fromConfigRow(Row configRow, PipelineOptions options) { + try { + ReadRows.Builder builder = new AutoValue_IcebergIO_ReadRows.Builder(); + + byte[] catalogBytes = configRow.getBytes("catalog_config"); + if (catalogBytes != null) { + builder = builder.setCatalogConfig((IcebergCatalogConfig) fromByteArray(catalogBytes)); + } + String tableIdentifier = configRow.getString("table_identifier"); + if (tableIdentifier != null) { + builder = builder.setTableIdentifier(TableIdentifier.parse(tableIdentifier)); + } + return builder.build(); + } catch (InvalidClassException e) { + throw new RuntimeException(e); + } + } + } + + @AutoService(TransformPayloadTranslatorRegistrar.class) + public static class ReadRegistrar implements TransformPayloadTranslatorRegistrar { + @Override + @SuppressWarnings({ + "rawtypes", + }) + public Map<? extends Class<? extends PTransform>, ? extends TransformPayloadTranslator> + getTransformPayloadTranslators() { + return ImmutableMap.<Class<? 
extends PTransform>, TransformPayloadTranslator>builder() + .put(AutoValue_IcebergIO_ReadRows.class, new IcebergIOReadTranslator()) + .build(); + } + } + + static class IcebergIOWriteTranslator implements TransformPayloadTranslator<WriteRows> { + + static final Schema WRITE_SCHEMA = + Schema.builder() + .addByteArrayField("catalog_config") + .addNullableStringField("table_identifier") + .addNullableByteArrayField("dynamic_destinations") + .build(); + + public static final String ICEBERG_WRITE_TRANSFORM_URN = + "beam:transform:org.apache.beam:iceberg_write:v1"; + + @Override + public String getUrn() { + return ICEBERG_WRITE_TRANSFORM_URN; + } + + @Override + public @Nullable FunctionSpec translate( + AppliedPTransform<?, ?, WriteRows> application, SdkComponents components) + throws IOException { + SchemaApi.Schema expansionSchema = SchemaTranslation.schemaToProto(WRITE_SCHEMA, true); + Row configRow = toConfigRow(application.getTransform()); + ByteArrayOutputStream os = new ByteArrayOutputStream(); + RowCoder.of(WRITE_SCHEMA).encode(configRow, os); + + return FunctionSpec.newBuilder() + .setUrn(getUrn()) + .setPayload( + SchemaAwareTransformPayload.newBuilder() + .setExpansionSchema(expansionSchema) + .setExpansionPayload(ByteString.copyFrom(os.toByteArray())) + .build() + .toByteString()) + .build(); + } + + @Override + public Row toConfigRow(WriteRows transform) { + + Map<String, Object> fieldValues = new HashMap<>(); + + if (transform.getCatalogConfig() != null) { + fieldValues.put("catalog_config", toByteArray(transform.getCatalogConfig())); + } + if (transform.getTableIdentifier() != null) { + TableIdentifier identifier = transform.getTableIdentifier(); + List<String> identifierParts = + Arrays.stream(identifier.namespace().levels()).collect(Collectors.toList()); + identifierParts.add(identifier.name()); + fieldValues.put("table_identifier", String.join(".", identifierParts)); + } + if (transform.getDynamicDestinations() != null) { + 
fieldValues.put("dynamic_destinations", toByteArray(transform.getDynamicDestinations())); Review Comment: As discussed offline, we should consider implementing `Externalizable` for the complex objects serialized here (`CatalogConfig` and `DynamicDestinations`). ########## sdks/java/managed/src/main/java/org/apache/beam/sdk/managed/Managed.java: ########## @@ -87,15 +88,14 @@ public class Managed { .build(); /** - * Instantiates a {@link Managed.Read} transform for the specified source. The supported managed - * sources are: + * Instantiates a {@link Managed.ManagedTransform} transform for the specified source. The Review Comment: Let's move schema-transforms identifiers for specific I/Os to constants and refer to them from both here and identifier() methods of corresponding `SchemaTransformProvider` implementations for consistency. ########## sdks/java/managed/src/main/java/org/apache/beam/sdk/managed/ManagedSchemaTransformProvider.java: ########## @@ -122,29 +145,28 @@ protected SchemaTransform from(ManagedConfig managedConfig) { "Could not find transform with identifier %s, or it may not be supported", managedConfig.getTransformIdentifier()); - // parse config before expansion to check if it matches underlying transform's config schema - Schema transformConfigSchema = schemaTransformProvider.configurationSchema(); - Row transformConfig; - try { - transformConfig = getRowConfig(managedConfig, transformConfigSchema); - } catch (Exception e) { - throw new IllegalArgumentException( - String.format( - "Specified configuration does not align with the underlying transform's configuration schema [%s].", - transformConfigSchema), - e); - } - - return new ManagedSchemaTransform(transformConfig, schemaTransformProvider); + return new ManagedSchemaTransform(managedConfig, schemaTransformProvider); } - private static class ManagedSchemaTransform extends SchemaTransform { + public static class ManagedSchemaTransform extends SchemaTransform { Review Comment: Ditto. 
-- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
