chamikaramj commented on code in PR #30910:
URL: https://github.com/apache/beam/pull/30910#discussion_r1569225112


##########
model/pipeline/src/main/proto/org/apache/beam/model/pipeline/v1/schema_aware_transforms.proto:
##########
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+// Protocol Buffers describing available Schema-Aware transforms.
+// These are transforms that can be configured using Beam Schema-compatible 
parameters.
+// Runners can override these transforms with a native implementation.
+
+syntax = "proto3";
+
+package org.apache.beam.model.pipeline.v1;
+
+option go_package = 
"github.com/apache/beam/sdks/v2/go/pkg/beam/model/pipeline_v1;pipeline_v1";
+option java_package = "org.apache.beam.model.pipeline.v1";
+option java_outer_classname = "SchemaAwareTransforms";
+
+import "org/apache/beam/model/pipeline/v1/beam_runner_api.proto";
+import "org/apache/beam/model/pipeline/v1/schema.proto";
+
+
+message StandardSchemaAwareTransforms {
+    // Payload for all of these: SchemaAwareTransformPayload
+    enum IOs {
+        ICEBERG_READ = 0 [(beam_urn) = "beam:transform:iceberg_read:v1"];
+        ICEBERG_WRITE = 1 [(beam_urn) = "beam:transform:iceberg_write:v1"];
+    }
+    enum Managed {
+        // Payload: ManagedTransformPayload
+        MANAGED = 0 [(beam_urn) = "beam:transform:managed:v1"];
+    }
+}
+
+
+// Payload for a Schema-aware PTransform.
+// This is a transform that is aware of its input and output PCollection 
schemas
+// and is configured using Beam Schema-compatible parameters.
+message SchemaAwareTransformPayload {
+    // The schema of the configuration row used to upgrade the transform
+    Schema expansion_schema = 1;
+    // The configuration used to build this transform.
+    // Must be compatible with the configuration schema, and decodable via 
beam:coder:row:v1.
+    bytes expansion_payload = 2;
+}
+
+// Payload for a Managed transform
+// This can be used by runners that wish to override an underlying transform

Review Comment:
   // The information available in the payload can be used by runners to override the `ManagedSchemaTransform`.



##########
.github/workflows/beam_PreCommit_Xlang_Generated_Transforms.yml:
##########
@@ -111,4 +111,4 @@ jobs:
       - name: run Cross-Language Wrapper Validation script
         uses: ./.github/actions/gradle-command-self-hosted-action
         with:
-          gradle-command: :sdks:python:test-suites:direct:crossLanguageWrapperValidationPreCommit
\ No newline at end of file
+          gradle-command: :sdks:python:test-suites:direct:crossLanguageWrapperValidationPreCommit --info

Review Comment:
   Nit: missing newline



##########
sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/DynamicDestinations.java:
##########
@@ -15,7 +15,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.beam.io.iceberg;
+package org.apache.beam.sdk.io.iceberg;

Review Comment:
   Update to use `Externalizable`?



##########
sdks/java/managed/src/main/java/org/apache/beam/sdk/managed/ManagedSchemaTransformTranslation.java:
##########
@@ -0,0 +1,143 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.managed;
+
+import static org.apache.beam.model.pipeline.v1.RunnerApi.FunctionSpec;
+import static 
org.apache.beam.sdk.managed.ManagedSchemaTransformProvider.ManagedConfig;
+import static 
org.apache.beam.sdk.managed.ManagedSchemaTransformProvider.ManagedSchemaTransform;
+import static 
org.apache.beam.sdk.util.construction.PTransformTranslation.TransformPayloadTranslator;
+
+import com.google.auto.service.AutoService;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.beam.model.pipeline.v1.SchemaApi;
+import 
org.apache.beam.model.pipeline.v1.SchemaAwareTransforms.ManagedTransformPayload;
+import org.apache.beam.sdk.coders.RowCoder;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.runners.AppliedPTransform;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.schemas.SchemaTranslation;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.util.construction.SdkComponents;
+import 
org.apache.beam.sdk.util.construction.TransformPayloadTranslatorRegistrar;
+import org.apache.beam.sdk.values.Row;
+import org.apache.beam.vendor.grpc.v1p60p1.com.google.protobuf.ByteString;
+import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap;
+import org.checkerframework.checker.nullness.qual.Nullable;
+
+public class ManagedSchemaTransformTranslation {
+  static class ManagedSchemaTransformTranslator
+      implements TransformPayloadTranslator<ManagedSchemaTransform> {
+    private final ManagedSchemaTransformProvider provider;
+    static final Schema SCHEMA =
+        Schema.builder()
+            .addStringField("transform_identifier")
+            .addNullableStringField("config")
+            .addNullableStringField("config_url")
+            .build();
+
+    public ManagedSchemaTransformTranslator() {
+      provider = new ManagedSchemaTransformProvider(null);
+    }
+
+    @Override
+    public String getUrn() {
+      return provider.identifier();
+    }
+
+    @Override
+    @SuppressWarnings("argument")
+    public @Nullable FunctionSpec translate(
+        AppliedPTransform<?, ?, ManagedSchemaTransform> application, 
SdkComponents components)
+        throws IOException {
+      SchemaApi.Schema expansionSchema = 
SchemaTranslation.schemaToProto(SCHEMA, true);
+      ManagedConfig managedConfig = 
application.getTransform().getManagedConfig();
+      Row configRow = toConfigRow(application.getTransform());
+      ByteArrayOutputStream os = new ByteArrayOutputStream();
+      RowCoder.of(SCHEMA).encode(configRow, os);
+
+      return FunctionSpec.newBuilder()
+          .setUrn(getUrn())
+          .setPayload(
+              ManagedTransformPayload.newBuilder()
+                  .setUnderlyingTransformUrn(managedConfig.getTransformIdentifier())

Review Comment:
   We want this to be the URN of the underlying transform, not the 
schema-transform identifier. Maintain a mapping somewhere and add a unit test 
for consistency?
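
   One way to keep the two kinds of identifier consistent is a single lookup table from schema-transform identifier to underlying transform URN. The sketch below is only an illustration: the class name `UnderlyingTransformUrns` and the Iceberg schema-transform identifiers are assumptions (they follow the `beam:schematransform:org.apache.beam:...:v1` convention but do not appear in this PR), while the URN values are the ones declared in `IcebergIOTranslation`.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

/** Hypothetical helper: maps schema-transform identifiers to underlying transform URNs. */
final class UnderlyingTransformUrns {
  // Assumed schema-transform identifiers (not taken from the PR).
  static final String ICEBERG_READ_ID = "beam:schematransform:org.apache.beam:iceberg_read:v1";
  static final String ICEBERG_WRITE_ID = "beam:schematransform:org.apache.beam:iceberg_write:v1";

  // URNs as declared in IcebergIOTranslation.
  static final String ICEBERG_READ_URN = "beam:transform:org.apache.beam:iceberg_read:v1";
  static final String ICEBERG_WRITE_URN = "beam:transform:org.apache.beam:iceberg_write:v1";

  private static final Map<String, String> URNS_BY_IDENTIFIER;

  static {
    Map<String, String> urns = new HashMap<>();
    urns.put(ICEBERG_READ_ID, ICEBERG_READ_URN);
    urns.put(ICEBERG_WRITE_ID, ICEBERG_WRITE_URN);
    URNS_BY_IDENTIFIER = Collections.unmodifiableMap(urns);
  }

  private UnderlyingTransformUrns() {}

  /** Returns the underlying transform URN, failing loudly for unmapped identifiers. */
  static String forIdentifier(String schemaTransformIdentifier) {
    String urn = URNS_BY_IDENTIFIER.get(schemaTransformIdentifier);
    if (urn == null) {
      throw new IllegalArgumentException(
          "No underlying transform URN registered for " + schemaTransformIdentifier);
    }
    return urn;
  }

  /** Read-only view of the mapping, so a unit test can iterate over every entry. */
  static Map<String, String> all() {
    return URNS_BY_IDENTIFIER;
  }
}
```

   With something like this, `translate` could pass `UnderlyingTransformUrns.forIdentifier(managedConfig.getTransformIdentifier())` to `setUnderlyingTransformUrn`, and a unit test could assert that every supported identifier has an entry.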



##########
sdks/java/managed/src/main/java/org/apache/beam/sdk/managed/Managed.java:
##########
@@ -108,8 +107,8 @@ public static ManagedTransform read(String source) {
   }
 
   /**
-   * Instantiates a {@link Managed.Write} transform for the specified sink. The supported managed
-   * sinks are:
+   * Instantiates a {@link Managed.ManagedTransform} transform for the specified sink. The supported

Review Comment:
   We updated `ManagedTransform` to directly implement `PTransform` to avoid 
confusion.



##########
model/pipeline/src/main/proto/org/apache/beam/model/pipeline/v1/schema_aware_transforms.proto:
##########
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+// Protocol Buffers describing available Schema-Aware transforms.
+// These are transforms that can be configured using Beam Schema-compatible 
parameters.
+// Runners can override these transforms with a native implementation.
+
+syntax = "proto3";
+
+package org.apache.beam.model.pipeline.v1;
+
+option go_package = 
"github.com/apache/beam/sdks/v2/go/pkg/beam/model/pipeline_v1;pipeline_v1";
+option java_package = "org.apache.beam.model.pipeline.v1";
+option java_outer_classname = "SchemaAwareTransforms";
+
+import "org/apache/beam/model/pipeline/v1/beam_runner_api.proto";
+import "org/apache/beam/model/pipeline/v1/schema.proto";
+
+
+message StandardSchemaAwareTransforms {
+    // Payload for all of these: SchemaAwareTransformPayload
+    enum IOs {
+        ICEBERG_READ = 0 [(beam_urn) = "beam:transform:iceberg_read:v1"];
+        ICEBERG_WRITE = 1 [(beam_urn) = "beam:transform:iceberg_write:v1"];
+    }
+    enum Managed {
+        // Payload: ManagedTransformPayload
+        MANAGED = 0 [(beam_urn) = "beam:transform:managed:v1"];
+    }
+}
+
+
+// Payload for a Schema-aware PTransform.
+// This is a transform that is aware of its input and output PCollection 
schemas
+// and is configured using Beam Schema-compatible parameters.
+message SchemaAwareTransformPayload {
+    // The schema of the configuration row used to upgrade the transform
+    Schema expansion_schema = 1;
+    // The configuration used to build this transform.

Review Comment:
   nit: s/build/upgrade



##########
model/pipeline/src/main/proto/org/apache/beam/model/pipeline/v1/schema_aware_transforms.proto:
##########
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+// Protocol Buffers describing available Schema-Aware transforms.
+// These are transforms that can be configured using Beam Schema-compatible 
parameters.
+// Runners can override these transforms with a native implementation.
+
+syntax = "proto3";
+
+package org.apache.beam.model.pipeline.v1;
+
+option go_package = 
"github.com/apache/beam/sdks/v2/go/pkg/beam/model/pipeline_v1;pipeline_v1";
+option java_package = "org.apache.beam.model.pipeline.v1";
+option java_outer_classname = "SchemaAwareTransforms";
+
+import "org/apache/beam/model/pipeline/v1/beam_runner_api.proto";
+import "org/apache/beam/model/pipeline/v1/schema.proto";
+
+
+message StandardSchemaAwareTransforms {
+    // Payload for all of these: SchemaAwareTransformPayload
+    enum IOs {
+        ICEBERG_READ = 0 [(beam_urn) = "beam:transform:iceberg_read:v1"];
+        ICEBERG_WRITE = 1 [(beam_urn) = "beam:transform:iceberg_write:v1"];
+    }
+    enum Managed {
+        // Payload: ManagedTransformPayload
+        MANAGED = 0 [(beam_urn) = "beam:transform:managed:v1"];
+    }
+}
+
+
+// Payload for a Schema-aware PTransform.
+// This is a transform that is aware of its input and output PCollection 
schemas
+// and is configured using Beam Schema-compatible parameters.
+message SchemaAwareTransformPayload {
+    // The schema of the configuration row used to upgrade the transform
+    Schema expansion_schema = 1;
+    // The configuration used to build this transform.
+    // Must be compatible with the configuration schema, and decodable via 
beam:coder:row:v1.
+    bytes expansion_payload = 2;
+}
+
+// Payload for a Managed transform

Review Comment:
   for a `ManagedSchemaTransform`.



##########
model/pipeline/src/main/proto/org/apache/beam/model/pipeline/v1/schema_aware_transforms.proto:
##########
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+// Protocol Buffers describing available Schema-Aware transforms.
+// These are transforms that can be configured using Beam Schema-compatible 
parameters.
+// Runners can override these transforms with a native implementation.
+
+syntax = "proto3";
+
+package org.apache.beam.model.pipeline.v1;
+
+option go_package = 
"github.com/apache/beam/sdks/v2/go/pkg/beam/model/pipeline_v1;pipeline_v1";
+option java_package = "org.apache.beam.model.pipeline.v1";
+option java_outer_classname = "SchemaAwareTransforms";
+
+import "org/apache/beam/model/pipeline/v1/beam_runner_api.proto";
+import "org/apache/beam/model/pipeline/v1/schema.proto";
+
+
+message StandardSchemaAwareTransforms {
+    // Payload for all of these: SchemaAwareTransformPayload
+    enum IOs {
+        ICEBERG_READ = 0 [(beam_urn) = "beam:transform:iceberg_read:v1"];
+        ICEBERG_WRITE = 1 [(beam_urn) = "beam:transform:iceberg_write:v1"];
+    }
+    enum Managed {
+        // Payload: ManagedTransformPayload
+        MANAGED = 0 [(beam_urn) = "beam:transform:managed:v1"];
+    }
+}
+
+
+// Payload for a Schema-aware PTransform.
+// This is a transform that is aware of its input and output PCollection 
schemas
+// and is configured using Beam Schema-compatible parameters.
+message SchemaAwareTransformPayload {
+    // The schema of the configuration row used to upgrade the transform
+    Schema expansion_schema = 1;
+    // The configuration used to build this transform.
+    // Must be compatible with the configuration schema, and decodable via 
beam:coder:row:v1.
+    bytes expansion_payload = 2;
+}
+
+// Payload for a Managed transform
+// This can be used by runners that wish to override an underlying transform
+// with a different implementation.
+message ManagedTransformPayload {
+    // The underlying transform's URN.
+    string underlying_transform_urn = 1;
+    // The managed transform configuration Schema.
+    Schema expansion_schema = 2;
+    // The configuration used to build the managed transform.

Review Comment:
   s/build/override



##########
sdks/java/managed/src/main/java/org/apache/beam/sdk/managed/ManagedSchemaTransformProvider.java:
##########
@@ -51,14 +54,14 @@ public class ManagedSchemaTransformProvider
 
   @Override
   public String identifier() {
-    return "beam:schematransform:org.apache.beam:managed:v1";
+    return "beam:transform:managed:v1";

Review Comment:
   Change back to "beam:schematransform:org.apache.beam:managed:v1" to conform 
to the schema-transform identifier convention?



##########
model/pipeline/src/main/proto/org/apache/beam/model/pipeline/v1/schema_aware_transforms.proto:
##########
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+// Protocol Buffers describing available Schema-Aware transforms.
+// These are transforms that can be configured using Beam Schema-compatible 
parameters.
+// Runners can override these transforms with a native implementation.
+
+syntax = "proto3";
+
+package org.apache.beam.model.pipeline.v1;
+
+option go_package = 
"github.com/apache/beam/sdks/v2/go/pkg/beam/model/pipeline_v1;pipeline_v1";
+option java_package = "org.apache.beam.model.pipeline.v1";
+option java_outer_classname = "SchemaAwareTransforms";
+
+import "org/apache/beam/model/pipeline/v1/beam_runner_api.proto";
+import "org/apache/beam/model/pipeline/v1/schema.proto";
+
+
+message StandardSchemaAwareTransforms {
+    // Payload for all of these: SchemaAwareTransformPayload
+    enum IOs {
+        ICEBERG_READ = 0 [(beam_urn) = "beam:transform:iceberg_read:v1"];
+        ICEBERG_WRITE = 1 [(beam_urn) = "beam:transform:iceberg_write:v1"];
+    }
+    enum Managed {
+        // Payload: ManagedTransformPayload
+        MANAGED = 0 [(beam_urn) = "beam:transform:managed:v1"];
+    }
+}
+
+
+// Payload for a Schema-aware PTransform.
+// This is a transform that is aware of its input and output PCollection 
schemas
+// and is configured using Beam Schema-compatible parameters.

Review Comment:
   Add:
   
   // The information available in the payload can be used by runners to 
override the corresponding schema-aware transform.



##########
sdks/java/managed/src/main/java/org/apache/beam/sdk/managed/ManagedSchemaTransformProvider.java:
##########
@@ -67,19 +70,21 @@ public ManagedSchemaTransformProvider() {}
               "Found multiple SchemaTransformProvider implementations with the 
same identifier "
                   + schemaTransformProvider.identifier());
         }
-        schemaTransformProviders.put(schemaTransformProvider.identifier(), 
schemaTransformProvider);
+        if (supportedIdentifiers == null
+            || 
supportedIdentifiers.contains(schemaTransformProvider.identifier())) {
+          schemaTransformProviders.put(
+              schemaTransformProvider.identifier(), schemaTransformProvider);
+        }
       }
     } catch (Exception e) {
       throw new RuntimeException(e.getMessage());
     }
-
-    schemaTransformProviders.entrySet().removeIf(e -> 
!supportedIdentifiers.contains(e.getKey()));
   }
 
   @DefaultSchema(AutoValueSchema.class)
   @AutoValue
   @VisibleForTesting
-  abstract static class ManagedConfig {
+  public abstract static class ManagedConfig {

Review Comment:
   Can this and the other classes defined in this file be package-private?



##########
sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergCatalogConfig.java:
##########
@@ -15,7 +15,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.beam.io.iceberg;

Review Comment:
   Update to use `Externalizable`?



##########
sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergIOTranslation.java:
##########
@@ -0,0 +1,241 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.iceberg;
+
+import static org.apache.beam.sdk.io.iceberg.IcebergIO.ReadRows;
+import static org.apache.beam.sdk.io.iceberg.IcebergIO.WriteRows;
+import static 
org.apache.beam.sdk.util.construction.TransformUpgrader.fromByteArray;
+import static 
org.apache.beam.sdk.util.construction.TransformUpgrader.toByteArray;
+
+import com.google.auto.service.AutoService;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.InvalidClassException;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+import org.apache.beam.model.pipeline.v1.RunnerApi.FunctionSpec;
+import org.apache.beam.model.pipeline.v1.SchemaApi;
+import 
org.apache.beam.model.pipeline.v1.SchemaAwareTransforms.SchemaAwareTransformPayload;
+import org.apache.beam.sdk.coders.RowCoder;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.runners.AppliedPTransform;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.schemas.SchemaTranslation;
+import org.apache.beam.sdk.transforms.PTransform;
+import 
org.apache.beam.sdk.util.construction.PTransformTranslation.TransformPayloadTranslator;
+import org.apache.beam.sdk.util.construction.SdkComponents;
+import 
org.apache.beam.sdk.util.construction.TransformPayloadTranslatorRegistrar;
+import org.apache.beam.sdk.values.Row;
+import org.apache.beam.vendor.grpc.v1p60p1.com.google.protobuf.ByteString;
+import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap;
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.checkerframework.checker.nullness.qual.Nullable;
+
+@SuppressWarnings({"rawtypes", "nullness"})
+public class IcebergIOTranslation {
+  static class IcebergIOReadTranslator implements 
TransformPayloadTranslator<ReadRows> {
+
+    static final Schema READ_SCHEMA =
+        Schema.builder()
+            .addByteArrayField("catalog_config")
+            .addNullableStringField("table_identifier")
+            .build();
+
+    public static final String ICEBERG_READ_TRANSFORM_URN =
+        "beam:transform:org.apache.beam:iceberg_read:v1";
+
+    @Override
+    public String getUrn() {
+      return ICEBERG_READ_TRANSFORM_URN;
+    }
+
+    @Override
+    public @Nullable FunctionSpec translate(
+        AppliedPTransform<?, ?, ReadRows> application, SdkComponents 
components)
+        throws IOException {
+      SchemaApi.Schema expansionSchema = 
SchemaTranslation.schemaToProto(READ_SCHEMA, true);
+      Row configRow = toConfigRow(application.getTransform());
+      ByteArrayOutputStream os = new ByteArrayOutputStream();
+      RowCoder.of(READ_SCHEMA).encode(configRow, os);
+
+      return FunctionSpec.newBuilder()
+          .setUrn(getUrn())
+          .setPayload(
+              SchemaAwareTransformPayload.newBuilder()
+                  .setExpansionSchema(expansionSchema)
+                  .setExpansionPayload(ByteString.copyFrom(os.toByteArray()))
+                  .build()
+                  .toByteString())
+          .build();
+    }
+
+    @Override
+    public Row toConfigRow(ReadRows transform) {
+
+      Map<String, Object> fieldValues = new HashMap<>();
+
+      if (transform.getCatalogConfig() != null) {
+        fieldValues.put("catalog_config", 
toByteArray(transform.getCatalogConfig()));
+      }
+      if (transform.getTableIdentifier() != null) {
+        TableIdentifier identifier = transform.getTableIdentifier();
+        List<String> identifierParts =
+            
Arrays.stream(identifier.namespace().levels()).collect(Collectors.toList());
+        identifierParts.add(identifier.name());
+        fieldValues.put("table_identifier", String.join(".", identifierParts));
+      }
+
+      return Row.withSchema(READ_SCHEMA).withFieldValues(fieldValues).build();
+    }
+
+    @Override
+    public ReadRows fromConfigRow(Row configRow, PipelineOptions options) {
+      try {
+        ReadRows.Builder builder = new AutoValue_IcebergIO_ReadRows.Builder();
+
+        byte[] catalogBytes = configRow.getBytes("catalog_config");
+        if (catalogBytes != null) {
+          builder = builder.setCatalogConfig((IcebergCatalogConfig) 
fromByteArray(catalogBytes));
+        }
+        String tableIdentifier = configRow.getString("table_identifier");
+        if (tableIdentifier != null) {
+          builder = 
builder.setTableIdentifier(TableIdentifier.parse(tableIdentifier));
+        }
+        return builder.build();
+      } catch (InvalidClassException e) {
+        throw new RuntimeException(e);
+      }
+    }
+  }
+
+  @AutoService(TransformPayloadTranslatorRegistrar.class)
+  public static class ReadRegistrar implements 
TransformPayloadTranslatorRegistrar {
+    @Override
+    @SuppressWarnings({
+      "rawtypes",
+    })
+    public Map<? extends Class<? extends PTransform>, ? extends 
TransformPayloadTranslator>
+        getTransformPayloadTranslators() {
+      return ImmutableMap.<Class<? extends PTransform>, 
TransformPayloadTranslator>builder()
+          .put(AutoValue_IcebergIO_ReadRows.class, new 
IcebergIOReadTranslator())
+          .build();
+    }
+  }
+
+  static class IcebergIOWriteTranslator implements 
TransformPayloadTranslator<WriteRows> {
+
+    static final Schema WRITE_SCHEMA =
+        Schema.builder()
+            .addByteArrayField("catalog_config")
+            .addNullableStringField("table_identifier")
+            .addNullableByteArrayField("dynamic_destinations")
+            .build();
+
+    public static final String ICEBERG_WRITE_TRANSFORM_URN =
+        "beam:transform:org.apache.beam:iceberg_write:v1";
+
+    @Override
+    public String getUrn() {
+      return ICEBERG_WRITE_TRANSFORM_URN;
+    }
+
+    @Override
+    public @Nullable FunctionSpec translate(
+        AppliedPTransform<?, ?, WriteRows> application, SdkComponents 
components)
+        throws IOException {
+      SchemaApi.Schema expansionSchema = 
SchemaTranslation.schemaToProto(WRITE_SCHEMA, true);
+      Row configRow = toConfigRow(application.getTransform());
+      ByteArrayOutputStream os = new ByteArrayOutputStream();
+      RowCoder.of(WRITE_SCHEMA).encode(configRow, os);
+
+      return FunctionSpec.newBuilder()
+          .setUrn(getUrn())
+          .setPayload(
+              SchemaAwareTransformPayload.newBuilder()
+                  .setExpansionSchema(expansionSchema)
+                  .setExpansionPayload(ByteString.copyFrom(os.toByteArray()))
+                  .build()
+                  .toByteString())
+          .build();
+    }
+
+    @Override
+    public Row toConfigRow(WriteRows transform) {
+
+      Map<String, Object> fieldValues = new HashMap<>();
+
+      if (transform.getCatalogConfig() != null) {
+        fieldValues.put("catalog_config", 
toByteArray(transform.getCatalogConfig()));
+      }
+      if (transform.getTableIdentifier() != null) {
+        TableIdentifier identifier = transform.getTableIdentifier();
+        List<String> identifierParts =
+            
Arrays.stream(identifier.namespace().levels()).collect(Collectors.toList());
+        identifierParts.add(identifier.name());
+        fieldValues.put("table_identifier", String.join(".", identifierParts));
+      }
+      if (transform.getDynamicDestinations() != null) {
+        fieldValues.put("dynamic_destinations", toByteArray(transform.getDynamicDestinations()));

Review Comment:
   As discussed offline, we should consider implementing `Externalizable` for 
the complex objects serialized here (`IcebergCatalogConfig` and 
`DynamicDestinations`).
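
   For reference, a minimal sketch of what an `Externalizable` config object could look like. `CatalogConfigSketch` and its `name` and `properties` fields are hypothetical stand-ins, not the real fields of `IcebergCatalogConfig`; the point is only that the class writes and reads its own fields explicitly, with a version marker, instead of relying on default Java serialization.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.Externalizable;
import java.io.IOException;
import java.io.ObjectInput;
import java.io.ObjectInputStream;
import java.io.ObjectOutput;
import java.io.ObjectOutputStream;
import java.io.UncheckedIOException;
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical stand-in for a catalog config; the fields are illustrative only. */
final class CatalogConfigSketch implements Externalizable {
  private static final int FORMAT_VERSION = 1;

  private String name = "";
  private Map<String, String> properties = new LinkedHashMap<>();

  /** Externalizable requires a public no-arg constructor for deserialization. */
  public CatalogConfigSketch() {}

  CatalogConfigSketch(String name, Map<String, String> properties) {
    this.name = name;
    this.properties = new LinkedHashMap<>(properties);
  }

  String getName() {
    return name;
  }

  Map<String, String> getProperties() {
    return properties;
  }

  @Override
  public void writeExternal(ObjectOutput out) throws IOException {
    // Write a version first so a later reader can detect an incompatible layout.
    out.writeInt(FORMAT_VERSION);
    out.writeUTF(name);
    out.writeInt(properties.size());
    for (Map.Entry<String, String> entry : properties.entrySet()) {
      out.writeUTF(entry.getKey());
      out.writeUTF(entry.getValue());
    }
  }

  @Override
  public void readExternal(ObjectInput in) throws IOException {
    int version = in.readInt();
    if (version != FORMAT_VERSION) {
      throw new IOException("Unsupported format version: " + version);
    }
    name = in.readUTF();
    int size = in.readInt();
    properties = new LinkedHashMap<>();
    for (int i = 0; i < size; i++) {
      String key = in.readUTF();
      properties.put(key, in.readUTF());
    }
  }

  /** Serializes the config to bytes, e.g. for a byte-array field of a config Row. */
  static byte[] toBytes(CatalogConfigSketch config) {
    ByteArrayOutputStream bytes = new ByteArrayOutputStream();
    try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
      out.writeObject(config);
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
    return bytes.toByteArray();
  }

  /** Restores a config previously produced by toBytes. */
  static CatalogConfigSketch fromBytes(byte[] data) {
    try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(data))) {
      return (CatalogConfigSketch) in.readObject();
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    } catch (ClassNotFoundException e) {
      throw new IllegalStateException(e);
    }
  }
}
```

   The `toBytes` and `fromBytes` helpers here use plain `java.io` streams and are not the `TransformUpgrader` methods imported in this file.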



##########
sdks/java/managed/src/main/java/org/apache/beam/sdk/managed/Managed.java:
##########
@@ -87,15 +88,14 @@ public class Managed {
           .build();
 
   /**
-   * Instantiates a {@link Managed.Read} transform for the specified source. The supported managed
-   * sources are:
+   * Instantiates a {@link Managed.ManagedTransform} transform for the specified source. The

Review Comment:
   Let's move the schema-transform identifiers for specific I/Os to constants 
and refer to them both from here and from the `identifier()` methods of the 
corresponding `SchemaTransformProvider` implementations, for consistency.
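
   Roughly, the shape could be as sketched below. All names here are hypothetical (`SchemaTransformIdentifiers`, `IcebergReadProviderSketch`, `ManagedSourcesSketch`), and the identifier strings and the `"iceberg"` source name are assumed values used only to show a provider and the `Managed` lookup reading the same constant.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

/** Hypothetical single home for schema-transform identifiers (values are assumed). */
final class SchemaTransformIdentifiers {
  static final String ICEBERG_READ = "beam:schematransform:org.apache.beam:iceberg_read:v1";
  static final String ICEBERG_WRITE = "beam:schematransform:org.apache.beam:iceberg_write:v1";

  private SchemaTransformIdentifiers() {}
}

/** Stand-in for a SchemaTransformProvider implementation; only identifier() is sketched. */
final class IcebergReadProviderSketch {
  String identifier() {
    return SchemaTransformIdentifiers.ICEBERG_READ;
  }
}

/** Stand-in for the source-name lookup that Managed.read(source) would consult. */
final class ManagedSourcesSketch {
  static final String ICEBERG = "iceberg"; // assumed source name

  static final Map<String, String> READ_TRANSFORMS;

  static {
    Map<String, String> reads = new HashMap<>();
    reads.put(ICEBERG, SchemaTransformIdentifiers.ICEBERG_READ);
    READ_TRANSFORMS = Collections.unmodifiableMap(reads);
  }

  private ManagedSourcesSketch() {}

  /** Resolves a source name to its schema-transform identifier. */
  static String readIdentifier(String source) {
    String identifier = READ_TRANSFORMS.get(source.toLowerCase());
    if (identifier == null) {
      throw new IllegalArgumentException("Unsupported managed source: " + source);
    }
    return identifier;
  }
}
```

   Because both sides reference the same constant, a rename in one place cannot silently leave the other one stale.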



##########
sdks/java/managed/src/main/java/org/apache/beam/sdk/managed/ManagedSchemaTransformProvider.java:
##########
@@ -122,29 +145,28 @@ protected SchemaTransform from(ManagedConfig 
managedConfig) {
             "Could not find transform with identifier %s, or it may not be 
supported",
             managedConfig.getTransformIdentifier());
 
-    // parse config before expansion to check if it matches underlying 
transform's config schema
-    Schema transformConfigSchema = 
schemaTransformProvider.configurationSchema();
-    Row transformConfig;
-    try {
-      transformConfig = getRowConfig(managedConfig, transformConfigSchema);
-    } catch (Exception e) {
-      throw new IllegalArgumentException(
-          String.format(
-              "Specified configuration does not align with the underlying 
transform's configuration schema [%s].",
-              transformConfigSchema),
-          e);
-    }
-
-    return new ManagedSchemaTransform(transformConfig, 
schemaTransformProvider);
+    return new ManagedSchemaTransform(managedConfig, schemaTransformProvider);
   }
 
-  private static class ManagedSchemaTransform extends SchemaTransform {
+  public static class ManagedSchemaTransform extends SchemaTransform {

Review Comment:
   Ditto: can this be package-private?


