chamikaramj commented on code in PR #30910:
URL: https://github.com/apache/beam/pull/30910#discussion_r1573857432


##########
sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/FileWriteResult.java:
##########
@@ -92,6 +92,12 @@ private static Schema getDataFileAvroSchema(FileWriteResult 
fileWriteResult) {
     @Override
     public void encode(FileWriteResult value, OutputStream outStream)
         throws CoderException, IOException {
+      // "version" of this coder.
+      // If breaking changes are introduced (e.g. from Beam, Iceberg, Avro, 
etc..),
+      // then update this version and create a fork in decode() below for the 
new decode logic.
+      // This helps keep the pipeline update-compatible
+      outStream.write(0);

Review Comment:
   Let's make sure we have unit tests that exercise reading and writing from 
this coder including the version bit.



##########
sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergSchemaTransformTranslation.java:
##########
@@ -0,0 +1,162 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.iceberg;
+
+import static 
org.apache.beam.model.pipeline.v1.ExternalTransforms.ExpansionMethods.Enum.SCHEMA_TRANSFORM;
+import static 
org.apache.beam.sdk.io.iceberg.IcebergReadSchemaTransformProvider.IcebergReadSchemaTransform;
+import static 
org.apache.beam.sdk.io.iceberg.IcebergWriteSchemaTransformProvider.IcebergWriteSchemaTransform;
+
+import com.google.auto.service.AutoService;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.util.Map;
+import 
org.apache.beam.model.pipeline.v1.ExternalTransforms.SchemaTransformPayload;
+import org.apache.beam.model.pipeline.v1.RunnerApi.FunctionSpec;
+import org.apache.beam.model.pipeline.v1.SchemaApi;
+import org.apache.beam.sdk.coders.RowCoder;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.runners.AppliedPTransform;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.schemas.SchemaTranslation;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.util.construction.BeamUrns;
+import 
org.apache.beam.sdk.util.construction.PTransformTranslation.TransformPayloadTranslator;
+import org.apache.beam.sdk.util.construction.SdkComponents;
+import 
org.apache.beam.sdk.util.construction.TransformPayloadTranslatorRegistrar;
+import org.apache.beam.sdk.values.Row;
+import org.apache.beam.vendor.grpc.v1p60p1.com.google.protobuf.ByteString;
+import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap;
+import org.checkerframework.checker.nullness.qual.Nullable;
+
+@SuppressWarnings({"rawtypes", "nullness"})
+public class IcebergSchemaTransformTranslation {
+  static class IcebergReadSchemaTransformTranslator
+      implements TransformPayloadTranslator<IcebergReadSchemaTransform> {
+    static final IcebergReadSchemaTransformProvider READ_PROVIDER =
+        new IcebergReadSchemaTransformProvider();
+    static final Schema READ_SCHEMA = READ_PROVIDER.configurationSchema();
+
+    @Override
+    public String getUrn() {
+      return BeamUrns.getUrn(SCHEMA_TRANSFORM);
+    }
+
+    @Override
+    public @Nullable FunctionSpec translate(
+        AppliedPTransform<?, ?, IcebergReadSchemaTransform> application, 
SdkComponents components)
+        throws IOException {
+      SchemaApi.Schema expansionSchema = 
SchemaTranslation.schemaToProto(READ_SCHEMA, true);
+      Row configRow = toConfigRow(application.getTransform());
+      ByteArrayOutputStream os = new ByteArrayOutputStream();
+      RowCoder.of(READ_SCHEMA).encode(configRow, os);
+
+      return FunctionSpec.newBuilder()

Review Comment:
   Let's also update `TransformTranslation` so that we add the SchemaTransform 
identifier as an annotation if the URN is `SCHEMA_TRANSFORM`. See below for an 
example (and you can get the identifier from the payload add the the annotation 
at the same location given below). This will allow us the identify 
schema-transforms that we want manage without parsing the payload.
   
   
https://github.com/apache/beam/blob/93aa62c2900b210ee5e26849949154f19db51f4c/sdks/java/core/src/main/java/org/apache/beam/sdk/util/construction/PTransformTranslation.java#L528



##########
sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergIO.java:
##########
@@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.iceberg;
+
+import static org.apache.beam.sdk.util.Preconditions.checkStateNotNull;
+
+import com.google.auto.value.AutoValue;
+import java.util.Arrays;
+import java.util.List;
+import org.apache.beam.sdk.io.Read;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.values.PBegin;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.Row;
+import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions;
+import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Predicates;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.checkerframework.checker.nullness.qual.Nullable;
+
+public class IcebergIO {
+
+  public static WriteRows writeRows(IcebergCatalogConfig catalog) {
+    return new 
AutoValue_IcebergIO_WriteRows.Builder().setCatalogConfig(catalog).build();

Review Comment:
   Do we still need the new builders here given that we are directly 
initializing schema-transforms in `TransformPayloadTranslator`s ?



##########
sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergSchemaTransformTranslation.java:
##########
@@ -0,0 +1,162 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.iceberg;
+
+import static 
org.apache.beam.model.pipeline.v1.ExternalTransforms.ExpansionMethods.Enum.SCHEMA_TRANSFORM;
+import static 
org.apache.beam.sdk.io.iceberg.IcebergReadSchemaTransformProvider.IcebergReadSchemaTransform;
+import static 
org.apache.beam.sdk.io.iceberg.IcebergWriteSchemaTransformProvider.IcebergWriteSchemaTransform;
+
+import com.google.auto.service.AutoService;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.util.Map;
+import 
org.apache.beam.model.pipeline.v1.ExternalTransforms.SchemaTransformPayload;
+import org.apache.beam.model.pipeline.v1.RunnerApi.FunctionSpec;
+import org.apache.beam.model.pipeline.v1.SchemaApi;
+import org.apache.beam.sdk.coders.RowCoder;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.runners.AppliedPTransform;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.schemas.SchemaTranslation;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.util.construction.BeamUrns;
+import 
org.apache.beam.sdk.util.construction.PTransformTranslation.TransformPayloadTranslator;
+import org.apache.beam.sdk.util.construction.SdkComponents;
+import 
org.apache.beam.sdk.util.construction.TransformPayloadTranslatorRegistrar;
+import org.apache.beam.sdk.values.Row;
+import org.apache.beam.vendor.grpc.v1p60p1.com.google.protobuf.ByteString;
+import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap;
+import org.checkerframework.checker.nullness.qual.Nullable;
+
+@SuppressWarnings({"rawtypes", "nullness"})
+public class IcebergSchemaTransformTranslation {
+  static class IcebergReadSchemaTransformTranslator
+      implements TransformPayloadTranslator<IcebergReadSchemaTransform> {
+    static final IcebergReadSchemaTransformProvider READ_PROVIDER =
+        new IcebergReadSchemaTransformProvider();
+    static final Schema READ_SCHEMA = READ_PROVIDER.configurationSchema();
+
+    @Override
+    public String getUrn() {
+      return BeamUrns.getUrn(SCHEMA_TRANSFORM);

Review Comment:
   Actually, we use the urn when registering `TransformProvider`s at the 
`ExpansionService`.
   
   
https://github.com/apache/beam/blob/93aa62c2900b210ee5e26849949154f19db51f4c/sdks/java/expansion-service/src/main/java/org/apache/beam/sdk/expansion/service/ExpansionService.java#L168
   
   Hence' this will require us to do future modifications where we keep track 
of `TransformProvider`s using the schema-transform identifier. I think this is 
OK but wanted to call this out.
   
   The other option will be go to back to using different URNs for different 
schema-aware transforms.



##########
sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergSchemaTransformTranslation.java:
##########
@@ -51,38 +53,47 @@ static class IcebergReadSchemaTransformTranslator
 
     @Override
     public String getUrn() {
-      return READ_PROVIDER.identifier();
+      return BeamUrns.getUrn(SCHEMA_TRANSFORM);
     }
 
     @Override
     public @Nullable FunctionSpec translate(
         AppliedPTransform<?, ?, IcebergReadSchemaTransform> application, 
SdkComponents components)
         throws IOException {
-      SchemaApi.Schema expansionSchema = 
SchemaTranslation.schemaToProto(READ_SCHEMA, true);
+      Schema snakeCaseSchema = READ_SCHEMA.toSnakeCase();
+      SchemaApi.Schema expansionSchema = 
SchemaTranslation.schemaToProto(snakeCaseSchema, true);
       Row configRow = toConfigRow(application.getTransform());
       ByteArrayOutputStream os = new ByteArrayOutputStream();
-      RowCoder.of(READ_SCHEMA).encode(configRow, os);
+      RowCoder.of(snakeCaseSchema).encode(configRow, os);
 
       return FunctionSpec.newBuilder()
           .setUrn(getUrn())
           .setPayload(
               SchemaTransformPayload.newBuilder()
+                  .setIdentifier(READ_PROVIDER.identifier())
                   .setConfigurationSchema(expansionSchema)
                   .setConfigurationRow(ByteString.copyFrom(os.toByteArray()))
-                  .setIdentifier(getUrn())
                   .build()
                   .toByteString())
           .build();
     }
 
     @Override
     public Row toConfigRow(IcebergReadSchemaTransform transform) {
-      return transform.getConfigurationRow();
+      // Will retrieve a Row with snake_case naming convention.
+      // Transform expects camelCase convention, so convert back
+      // TODO(https://github.com/apache/beam/issues/31061): Remove conversion 
when
+      // TypedSchemaTransformProvider starts generating with snake_case 
convention
+      return transform.getConfigurationRow().toSnakeCase();
     }
 
     @Override
     public IcebergReadSchemaTransform fromConfigRow(Row configRow, 
PipelineOptions options) {

Review Comment:
   Sounds good. Thanks.



##########
sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergSchemaTransformCatalogConfig.java:
##########
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.iceberg;
+
+import static 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkArgument;
+
+import com.google.auto.value.AutoValue;
+import java.util.Set;
+import org.apache.beam.sdk.schemas.AutoValueSchema;
+import org.apache.beam.sdk.schemas.NoSuchSchemaException;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.schemas.SchemaRegistry;
+import org.apache.beam.sdk.schemas.annotations.DefaultSchema;
+import org.apache.beam.sdk.schemas.annotations.SchemaFieldDescription;
+import org.apache.beam.sdk.util.Preconditions;
+import org.apache.beam.sdk.values.Row;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Strings;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Sets;
+import org.apache.iceberg.CatalogUtil;
+import org.checkerframework.checker.nullness.qual.Nullable;
+
+@DefaultSchema(AutoValueSchema.class)
+@AutoValue
+public abstract class IcebergSchemaTransformCatalogConfig {
+  public static Builder builder() {
+    return new AutoValue_IcebergSchemaTransformCatalogConfig.Builder();
+  }
+
+  public abstract String getCatalogName();
+
+  @SchemaFieldDescription("Valid types are: {hadoop, hive, rest}")
+  public abstract @Nullable String getCatalogType();
+
+  public abstract @Nullable String getCatalogImplementation();
+
+  public abstract @Nullable String getWarehouseLocation();
+
+  @AutoValue.Builder
+  public abstract static class Builder {
+
+    public abstract Builder setCatalogName(String catalogName);
+
+    public abstract Builder setCatalogType(String catalogType);
+
+    public abstract Builder setCatalogImplementation(String 
catalogImplementation);
+
+    public abstract Builder setWarehouseLocation(String warehouseLocation);
+
+    public abstract IcebergSchemaTransformCatalogConfig build();
+  }
+
+  public static final Schema SCHEMA;
+
+  static {
+    try {
+      SCHEMA =
+          SchemaRegistry.createDefault()
+              .getSchema(IcebergSchemaTransformCatalogConfig.class)
+              .sorted()
+              .toSnakeCase();

Review Comment:
   Add a comment on why we use `toSnakeCase()` here and other schema-transform 
schemas we define in this PR.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to