robertwb commented on code in PR #30910:
URL: https://github.com/apache/beam/pull/30910#discussion_r1571531368


##########
model/pipeline/src/main/proto/org/apache/beam/model/pipeline/v1/schema_aware_transforms.proto:
##########
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+// Protocol Buffers describing available Schema-Aware transforms.
+// These are transforms that can be configured using Beam Schema-compatible parameters.
+// Runners can override these transforms with a native implementation.
+
+syntax = "proto3";
+
+package org.apache.beam.model.pipeline.v1;
+
+option go_package = "github.com/apache/beam/sdks/v2/go/pkg/beam/model/pipeline_v1;pipeline_v1";
+option java_package = "org.apache.beam.model.pipeline.v1";
+option java_outer_classname = "SchemaAwareTransforms";
+
+import "org/apache/beam/model/pipeline/v1/beam_runner_api.proto";
+import "org/apache/beam/model/pipeline/v1/schema.proto";
+
+
+message StandardSchemaAwareTransforms {
+    // Payload for all of these: SchemaAwareTransformPayload
+    enum IOs {
+        ICEBERG_READ = 0 [(beam_urn) = "beam:schematransform:org.apache.beam:iceberg_read:v1"];
+        ICEBERG_WRITE = 1 [(beam_urn) = "beam:schematransform:org.apache.beam:iceberg_write:v1"];
+    }
+    enum Managed {
+        // Payload: ManagedSchemaTransformPayload
+        MANAGED = 0 [(beam_urn) = "beam:transform:managed:v1"];
+    }
+}
+
+
+// Payload for a Schema-aware PTransform.
+// This is a transform that is aware of its input and output PCollection schemas
+// and is configured using Beam Schema-compatible parameters.
+// The information available in the payload can be used by runners to override the schema-aware transform.
+message SchemaAwareTransformPayload {
+    // The schema of the configuration row used to override the transform
+    Schema expansion_schema = 1;
+    // The configuration used to override this transform.
+    // Must be compatible with the configuration schema, and decodable via beam:coder:row:v1.
+    bytes expansion_payload = 2;
+}
+
+// Payload for a ManagedSchemaTransform.
+// The information available in the payload can be used by runners to override the ManagedSchemaTransform.
+message ManagedSchemaTransformPayload {

Review Comment:
   This is almost entirely identical to 
https://github.com/apache/beam/blob/master/model/pipeline/src/main/proto/org/apache/beam/model/pipeline/v1/external_transforms.proto#L114



##########
model/pipeline/src/main/proto/org/apache/beam/model/pipeline/v1/schema_aware_transforms.proto:
##########
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+// Protocol Buffers describing available Schema-Aware transforms.
+// These are transforms that can be configured using Beam Schema-compatible parameters.
+// Runners can override these transforms with a native implementation.
+
+syntax = "proto3";
+
+package org.apache.beam.model.pipeline.v1;
+
+option go_package = "github.com/apache/beam/sdks/v2/go/pkg/beam/model/pipeline_v1;pipeline_v1";
+option java_package = "org.apache.beam.model.pipeline.v1";
+option java_outer_classname = "SchemaAwareTransforms";
+
+import "org/apache/beam/model/pipeline/v1/beam_runner_api.proto";
+import "org/apache/beam/model/pipeline/v1/schema.proto";
+
+
+message StandardSchemaAwareTransforms {
+    // Payload for all of these: SchemaAwareTransformPayload
+    enum IOs {
+        ICEBERG_READ = 0 [(beam_urn) = "beam:schematransform:org.apache.beam:iceberg_read:v1"];
+        ICEBERG_WRITE = 1 [(beam_urn) = "beam:schematransform:org.apache.beam:iceberg_write:v1"];
+    }
+    enum Managed {
+        // Payload: ManagedSchemaTransformPayload
+        MANAGED = 0 [(beam_urn) = "beam:transform:managed:v1"];
+    }
+}
+
+
+// Payload for a Schema-aware PTransform.

Review Comment:
   This seems highly redundant with 
https://github.com/apache/beam/blob/master/model/pipeline/src/main/proto/org/apache/beam/model/pipeline/v1/external_transforms.proto#L114



##########
sdks/java/managed/src/main/java/org/apache/beam/sdk/managed/ManagedSchemaTransformTranslation.java:
##########
@@ -0,0 +1,143 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.managed;
+
+import static org.apache.beam.model.pipeline.v1.RunnerApi.FunctionSpec;
+import static org.apache.beam.sdk.managed.ManagedSchemaTransformProvider.ManagedConfig;
+import static org.apache.beam.sdk.managed.ManagedSchemaTransformProvider.ManagedSchemaTransform;
+import static org.apache.beam.sdk.util.construction.PTransformTranslation.TransformPayloadTranslator;
+
+import com.google.auto.service.AutoService;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.beam.model.pipeline.v1.SchemaApi;
+import org.apache.beam.model.pipeline.v1.SchemaAwareTransforms.ManagedSchemaTransformPayload;
+import org.apache.beam.sdk.coders.RowCoder;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.runners.AppliedPTransform;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.schemas.SchemaTranslation;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.util.construction.SdkComponents;
+import org.apache.beam.sdk.util.construction.TransformPayloadTranslatorRegistrar;
+import org.apache.beam.sdk.values.Row;
+import org.apache.beam.vendor.grpc.v1p60p1.com.google.protobuf.ByteString;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap;
+import org.checkerframework.checker.nullness.qual.Nullable;
+
+public class ManagedSchemaTransformTranslation {
+  static class ManagedSchemaTransformTranslator
+      implements TransformPayloadTranslator<ManagedSchemaTransform> {
+    private final ManagedSchemaTransformProvider provider;
+    static final Schema SCHEMA =
+        Schema.builder()
+            .addStringField("transform_identifier")
+            .addNullableStringField("config")
+            .addNullableStringField("config_url")
+            .build();
+
+    public ManagedSchemaTransformTranslator() {
+      provider = new ManagedSchemaTransformProvider(null);
+    }
+
+    @Override
+    public String getUrn() {
+      return provider.identifier();
+    }
+
+    @Override
+    @SuppressWarnings("argument")
+    public @Nullable FunctionSpec translate(
+        AppliedPTransform<?, ?, ManagedSchemaTransform> application, SdkComponents components)
+        throws IOException {
+      SchemaApi.Schema expansionSchema = SchemaTranslation.schemaToProto(SCHEMA, true);
+      ManagedConfig managedConfig = application.getTransform().getManagedConfig();
+      Row configRow = toConfigRow(application.getTransform());
+      ByteArrayOutputStream os = new ByteArrayOutputStream();
+      RowCoder.of(SCHEMA).encode(configRow, os);
+
+      return FunctionSpec.newBuilder()
+          .setUrn(getUrn())
+          .setPayload(
+              ManagedSchemaTransformPayload.newBuilder()
+                  .setUnderlyingTransformIdentifier(managedConfig.getTransformIdentifier())
+                  .setYamlConfig(managedConfig.resolveUnderlyingConfig())
+                  .setExpansionSchema(expansionSchema)
+                  .setExpansionPayload(ByteString.copyFrom(os.toByteArray()))
+                  .build()
+                  .toByteString())
+          .build();
+    }
+
+    @Override
+    public Row toConfigRow(ManagedSchemaTransform transform) {

Review Comment:
   ManagedConfig is already @DefaultSchema; its toRow (and, below, fromRow) methods should be used rather than being manually implemented. (This is how it works as a SchemaTransform.)
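   A sketch of what leaning on the registry could look like. The `Config` class below is a hypothetical stand-in for ManagedConfig (field names invented), just to show the mechanism: any `@DefaultSchema` type gets both conversions derived for free via `SchemaRegistry`.

```java
import org.apache.beam.sdk.schemas.JavaFieldSchema;
import org.apache.beam.sdk.schemas.NoSuchSchemaException;
import org.apache.beam.sdk.schemas.SchemaRegistry;
import org.apache.beam.sdk.schemas.annotations.DefaultSchema;
import org.apache.beam.sdk.values.Row;

public class ToRowSketch {
  // Hypothetical stand-in for ManagedConfig; any @DefaultSchema type works.
  @DefaultSchema(JavaFieldSchema.class)
  public static class Config {
    public String transformIdentifier;
    public String config;
  }

  public static void main(String[] args) throws NoSuchSchemaException {
    SchemaRegistry registry = SchemaRegistry.createDefault();
    Config cfg = new Config();
    cfg.transformIdentifier = "beam:transform:managed:v1";
    cfg.config = "table: my_table";
    // The registry derives both conversions from the annotation, so the
    // translator needs no hand-written toConfigRow/fromConfigRow logic.
    Row row = registry.getToRowFunction(Config.class).apply(cfg);
    Config back = registry.getFromRowFunction(Config.class).apply(row);
    System.out.println(row.getString("transformIdentifier") + " " + back.config);
  }
}
```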



##########
sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergWriteResult.java:
##########
@@ -15,7 +15,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.beam.io.iceberg;
+package org.apache.beam.sdk.io.iceberg;

Review Comment:
   What is the Coder used for the `PCollection<KV<String, Snapshot>>` here? I'm concerned there may be update compatibility issues with it. 



##########
sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergWriteSchemaTransformProvider.java:
##########
@@ -0,0 +1,183 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.iceberg;
+
+import com.google.auto.service.AutoService;
+import com.google.auto.value.AutoValue;
+import java.util.Collections;
+import java.util.List;
+import org.apache.beam.sdk.io.iceberg.IcebergWriteSchemaTransformProvider.Config;
+import org.apache.beam.sdk.managed.ManagedTransformConstants;
+import org.apache.beam.sdk.schemas.AutoValueSchema;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.schemas.annotations.DefaultSchema;
+import org.apache.beam.sdk.schemas.annotations.SchemaFieldDescription;
+import org.apache.beam.sdk.schemas.transforms.SchemaTransform;
+import org.apache.beam.sdk.schemas.transforms.SchemaTransformProvider;
+import org.apache.beam.sdk.schemas.transforms.TypedSchemaTransformProvider;
+import org.apache.beam.sdk.transforms.MapElements;
+import org.apache.beam.sdk.transforms.SimpleFunction;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionRowTuple;
+import org.apache.beam.sdk.values.Row;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Strings;
+import org.apache.iceberg.Snapshot;
+import org.apache.iceberg.catalog.TableIdentifier;
+
+/**
+ * SchemaTransform implementation for {@link IcebergIO#writeRows}. Writes Beam Rows to Iceberg and
+ * outputs a {@code PCollection<Row>} representing snapshots created in the process.
+ */
+@AutoService(SchemaTransformProvider.class)
+public class IcebergWriteSchemaTransformProvider extends TypedSchemaTransformProvider<Config> {
+
+  static final String INPUT_TAG = "input";
+  static final String OUTPUT_TAG = "output";
+
+  static final Schema OUTPUT_SCHEMA =
+      Schema.builder()
+          .addStringField("table")
+          .addStringField("operation")
+          .addMapField("summary", Schema.FieldType.STRING, Schema.FieldType.STRING)
+          .addStringField("manifestListLocation")
+          .build();
+
+  @Override
+  public String description() {
+    return "Writes Beam Rows to Iceberg.\n"
+        + "Returns a PCollection representing the snapshots produced in the process, with the following schema:\n"
+        + "{\"table\" (str), \"operation\" (str), \"summary\" (map[str, str]), \"manifestListLocation\" (str)}";
+  }
+
+  @Override
+  protected SchemaTransform from(Config configuration) {
+    configuration.validate();
+    return new IcebergWriteSchemaTransform(configuration, configurationSchema());
+  }
+
+  @Override
+  public List<String> inputCollectionNames() {
+    return Collections.singletonList(INPUT_TAG);
+  }
+
+  @Override
+  public List<String> outputCollectionNames() {
+    return Collections.singletonList(OUTPUT_TAG);
+  }
+
+  @Override
+  public String identifier() {
+    return ManagedTransformConstants.ICEBERG_WRITE;
+  }
+
+  @DefaultSchema(AutoValueSchema.class)
+  @AutoValue
+  public abstract static class Config {
+    public static Builder builder() {
+      return new AutoValue_IcebergWriteSchemaTransformProvider_Config.Builder();
+    }
+
+    @SchemaFieldDescription("Identifier of the Iceberg table to write to.")
+    public abstract String getTable();
+
+    @SchemaFieldDescription("Configuration parameters used to set up the Iceberg catalog.")
+    public abstract IcebergSchemaTransformCatalogConfig getCatalogConfig();
+
+    @AutoValue.Builder
+    public abstract static class Builder {
+      public abstract Builder setTable(String tables);
+
+      public abstract Builder setCatalogConfig(IcebergSchemaTransformCatalogConfig catalogConfig);
+
+      public abstract Config build();
+    }
+
+    public void validate() {
+      getCatalogConfig().validate();
+    }
+  }
+
+  static class IcebergWriteSchemaTransform extends SchemaTransform {
+    private final Config configuration;
+    private final Row configurationRow;
+
+    IcebergWriteSchemaTransform(Config configuration, Schema configSchema) {
+      this.configuration = configuration;
+
+      configurationRow =
+          Row.withSchema(configSchema)
+              .withFieldValue("table", configuration.getTable())
+              .withFieldValue("catalogConfig", configuration.getCatalogConfig().toRow())
+              .build();
+    }
+
+    Row getConfigurationRow() {
+      return configurationRow;
+    }
+
+    @Override
+    public PCollectionRowTuple expand(PCollectionRowTuple input) {
+
+      PCollection<Row> rows = input.get(INPUT_TAG);
+
+      IcebergSchemaTransformCatalogConfig catalogConfig = configuration.getCatalogConfig();
+
+      IcebergCatalogConfig.Builder catalogBuilder =
+          IcebergCatalogConfig.builder().setName(catalogConfig.getCatalogName());
+
+      if (!Strings.isNullOrEmpty(catalogConfig.getCatalogType())) {
+        catalogBuilder = catalogBuilder.setIcebergCatalogType(catalogConfig.getCatalogType());
+      }
+      if (!Strings.isNullOrEmpty(catalogConfig.getWarehouseLocation())) {
+        catalogBuilder = catalogBuilder.setWarehouseLocation(catalogConfig.getWarehouseLocation());
+      }
+
+      // TODO: support dynamic destinations
+      DynamicDestinations dynamicDestinations =
+          DynamicDestinations.singleTable(TableIdentifier.parse(configuration.getTable()));
+
+      IcebergWriteResult result =
+          rows.apply(IcebergIO.writeRows(catalogBuilder.build()).to(dynamicDestinations));
+
+      PCollection<Row> snapshots =
+          result
+              .getSnapshots()
+              .apply(MapElements.via(new SnapshotToRow()))

Review Comment:
   FYI, it may not be enough to just map to Rows, as all the intermediate data 
choices will be sticky for pipeline update. (We may be stuck with this for some 
existing sources, but we should think long and hard about what this means for 
anything new.)



##########
sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/YamlUtils.java:
##########
@@ -53,19 +53,19 @@ public class YamlUtils {
           .put(Schema.TypeName.BYTES, str -> BaseEncoding.base64().decode(str))
           .build();
 
-  public static Row toBeamRow(@Nullable String yamlString, Schema schema) {
+  public static @Nullable Row toBeamRow(@Nullable String yamlString, Schema schema) {
     return toBeamRow(yamlString, schema, false);
   }
 
-  public static Row toBeamRow(
+  public static @Nullable Row toBeamRow(
      @Nullable String yamlString, Schema schema, boolean convertNamesToCamelCase) {
     if (yamlString == null || yamlString.isEmpty()) {
       List<Field> requiredFields =
           schema.getFields().stream()
               .filter(field -> !field.getType().getNullable())
               .collect(Collectors.toList());
       if (requiredFields.isEmpty()) {
-        return Row.nullRow(schema);
+        return null;

Review Comment:
   Why this change? It seems like non-nullable values are preferable. 
(Similarly below.)



##########
sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/FileWriteResult.java:
##########
@@ -15,7 +15,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.beam.io.iceberg;
+package org.apache.beam.sdk.io.iceberg;
 

Review Comment:
   This class looks like it's used for actual data, which may cause backwards compatibility headaches in the future. Rather than a custom FileWriteResult.FileWriteResultCoder, could we declare it as an AutoSchema and use the SchemaCoder? 
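   A sketch of the schema-derived coder route. `FileResult` below is a hypothetical shape (the real FileWriteResult's fields differ); the point is that `SchemaRegistry.getSchemaCoder` gives a Row-based coder with well-understood evolution semantics, instead of a hand-maintained custom coder.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.schemas.JavaFieldSchema;
import org.apache.beam.sdk.schemas.SchemaRegistry;
import org.apache.beam.sdk.schemas.annotations.DefaultSchema;

public class SchemaCoderSketch {
  // Hypothetical shape standing in for FileWriteResult; field names invented.
  @DefaultSchema(JavaFieldSchema.class)
  public static class FileResult {
    public String tableIdentifier;
    public String manifestFileLocation;
  }

  public static void main(String[] args) throws Exception {
    // Schema-derived coder: encodes via the Row encoding rather than
    // ad-hoc serialization logic in a custom Coder subclass.
    Coder<FileResult> coder =
        SchemaRegistry.createDefault().getSchemaCoder(FileResult.class);
    FileResult r = new FileResult();
    r.tableIdentifier = "db.table";
    r.manifestFileLocation = "gs://bucket/metadata/manifest.avro";
    ByteArrayOutputStream os = new ByteArrayOutputStream();
    coder.encode(r, os);
    FileResult decoded = coder.decode(new ByteArrayInputStream(os.toByteArray()));
    System.out.println(decoded.tableIdentifier);
  }
}
```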



##########
model/pipeline/src/main/proto/org/apache/beam/model/pipeline/v1/schema_aware_transforms.proto:
##########
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+// Protocol Buffers describing available Schema-Aware transforms.
+// These are transforms that can be configured using Beam Schema-compatible parameters.
+// Runners can override these transforms with a native implementation.
+
+syntax = "proto3";
+
+package org.apache.beam.model.pipeline.v1;
+
+option go_package = "github.com/apache/beam/sdks/v2/go/pkg/beam/model/pipeline_v1;pipeline_v1";
+option java_package = "org.apache.beam.model.pipeline.v1";
+option java_outer_classname = "SchemaAwareTransforms";
+
+import "org/apache/beam/model/pipeline/v1/beam_runner_api.proto";
+import "org/apache/beam/model/pipeline/v1/schema.proto";
+
+
+message StandardSchemaAwareTransforms {
+    // Payload for all of these: SchemaAwareTransformPayload
+    enum IOs {
+        ICEBERG_READ = 0 [(beam_urn) = "beam:schematransform:org.apache.beam:iceberg_read:v1"];
+        ICEBERG_WRITE = 1 [(beam_urn) = "beam:schematransform:org.apache.beam:iceberg_write:v1"];
+    }
+    enum Managed {
+        // Payload: ManagedSchemaTransformPayload
+        MANAGED = 0 [(beam_urn) = "beam:transform:managed:v1"];
+    }
+}
+
+
+// Payload for a Schema-aware PTransform.
+// This is a transform that is aware of its input and output PCollection schemas
+// and is configured using Beam Schema-compatible parameters.
+// The information available in the payload can be used by runners to override the schema-aware transform.
+message SchemaAwareTransformPayload {
+    // The schema of the configuration row used to override the transform
+    Schema expansion_schema = 1;
+    // The configuration used to override this transform.
+    // Must be compatible with the configuration schema, and decodable via beam:coder:row:v1.
+    bytes expansion_payload = 2;
+}
+
+// Payload for a ManagedSchemaTransform.
+// The information available in the payload can be used by runners to override the ManagedSchemaTransform.
+message ManagedSchemaTransformPayload {
+    // The underlying schema-aware transform's identifier.
+    string underlying_transform_identifier = 1;
+    // The managed transform's configuration Schema.
+    Schema expansion_schema = 2;
+    // The configuration used to override the managed transform.
+    // Must be compatible with the expansion schema, and decodable via beam:coder:row:v1.
+    bytes expansion_payload = 3;
+    // The underlying transform's configuration, represented as a YAML string.
+    string yaml_config = 4;

Review Comment:
   Are we not supporting a config url as well as an inlined config?
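   If we do want to support both, one option (sketch only; field names and numbers illustrative) is a `oneof` so each payload carries exactly one config source:

```proto
message ManagedSchemaTransformPayload {
    // The underlying schema-aware transform's identifier.
    string underlying_transform_identifier = 1;
    oneof config_source {
        // Inlined configuration, as a YAML string.
        string yaml_config = 2;
        // Location of a file containing the YAML configuration.
        string config_url = 3;
    }
}
```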



##########
model/pipeline/src/main/proto/org/apache/beam/model/pipeline/v1/schema_aware_transforms.proto:
##########
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+// Protocol Buffers describing available Schema-Aware transforms.
+// These are transforms that can be configured using Beam Schema-compatible parameters.
+// Runners can override these transforms with a native implementation.
+
+syntax = "proto3";
+
+package org.apache.beam.model.pipeline.v1;
+
+option go_package = "github.com/apache/beam/sdks/v2/go/pkg/beam/model/pipeline_v1;pipeline_v1";
+option java_package = "org.apache.beam.model.pipeline.v1";
+option java_outer_classname = "SchemaAwareTransforms";
+
+import "org/apache/beam/model/pipeline/v1/beam_runner_api.proto";
+import "org/apache/beam/model/pipeline/v1/schema.proto";
+
+
+message StandardSchemaAwareTransforms {
+    // Payload for all of these: SchemaAwareTransformPayload
+    enum IOs {
+        ICEBERG_READ = 0 [(beam_urn) = "beam:schematransform:org.apache.beam:iceberg_read:v1"];
+        ICEBERG_WRITE = 1 [(beam_urn) = "beam:schematransform:org.apache.beam:iceberg_write:v1"];
+    }
+    enum Managed {
+        // Payload: ManagedSchemaTransformPayload
+        MANAGED = 0 [(beam_urn) = "beam:transform:managed:v1"];
+    }
+}
+
+
+// Payload for a Schema-aware PTransform.
+// This is a transform that is aware of its input and output PCollection schemas
+// and is configured using Beam Schema-compatible parameters.
+// The information available in the payload can be used by runners to override the schema-aware transform.
+message SchemaAwareTransformPayload {
+    // The schema of the configuration row used to override the transform
+    Schema expansion_schema = 1;
+    // The configuration used to override this transform.
+    // Must be compatible with the configuration schema, and decodable via beam:coder:row:v1.
+    bytes expansion_payload = 2;
+}
+
+// Payload for a ManagedSchemaTransform.
+// The information available in the payload can be used by runners to override the ManagedSchemaTransform.
+message ManagedSchemaTransformPayload {
+    // The underlying schema-aware transform's identifier.
+    string underlying_transform_identifier = 1;
+    // The managed transform's configuration Schema.
+    Schema expansion_schema = 2;
+    // The configuration used to override the managed transform.
+    // Must be compatible with the expansion schema, and decodable via beam:coder:row:v1.
+    bytes expansion_payload = 3;
+    // The underlying transform's configuration, represented as a YAML string.
+    string yaml_config = 4;

Review Comment:
   It seems we should provide either the yaml or the schema + encoded row, not 
both. (They'll both have the same information, right?) I think we were leaning 
towards just the yaml. 
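   For instance, leaning on the yaml alone would shrink the message to something like (sketch; field numbers illustrative):

```proto
message ManagedSchemaTransformPayload {
    // The underlying schema-aware transform's identifier.
    string underlying_transform_identifier = 1;
    // The underlying transform's configuration, represented as a YAML string.
    string yaml_config = 2;
}
```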



##########
sdks/java/managed/src/main/java/org/apache/beam/sdk/managed/ManagedSchemaTransformTranslation.java:
##########
@@ -0,0 +1,143 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.managed;
+
+import static org.apache.beam.model.pipeline.v1.RunnerApi.FunctionSpec;
+import static org.apache.beam.sdk.managed.ManagedSchemaTransformProvider.ManagedConfig;
+import static org.apache.beam.sdk.managed.ManagedSchemaTransformProvider.ManagedSchemaTransform;
+import static org.apache.beam.sdk.util.construction.PTransformTranslation.TransformPayloadTranslator;
+
+import com.google.auto.service.AutoService;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.beam.model.pipeline.v1.SchemaApi;
+import org.apache.beam.model.pipeline.v1.SchemaAwareTransforms.ManagedSchemaTransformPayload;
+import org.apache.beam.sdk.coders.RowCoder;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.runners.AppliedPTransform;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.schemas.SchemaTranslation;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.util.construction.SdkComponents;
+import org.apache.beam.sdk.util.construction.TransformPayloadTranslatorRegistrar;
+import org.apache.beam.sdk.values.Row;
+import org.apache.beam.vendor.grpc.v1p60p1.com.google.protobuf.ByteString;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap;
+import org.checkerframework.checker.nullness.qual.Nullable;
+
+public class ManagedSchemaTransformTranslation {
+  static class ManagedSchemaTransformTranslator
+      implements TransformPayloadTranslator<ManagedSchemaTransform> {
+    private final ManagedSchemaTransformProvider provider;
+    static final Schema SCHEMA =
+        Schema.builder()
+            .addStringField("transform_identifier")
+            .addNullableStringField("config")
+            .addNullableStringField("config_url")
+            .build();
+
+    public ManagedSchemaTransformTranslator() {
+      provider = new ManagedSchemaTransformProvider(null);
+    }
+
+    @Override
+    public String getUrn() {
+      return provider.identifier();
+    }
+
+    @Override
+    @SuppressWarnings("argument")
+    public @Nullable FunctionSpec translate(
+        AppliedPTransform<?, ?, ManagedSchemaTransform> application, SdkComponents components)
+        throws IOException {
+      SchemaApi.Schema expansionSchema = SchemaTranslation.schemaToProto(SCHEMA, true);
+      ManagedConfig managedConfig = application.getTransform().getManagedConfig();
+      Row configRow = toConfigRow(application.getTransform());
+      ByteArrayOutputStream os = new ByteArrayOutputStream();
+      RowCoder.of(SCHEMA).encode(configRow, os);
+
+      return FunctionSpec.newBuilder()
+          .setUrn(getUrn())
+          .setPayload(

Review Comment:
   I don't think we need to set the payload at all, everything we need is 
already in the configRow, right?



##########
sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergSchemaTransformTranslation.java:
##########
@@ -0,0 +1,157 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.iceberg;
+
+import static org.apache.beam.sdk.io.iceberg.IcebergReadSchemaTransformProvider.IcebergReadSchemaTransform;
+import static org.apache.beam.sdk.io.iceberg.IcebergWriteSchemaTransformProvider.IcebergWriteSchemaTransform;
+
+import com.google.auto.service.AutoService;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.util.Map;
+import org.apache.beam.model.pipeline.v1.RunnerApi.FunctionSpec;
+import org.apache.beam.model.pipeline.v1.SchemaApi;
+import org.apache.beam.model.pipeline.v1.SchemaAwareTransforms.SchemaAwareTransformPayload;
+import org.apache.beam.sdk.coders.RowCoder;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.runners.AppliedPTransform;
+import org.apache.beam.sdk.schemas.SchemaTranslation;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.util.construction.PTransformTranslation.TransformPayloadTranslator;
+import org.apache.beam.sdk.util.construction.SdkComponents;
+import org.apache.beam.sdk.util.construction.TransformPayloadTranslatorRegistrar;
+import org.apache.beam.sdk.values.Row;
+import org.apache.beam.vendor.grpc.v1p60p1.com.google.protobuf.ByteString;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap;
+import org.checkerframework.checker.nullness.qual.Nullable;
+
+@SuppressWarnings({"rawtypes", "nullness"})
+public class IcebergSchemaTransformTranslation {
+  static class IcebergReadSchemaTransformTranslator
+      implements TransformPayloadTranslator<IcebergReadSchemaTransform> {
+    static final IcebergReadSchemaTransformProvider READ_PROVIDER =
+        new IcebergReadSchemaTransformProvider();
+
+    @Override
+    public String getUrn() {
+      return READ_PROVIDER.identifier();
+    }
+
+    @Override
+    public @Nullable FunctionSpec translate(
+        AppliedPTransform<?, ?, IcebergReadSchemaTransform> application, SdkComponents components)
+        throws IOException {
+      SchemaApi.Schema expansionSchema =
+          SchemaTranslation.schemaToProto(READ_PROVIDER.configurationSchema(), true);
+      Row configRow = toConfigRow(application.getTransform());
+      ByteArrayOutputStream os = new ByteArrayOutputStream();
+      RowCoder.of(READ_PROVIDER.configurationSchema()).encode(configRow, os);
+
+      return FunctionSpec.newBuilder()
+          .setUrn(getUrn())
+          .setPayload(

Review Comment:
   As elsewhere, we don't need to store the config in both the payload and the 
config row annotations. 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]
