chamikaramj commented on code in PR #30910:
URL: https://github.com/apache/beam/pull/30910#discussion_r1568031375
########## model/pipeline/src/main/proto/org/apache/beam/model/pipeline/v1/schema_aware_transforms.proto: ########## @@ -0,0 +1,73 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + +// Protocol Buffers describing available Schema-Aware transforms. +// These are transforms that can be configured using Beam Schema-compatible parameters. +// Runners can override these transforms with a native implementation. + +syntax = "proto3"; + +package org.apache.beam.model.pipeline.v1; + +option go_package = "github.com/apache/beam/sdks/v2/go/pkg/beam/model/pipeline_v1;pipeline_v1"; +option java_package = "org.apache.beam.model.pipeline.v1"; +option java_outer_classname = "SchemaAwareTransforms"; + +import "org/apache/beam/model/pipeline/v1/beam_runner_api.proto"; +import "org/apache/beam/model/pipeline/v1/schema.proto"; + + +message StandardSchemaAwareTransforms { + // Payload for all of these: SchemaAwareTransformPayload + enum IOs { + ICEBERG_READ = 0 [(beam_urn) = "beam:transform:iceberg_read:v1"]; + ICEBERG_WRITE = 1 [(beam_urn) = "beam:transform:iceberg_write:v1"]; + } + enum Managed { + // Payload: ManagedTransformPayload + MANAGED = 0 [(beam_urn) = "beam:transform:managed:v1"]; + } +} + + +// Payload for a Schema-aware PTransform. +// This is a transform that is aware of its input and output PCollection schemas +// and is configured using Beam Schema-compatible parameters. +message SchemaAwareTransformPayload { + // The transform's configuration schema + Schema expansion_schema = 1; + // The configuration used to build this transform. + // Must be compatible with the configuration schema, and decodable via beam:coder:row:v1. + bytes expansion_payload = 2; +} + +// Payload for a Managed transform +// This can be used by runners that wish to override an underlying transform +// with a different implementation. +message ManagedTransformPayload { + // The underlying transform's URN. + string underlying_transform_urn = 1; + // The underlying transform's configuration Schema. Review Comment: This should be the Managed SchemaTransform's schema and payload, not the underlying transform's. ########## model/pipeline/src/main/proto/org/apache/beam/model/pipeline/v1/schema_aware_transforms.proto: ########## @@ -0,0 +1,73 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + +// Protocol Buffers describing available Schema-Aware transforms. +// These are transforms that can be configured using Beam Schema-compatible parameters. +// Runners can override these transforms with a native implementation. + +syntax = "proto3"; + +package org.apache.beam.model.pipeline.v1; + +option go_package = "github.com/apache/beam/sdks/v2/go/pkg/beam/model/pipeline_v1;pipeline_v1"; +option java_package = "org.apache.beam.model.pipeline.v1"; +option java_outer_classname = "SchemaAwareTransforms"; + +import "org/apache/beam/model/pipeline/v1/beam_runner_api.proto"; +import "org/apache/beam/model/pipeline/v1/schema.proto"; + + +message StandardSchemaAwareTransforms { + // Payload for all of these: SchemaAwareTransformPayload + enum IOs { + ICEBERG_READ = 0 [(beam_urn) = "beam:transform:iceberg_read:v1"]; + ICEBERG_WRITE = 1 [(beam_urn) = "beam:transform:iceberg_write:v1"]; + } + enum Managed { + // Payload: ManagedTransformPayload + MANAGED = 0 [(beam_urn) = "beam:transform:managed:v1"]; + } +} + + +// Payload for a Schema-aware PTransform. +// This is a transform that is aware of its input and output PCollection schemas +// and is configured using Beam Schema-compatible parameters. +message SchemaAwareTransformPayload { + // The transform's configuration schema Review Comment: Because the configuration schema is the same as the schema of the row used for upgrading ? (what we really want here is the latter) ########## sdks/java/io/iceberg/src/main/java/org/apache/beam/io/iceberg/IcebergWriteSchemaTransformProvider.java: ########## @@ -0,0 +1,229 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ +package org.apache.beam.io.iceberg; + +import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkArgument; + +import com.google.auto.service.AutoService; +import com.google.auto.value.AutoValue; +import java.util.Collections; +import java.util.List; +import java.util.Set; +import org.apache.beam.io.iceberg.IcebergWriteSchemaTransformProvider.Config; +import org.apache.beam.sdk.schemas.AutoValueSchema; +import org.apache.beam.sdk.schemas.Schema; +import org.apache.beam.sdk.schemas.annotations.DefaultSchema; +import org.apache.beam.sdk.schemas.transforms.SchemaTransform; +import org.apache.beam.sdk.schemas.transforms.SchemaTransformProvider; +import org.apache.beam.sdk.schemas.transforms.TypedSchemaTransformProvider; +import org.apache.beam.sdk.transforms.MapElements; +import org.apache.beam.sdk.transforms.SimpleFunction; +import org.apache.beam.sdk.util.Preconditions; +import org.apache.beam.sdk.values.KV; +import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.sdk.values.PCollectionRowTuple; +import org.apache.beam.sdk.values.Row; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Strings; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Sets; +import org.apache.iceberg.CatalogUtil; +import org.apache.iceberg.Snapshot; +import org.apache.iceberg.catalog.TableIdentifier; +import org.checkerframework.checker.nullness.qual.Nullable; + +/** + * SchemaTransform implementation for {@link IcebergIO#writeToDynamicDestinations}. Writes Beam Rows + * to Iceberg and outputs a {@code PCollection<Row>} representing snapshots created in the process. + */ +@AutoService(SchemaTransformProvider.class) +public class IcebergWriteSchemaTransformProvider extends TypedSchemaTransformProvider<Config> { + + static final String INPUT_TAG = "input"; + static final String OUTPUT_TAG = "output"; + + static final Schema OUTPUT_SCHEMA = + Schema.builder() + .addStringField("table") + .addStringField("operation") + .addMapField("summary", Schema.FieldType.STRING, Schema.FieldType.STRING) + .addStringField("manifestListLocation") + .build(); + + @Override + public String description() { + return "Writes Beam Rows to Iceberg.\n" + + "Returns a PCollection representing the snapshots produced in the process, with the following schema:\n" + + "{\"table\" (str), \"operation\" (str), \"summary\" (map[str, str]), \"manifestListLocation\" (str)}"; + } + + @Override + protected SchemaTransform from(Config configuration) { + configuration.validate(); + return new IcebergWriteSchemaTransform(configuration); + } + + @Override + public List<String> inputCollectionNames() { + return Collections.singletonList(INPUT_TAG); + } + + @Override + public List<String> outputCollectionNames() { + return Collections.singletonList(OUTPUT_TAG); + } + + @Override + public String identifier() { + return "beam:schematransform:org.apache.beam:iceberg_write:v1"; + } + + @DefaultSchema(AutoValueSchema.class) + @AutoValue + public abstract static class Config { + public static Builder builder() { + return new AutoValue_IcebergWriteSchemaTransformProvider_Config.Builder(); + } + + public abstract String getTable(); + + public abstract CatalogConfig getCatalogConfig(); + + @AutoValue.Builder + public abstract static class Builder { + public abstract Builder setTable(String tables); + + public abstract Builder setCatalogConfig(CatalogConfig 
catalogConfig); + + public abstract Config build(); + } + + public void validate() { + getCatalogConfig().validate(); + } + } + + @DefaultSchema(AutoValueSchema.class) + @AutoValue + public abstract static class CatalogConfig { + public static Builder builder() { + return new AutoValue_IcebergWriteSchemaTransformProvider_CatalogConfig.Builder(); + } + + public abstract String getCatalogName(); + + public abstract @Nullable String getCatalogType(); + + public abstract @Nullable String getCatalogImplementation(); + + public abstract @Nullable String getWarehouseLocation(); + + @AutoValue.Builder + public abstract static class Builder { + + public abstract Builder setCatalogName(String catalogName); + + public abstract Builder setCatalogType(String catalogType); + + public abstract Builder setCatalogImplementation(String catalogImplementation); + + public abstract Builder setWarehouseLocation(String warehouseLocation); + + public abstract CatalogConfig build(); + } + + Set<String> validTypes = + Sets.newHashSet( + CatalogUtil.ICEBERG_CATALOG_TYPE_HADOOP, + CatalogUtil.ICEBERG_CATALOG_TYPE_HIVE, + CatalogUtil.ICEBERG_CATALOG_TYPE_REST); + + public void validate() { + if (Strings.isNullOrEmpty(getCatalogType())) { + checkArgument( + validTypes.contains(Preconditions.checkArgumentNotNull(getCatalogType())), + "Invalid catalog type. Please pick one of %s", + validTypes); + } + } + } + + @VisibleForTesting + static class IcebergWriteSchemaTransform extends SchemaTransform { + private final Config configuration; + + IcebergWriteSchemaTransform(Config configuration) { + this.configuration = configuration; + } + + @Override + public PCollectionRowTuple expand(PCollectionRowTuple input) { + + PCollection<Row> rows = input.get(INPUT_TAG); + + CatalogConfig catalogConfig = configuration.getCatalogConfig(); + + IcebergCatalogConfig.Builder catalogBuilder = + IcebergCatalogConfig.builder() + .setName(catalogConfig.getCatalogName()) + .setIcebergCatalogType(catalogConfig.getCatalogType()) + .setWarehouseLocation(catalogConfig.getWarehouseLocation()); + + if (!Strings.isNullOrEmpty(catalogConfig.getCatalogType())) { + catalogBuilder = catalogBuilder.setIcebergCatalogType(catalogConfig.getCatalogType()); + } + if (!Strings.isNullOrEmpty(catalogConfig.getWarehouseLocation())) { + catalogBuilder = catalogBuilder.setWarehouseLocation(catalogConfig.getWarehouseLocation()); + } + + // TODO: support dynamic destinations + DynamicDestinations dynamicDestinations = + DynamicDestinations.singleTable(TableIdentifier.parse(configuration.getTable())); + + IcebergWriteResult result = + rows.apply( + IcebergIO.writeToDynamicDestinations(catalogBuilder.build(), dynamicDestinations)); + + PCollection<Row> snapshots = + result + .getSnapshots() + .apply(MapElements.via(new SnapshotToRow())) + .setRowSchema(OUTPUT_SCHEMA); + + return PCollectionRowTuple.of(OUTPUT_TAG, snapshots); + } + + @VisibleForTesting + static class SnapshotToRow extends SimpleFunction<KV<String, Snapshot>, Row> { + @Override + public Row apply(KV<String, Snapshot> input) { + Snapshot snapshot = input.getValue(); + Row row = + Row.withSchema(OUTPUT_SCHEMA) + .addValues( + input.getKey(), + snapshot.operation(), + snapshot.summary(), + snapshot.manifestListLocation()) + .build(); + System.out.println("SNAPSHOT: " + snapshot); Review Comment: Remove prints. 
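If some visibility into created snapshots is still useful after dropping the print, a debug-level logger is the conventional replacement on workers. A minimal sketch (the helper class, logger placement, and message are illustrative, not part of this PR):

```java
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

class SnapshotLogging {
  private static final Logger LOG = LoggerFactory.getLogger(SnapshotLogging.class);

  // Hypothetical replacement for the println inside SnapshotToRow#apply:
  // debug-level logging keeps the diagnostic without writing to stdout on workers.
  static void logSnapshot(String table, Object snapshot) {
    LOG.debug("Snapshot created for table {}: {}", table, snapshot);
  }
}
```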
########## sdks/java/io/iceberg/src/main/java/org/apache/beam/io/iceberg/IcebergWriteSchemaTransformProvider.java: ########## @@ -0,0 +1,216 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.io.iceberg; + +import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkArgument; + +import com.google.auto.service.AutoService; +import com.google.auto.value.AutoValue; +import java.util.*; +import org.apache.beam.io.iceberg.IcebergWriteSchemaTransformProvider.Config; +import org.apache.beam.sdk.schemas.AutoValueSchema; +import org.apache.beam.sdk.schemas.Schema; +import org.apache.beam.sdk.schemas.annotations.DefaultSchema; +import org.apache.beam.sdk.schemas.transforms.SchemaTransform; +import org.apache.beam.sdk.schemas.transforms.SchemaTransformProvider; +import org.apache.beam.sdk.schemas.transforms.TypedSchemaTransformProvider; +import org.apache.beam.sdk.transforms.MapElements; +import org.apache.beam.sdk.transforms.SimpleFunction; +import org.apache.beam.sdk.util.Preconditions; +import org.apache.beam.sdk.values.KV; +import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.sdk.values.PCollectionRowTuple; +import org.apache.beam.sdk.values.Row; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Strings; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Sets; +import org.apache.iceberg.CatalogUtil; +import org.apache.iceberg.Snapshot; +import org.apache.iceberg.catalog.TableIdentifier; +import org.checkerframework.checker.nullness.qual.Nullable; + +@AutoService(SchemaTransformProvider.class) +public class IcebergWriteSchemaTransformProvider extends TypedSchemaTransformProvider<Config> { + + static final String INPUT_TAG = "input"; + static final String OUTPUT_TAG = "output"; + + @Override + protected SchemaTransform from(Config configuration) { + configuration.validate(); + return new IcebergWriteSchemaTransform(configuration); + } + + @Override + public List<String> inputCollectionNames() { + return Collections.singletonList(INPUT_TAG); + } + + @Override + public List<String> outputCollectionNames() { + return Collections.singletonList(OUTPUT_TAG); + } + + @Override + public String identifier() { + return "beam:schematransform:org.apache.beam:iceberg_write:v1"; Review Comment: We discussed offline and this PR includes updated URNs/payloads for the new schema-aware transforms ########## sdks/java/io/iceberg/src/main/java/org/apache/beam/io/iceberg/IcebergIOTranslation.java: ########## @@ -0,0 +1,241 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.io.iceberg; + +import static org.apache.beam.io.iceberg.IcebergIO.ReadRows; +import static org.apache.beam.io.iceberg.IcebergIO.WriteRows; +import static org.apache.beam.sdk.util.construction.TransformUpgrader.fromByteArray; +import static org.apache.beam.sdk.util.construction.TransformUpgrader.toByteArray; + +import com.google.auto.service.AutoService; +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.InvalidClassException; +import java.util.Arrays; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; +import org.apache.beam.model.pipeline.v1.RunnerApi.FunctionSpec; +import org.apache.beam.model.pipeline.v1.SchemaApi; +import org.apache.beam.model.pipeline.v1.SchemaAwareTransforms.SchemaAwareTransformPayload; +import org.apache.beam.sdk.coders.RowCoder; +import org.apache.beam.sdk.options.PipelineOptions; +import org.apache.beam.sdk.runners.AppliedPTransform; +import org.apache.beam.sdk.schemas.Schema; +import org.apache.beam.sdk.schemas.SchemaTranslation; +import org.apache.beam.sdk.transforms.PTransform; +import org.apache.beam.sdk.util.construction.PTransformTranslation.TransformPayloadTranslator; +import org.apache.beam.sdk.util.construction.SdkComponents; +import org.apache.beam.sdk.util.construction.TransformPayloadTranslatorRegistrar; +import org.apache.beam.sdk.values.Row; +import org.apache.beam.vendor.grpc.v1p60p1.com.google.protobuf.ByteString; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap; +import org.apache.iceberg.catalog.TableIdentifier; +import org.checkerframework.checker.nullness.qual.Nullable; + +@SuppressWarnings({"rawtypes", "nullness"}) +public class IcebergIOTranslation { + static class IcebergIOReadTranslator implements TransformPayloadTranslator<ReadRows> { + + static final Schema READ_SCHEMA = + Schema.builder() + .addByteArrayField("catalog_config") + .addNullableStringField("table_identifier") + .build(); + + public static final String ICEBERG_READ_TRANSFORM_URN = + "beam:transform:org.apache.beam:iceberg_read:v1"; + + @Override + public String getUrn() { + return ICEBERG_READ_TRANSFORM_URN; + } + + @Override + public @Nullable FunctionSpec translate( + AppliedPTransform<?, ?, ReadRows> application, SdkComponents components) + throws IOException { + SchemaApi.Schema expansionSchema = SchemaTranslation.schemaToProto(READ_SCHEMA, true); + Row configRow = toConfigRow(application.getTransform()); + ByteArrayOutputStream os = new ByteArrayOutputStream(); + RowCoder.of(READ_SCHEMA).encode(configRow, os); + + return FunctionSpec.newBuilder() + .setUrn(getUrn()) + .setPayload( + SchemaAwareTransformPayload.newBuilder() + .setExpansionSchema(expansionSchema) + .setExpansionPayload(ByteString.copyFrom(os.toByteArray())) + 
.build() + .toByteString()) + .build(); + } + + @Override + public Row toConfigRow(ReadRows transform) { + + Map<String, Object> fieldValues = new HashMap<>(); + + if (transform.getCatalogConfig() != null) { + fieldValues.put("catalog_config", toByteArray(transform.getCatalogConfig())); + } + if (transform.getTableIdentifier() != null) { + TableIdentifier identifier = transform.getTableIdentifier(); + List<String> identifierParts = + Arrays.stream(identifier.namespace().levels()).collect(Collectors.toList()); + identifierParts.add(identifier.name()); + fieldValues.put("table_identifier", String.join(".", identifierParts)); + } + + return Row.withSchema(READ_SCHEMA).withFieldValues(fieldValues).build(); + } + + @Override + public ReadRows fromConfigRow(Row configRow, PipelineOptions options) { + try { + ReadRows.Builder builder = new AutoValue_IcebergIO_ReadRows.Builder(); + + byte[] catalogBytes = configRow.getBytes("catalog_config"); + if (catalogBytes != null) { + builder = builder.setCatalogConfig((IcebergCatalogConfig) fromByteArray(catalogBytes)); + } + String tableIdentifier = configRow.getString("table_identifier"); + if (tableIdentifier != null) { + builder = builder.setTableIdentifier(TableIdentifier.parse(tableIdentifier)); + } + return builder.build(); + } catch (InvalidClassException e) { + throw new RuntimeException(e); + } + } + } + + @AutoService(TransformPayloadTranslatorRegistrar.class) + public static class ReadRegistrar implements TransformPayloadTranslatorRegistrar { + @Override + @SuppressWarnings({ + "rawtypes", + }) + public Map<? extends Class<? extends PTransform>, ? extends TransformPayloadTranslator> + getTransformPayloadTranslators() { + return ImmutableMap.<Class<? extends PTransform>, TransformPayloadTranslator>builder() + .put(AutoValue_IcebergIO_ReadRows.class, new IcebergIOReadTranslator()) + .build(); + } + } + + static class IcebergIOWriteTranslator implements TransformPayloadTranslator<WriteRows> { + + static final Schema WRITE_SCHEMA = + Schema.builder() + .addByteArrayField("catalog_config") + .addNullableStringField("table_identifier") + .addNullableByteArrayField("dynamic_destinations") + .build(); + + public static final String ICEBERG_WRITE_TRANSFORM_URN = + "beam:transform:org.apache.beam:iceberg_write:v1"; + + @Override + public String getUrn() { + return ICEBERG_WRITE_TRANSFORM_URN; + } + + @Override + public @Nullable FunctionSpec translate( + AppliedPTransform<?, ?, WriteRows> application, SdkComponents components) + throws IOException { + SchemaApi.Schema expansionSchema = SchemaTranslation.schemaToProto(WRITE_SCHEMA, true); + Row configRow = toConfigRow(application.getTransform()); + ByteArrayOutputStream os = new ByteArrayOutputStream(); + RowCoder.of(WRITE_SCHEMA).encode(configRow, os); + + return FunctionSpec.newBuilder() + .setUrn(getUrn()) + .setPayload( + SchemaAwareTransformPayload.newBuilder() + .setExpansionSchema(expansionSchema) + .setExpansionPayload(ByteString.copyFrom(os.toByteArray())) + .build() + .toByteString()) + .build(); + } + + @Override + public Row toConfigRow(WriteRows transform) { + + Map<String, Object> fieldValues = new HashMap<>(); + + if (transform.getCatalogConfig() != null) { + fieldValues.put("catalog_config", toByteArray(transform.getCatalogConfig())); + } + if (transform.getTableIdentifier() != null) { + TableIdentifier identifier = transform.getTableIdentifier(); + List<String> identifierParts = + Arrays.stream(identifier.namespace().levels()).collect(Collectors.toList()); + 
identifierParts.add(identifier.name()); + fieldValues.put("table_identifier", String.join(".", identifierParts)); + } + if (transform.getDynamicDestinations() != null) { + fieldValues.put("dynamic_destinations", toByteArray(transform.getDynamicDestinations())); Review Comment: Ditto regarding Java serialization of complex objects. ########## sdks/java/io/iceberg/src/main/java/org/apache/beam/io/iceberg/IcebergIO.java: ########## @@ -19,73 +19,116 @@ import static org.apache.beam.sdk.util.Preconditions.checkStateNotNull; +import com.google.auto.value.AutoValue; +import java.util.Arrays; +import java.util.List; import org.apache.beam.sdk.io.Read; import org.apache.beam.sdk.transforms.PTransform; import org.apache.beam.sdk.values.PBegin; import org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.Row; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Predicates; import org.apache.iceberg.Table; import org.apache.iceberg.catalog.TableIdentifier; import org.checkerframework.checker.nullness.qual.Nullable; public class IcebergIO { - public static WriteRows writeToDynamicDestinations( - IcebergCatalogConfig catalog, DynamicDestinations dynamicDestinations) { - return new WriteRows(catalog, dynamicDestinations); + public static WriteRows writeRows(IcebergCatalogConfig catalog) { + return new AutoValue_IcebergIO_WriteRows.Builder().setCatalogConfig(catalog).build(); Review Comment: Note that we don't necessarily have to use AutoValue here. If we just keep the existing "WriteRows" and register a TransformTranslation for that, it should still work (and is probably simpler). (and similarly for ReadRows) ########## sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/YamlUtils.java: ########## @@ -147,14 +147,27 @@ public static Row toBeamRow( } @SuppressWarnings("nullness") - public static Row toBeamRow(Map<String, Object> yamlMap, Schema rowSchema, boolean toCamelCase) { + public static @Nullable Row toBeamRow( + @Nullable Map<String, Object> map, Schema rowSchema, boolean toCamelCase) { + if (map == null || map.isEmpty()) { + List<Field> requiredFields = + rowSchema.getFields().stream() + .filter(field -> !field.getType().getNullable()) + .collect(Collectors.toList()); + if (requiredFields.isEmpty()) { + return null; + } else { Review Comment: Also assert for the count ? ########## sdks/java/core/src/main/java/org/apache/beam/sdk/util/construction/PTransformTranslation.java: ########## @@ -276,7 +279,7 @@ static RunnerApi.PTransform toProto( * * <p>Does not register the {@code appliedPTransform} within the provided {@link SdkComponents}. */ - static RunnerApi.PTransform toProto( + public static RunnerApi.PTransform toProto( Review Comment: This access modifier change should not be needed. ########## sdks/java/managed/src/main/java/org/apache/beam/sdk/managed/Managed.java: ########## @@ -108,8 +107,8 @@ public static ManagedTransform read(String source) { } /** - * Instantiates a {@link Managed.Write} transform for the specified sink. The supported managed - * sinks are: + * Instantiates a {@link Managed.ManagedTransform} transform for the specified sink. The supported Review Comment: I think we wanted to introduce an outer "Managed" PTransform that will be returned here, and that may have Java-specific adapters etc. in the future.
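One shape that outer "Managed" PTransform could take is a thin delegating wrapper, leaving room for Java-specific adapters without changing the `read`/`write` entry points. A rough sketch of the idea, not the PR's actual design; the class name and delegation target are hypothetical:

```java
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.values.PCollectionRowTuple;

// Hypothetical outer wrapper that Managed.read()/Managed.write() would return.
public class ManagedWrapper extends PTransform<PCollectionRowTuple, PCollectionRowTuple> {
  private final PTransform<PCollectionRowTuple, PCollectionRowTuple> underlying;

  ManagedWrapper(PTransform<PCollectionRowTuple, PCollectionRowTuple> underlying) {
    this.underlying = underlying;
  }

  @Override
  public PCollectionRowTuple expand(PCollectionRowTuple input) {
    // Pure delegation for now; Java-specific adapters would be applied here later.
    return input.apply("Underlying managed transform", underlying);
  }
}
```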
########## sdks/java/io/iceberg/src/main/java/org/apache/beam/io/iceberg/IcebergIO.java: ########## @@ -19,73 +19,116 @@ import static org.apache.beam.sdk.util.Preconditions.checkStateNotNull; +import com.google.auto.value.AutoValue; +import java.util.Arrays; +import java.util.List; import org.apache.beam.sdk.io.Read; import org.apache.beam.sdk.transforms.PTransform; import org.apache.beam.sdk.values.PBegin; import org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.Row; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Predicates; import org.apache.iceberg.Table; import org.apache.iceberg.catalog.TableIdentifier; import org.checkerframework.checker.nullness.qual.Nullable; public class IcebergIO { - public static WriteRows writeToDynamicDestinations( - IcebergCatalogConfig catalog, DynamicDestinations dynamicDestinations) { - return new WriteRows(catalog, dynamicDestinations); + public static WriteRows writeRows(IcebergCatalogConfig catalog) { Review Comment: Is this interface change needed ? ########## sdks/java/io/iceberg/src/main/java/org/apache/beam/io/iceberg/IcebergIOTranslation.java: ########## @@ -0,0 +1,241 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.beam.io.iceberg; + +import static org.apache.beam.io.iceberg.IcebergIO.ReadRows; +import static org.apache.beam.io.iceberg.IcebergIO.WriteRows; +import static org.apache.beam.sdk.util.construction.TransformUpgrader.fromByteArray; +import static org.apache.beam.sdk.util.construction.TransformUpgrader.toByteArray; + +import com.google.auto.service.AutoService; +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.InvalidClassException; +import java.util.Arrays; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; +import org.apache.beam.model.pipeline.v1.RunnerApi.FunctionSpec; +import org.apache.beam.model.pipeline.v1.SchemaApi; +import org.apache.beam.model.pipeline.v1.SchemaAwareTransforms.SchemaAwareTransformPayload; +import org.apache.beam.sdk.coders.RowCoder; +import org.apache.beam.sdk.options.PipelineOptions; +import org.apache.beam.sdk.runners.AppliedPTransform; +import org.apache.beam.sdk.schemas.Schema; +import org.apache.beam.sdk.schemas.SchemaTranslation; +import org.apache.beam.sdk.transforms.PTransform; +import org.apache.beam.sdk.util.construction.PTransformTranslation.TransformPayloadTranslator; +import org.apache.beam.sdk.util.construction.SdkComponents; +import org.apache.beam.sdk.util.construction.TransformPayloadTranslatorRegistrar; +import org.apache.beam.sdk.values.Row; +import org.apache.beam.vendor.grpc.v1p60p1.com.google.protobuf.ByteString; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap; +import org.apache.iceberg.catalog.TableIdentifier; +import org.checkerframework.checker.nullness.qual.Nullable; + +@SuppressWarnings({"rawtypes", "nullness"}) +public class IcebergIOTranslation { + static class IcebergIOReadTranslator implements TransformPayloadTranslator<ReadRows> { + + static final Schema READ_SCHEMA = + Schema.builder() + .addByteArrayField("catalog_config") Review Comment: We discussed using camel case here for consistency (and to prevent a case-conversion) right ? ########## sdks/java/io/iceberg/src/main/java/org/apache/beam/io/iceberg/IcebergWriteSchemaTransformProvider.java: ########## @@ -0,0 +1,216 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.beam.io.iceberg; + +import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkArgument; + +import com.google.auto.service.AutoService; +import com.google.auto.value.AutoValue; +import java.util.*; +import org.apache.beam.io.iceberg.IcebergWriteSchemaTransformProvider.Config; +import org.apache.beam.sdk.schemas.AutoValueSchema; +import org.apache.beam.sdk.schemas.Schema; +import org.apache.beam.sdk.schemas.annotations.DefaultSchema; +import org.apache.beam.sdk.schemas.transforms.SchemaTransform; +import org.apache.beam.sdk.schemas.transforms.SchemaTransformProvider; +import org.apache.beam.sdk.schemas.transforms.TypedSchemaTransformProvider; +import org.apache.beam.sdk.transforms.MapElements; +import org.apache.beam.sdk.transforms.SimpleFunction; +import org.apache.beam.sdk.util.Preconditions; +import org.apache.beam.sdk.values.KV; +import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.sdk.values.PCollectionRowTuple; +import org.apache.beam.sdk.values.Row; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Strings; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Sets; +import org.apache.iceberg.CatalogUtil; +import org.apache.iceberg.Snapshot; +import org.apache.iceberg.catalog.TableIdentifier; +import org.checkerframework.checker.nullness.qual.Nullable; + +@AutoService(SchemaTransformProvider.class) +public class IcebergWriteSchemaTransformProvider extends TypedSchemaTransformProvider<Config> { + + static final String INPUT_TAG = "input"; + static final String OUTPUT_TAG = "output"; + + @Override + protected SchemaTransform from(Config configuration) { + configuration.validate(); + return new IcebergWriteSchemaTransform(configuration); + } + + @Override + public List<String> inputCollectionNames() { + return Collections.singletonList(INPUT_TAG); + } + + @Override + public List<String> outputCollectionNames() { + return Collections.singletonList(OUTPUT_TAG); + } + + @Override + public String identifier() { + return "beam:schematransform:org.apache.beam:iceberg_write:v1"; Review Comment: This and proto conversion can be a separate PR. ########## sdks/java/io/iceberg/src/main/java/org/apache/beam/io/iceberg/IcebergIOTranslation.java: ########## @@ -0,0 +1,241 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.beam.io.iceberg; + +import static org.apache.beam.io.iceberg.IcebergIO.ReadRows; +import static org.apache.beam.io.iceberg.IcebergIO.WriteRows; +import static org.apache.beam.sdk.util.construction.TransformUpgrader.fromByteArray; +import static org.apache.beam.sdk.util.construction.TransformUpgrader.toByteArray; + +import com.google.auto.service.AutoService; +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.InvalidClassException; +import java.util.Arrays; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; +import org.apache.beam.model.pipeline.v1.RunnerApi.FunctionSpec; +import org.apache.beam.model.pipeline.v1.SchemaApi; +import org.apache.beam.model.pipeline.v1.SchemaAwareTransforms.SchemaAwareTransformPayload; +import org.apache.beam.sdk.coders.RowCoder; +import org.apache.beam.sdk.options.PipelineOptions; +import org.apache.beam.sdk.runners.AppliedPTransform; +import org.apache.beam.sdk.schemas.Schema; +import org.apache.beam.sdk.schemas.SchemaTranslation; +import org.apache.beam.sdk.transforms.PTransform; +import org.apache.beam.sdk.util.construction.PTransformTranslation.TransformPayloadTranslator; +import org.apache.beam.sdk.util.construction.SdkComponents; +import org.apache.beam.sdk.util.construction.TransformPayloadTranslatorRegistrar; +import org.apache.beam.sdk.values.Row; +import org.apache.beam.vendor.grpc.v1p60p1.com.google.protobuf.ByteString; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap; +import org.apache.iceberg.catalog.TableIdentifier; +import org.checkerframework.checker.nullness.qual.Nullable; + +@SuppressWarnings({"rawtypes", "nullness"}) +public class IcebergIOTranslation { + static class IcebergIOReadTranslator implements TransformPayloadTranslator<ReadRows> { + + static final Schema READ_SCHEMA = + Schema.builder() + .addByteArrayField("catalog_config") + .addNullableStringField("table_identifier") + .build(); + + public static final String ICEBERG_READ_TRANSFORM_URN = + "beam:transform:org.apache.beam:iceberg_read:v1"; + + @Override + public String getUrn() { + return ICEBERG_READ_TRANSFORM_URN; + } + + @Override + public @Nullable FunctionSpec translate( + AppliedPTransform<?, ?, ReadRows> application, SdkComponents components) + throws IOException { + SchemaApi.Schema expansionSchema = SchemaTranslation.schemaToProto(READ_SCHEMA, true); + Row configRow = toConfigRow(application.getTransform()); + ByteArrayOutputStream os = new ByteArrayOutputStream(); + RowCoder.of(READ_SCHEMA).encode(configRow, os); + + return FunctionSpec.newBuilder() + .setUrn(getUrn()) + .setPayload( + SchemaAwareTransformPayload.newBuilder() + .setExpansionSchema(expansionSchema) + .setExpansionPayload(ByteString.copyFrom(os.toByteArray())) + .build() + .toByteString()) + .build(); + } + + @Override + public Row toConfigRow(ReadRows transform) { + + Map<String, Object> fieldValues = new HashMap<>(); + + if (transform.getCatalogConfig() != null) { + fieldValues.put("catalog_config", toByteArray(transform.getCatalogConfig())); Review Comment: We should avoid Java serialization of complex objects if possible. I've seen these easily run into serialization-related errors when rebuilding across versions. How about making `IcebergCatalogConfig` implement `Externalizable` and implementing `readExternal` and `writeExternal`, so that we can better reason about serialization ?
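A minimal sketch of that `Externalizable` suggestion, with an illustrative field set (the real `IcebergCatalogConfig` is an AutoValue class with more properties, so the fields below are assumptions):

```java
import java.io.Externalizable;
import java.io.IOException;
import java.io.ObjectInput;
import java.io.ObjectOutput;

// Hypothetical hand-rolled serialization; field names are illustrative.
public class CatalogConfigExample implements Externalizable {
  private String name;
  private String catalogType;       // nullable
  private String warehouseLocation; // nullable

  // Externalizable requires a public no-arg constructor.
  public CatalogConfigExample() {}

  @Override
  public void writeExternal(ObjectOutput out) throws IOException {
    // Writing fields explicitly pins the wire format, so it survives
    // class-level changes between Beam versions.
    out.writeUTF(name);
    out.writeObject(catalogType);
    out.writeObject(warehouseLocation);
  }

  @Override
  public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
    name = in.readUTF();
    catalogType = (String) in.readObject();
    warehouseLocation = (String) in.readObject();
  }
}
```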
########## sdks/java/io/iceberg/src/main/java/org/apache/beam/io/iceberg/IcebergIOTranslation.java: ########## @@ -0,0 +1,241 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.io.iceberg; + +import static org.apache.beam.io.iceberg.IcebergIO.ReadRows; +import static org.apache.beam.io.iceberg.IcebergIO.WriteRows; +import static org.apache.beam.sdk.util.construction.TransformUpgrader.fromByteArray; +import static org.apache.beam.sdk.util.construction.TransformUpgrader.toByteArray; + +import com.google.auto.service.AutoService; +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.InvalidClassException; +import java.util.Arrays; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; +import org.apache.beam.model.pipeline.v1.RunnerApi.FunctionSpec; +import org.apache.beam.model.pipeline.v1.SchemaApi; +import org.apache.beam.model.pipeline.v1.SchemaAwareTransforms.SchemaAwareTransformPayload; +import org.apache.beam.sdk.coders.RowCoder; +import org.apache.beam.sdk.options.PipelineOptions; +import org.apache.beam.sdk.runners.AppliedPTransform; +import org.apache.beam.sdk.schemas.Schema; +import org.apache.beam.sdk.schemas.SchemaTranslation; +import org.apache.beam.sdk.transforms.PTransform; +import org.apache.beam.sdk.util.construction.PTransformTranslation.TransformPayloadTranslator; +import org.apache.beam.sdk.util.construction.SdkComponents; +import org.apache.beam.sdk.util.construction.TransformPayloadTranslatorRegistrar; +import org.apache.beam.sdk.values.Row; +import org.apache.beam.vendor.grpc.v1p60p1.com.google.protobuf.ByteString; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap; +import org.apache.iceberg.catalog.TableIdentifier; +import org.checkerframework.checker.nullness.qual.Nullable; + +@SuppressWarnings({"rawtypes", "nullness"}) +public class IcebergIOTranslation { + static class IcebergIOReadTranslator implements TransformPayloadTranslator<ReadRows> { + + static final Schema READ_SCHEMA = + Schema.builder() + .addByteArrayField("catalog_config") + .addNullableStringField("table_identifier") + .build(); + + public static final String ICEBERG_READ_TRANSFORM_URN = + "beam:transform:org.apache.beam:iceberg_read:v1"; + + @Override + public String getUrn() { + return ICEBERG_READ_TRANSFORM_URN; + } + + @Override + public @Nullable FunctionSpec translate( + AppliedPTransform<?, ?, ReadRows> application, SdkComponents components) + throws IOException { + SchemaApi.Schema expansionSchema = SchemaTranslation.schemaToProto(READ_SCHEMA, true); + Row configRow = toConfigRow(application.getTransform()); + ByteArrayOutputStream os = new ByteArrayOutputStream(); + 
RowCoder.of(READ_SCHEMA).encode(configRow, os); + + return FunctionSpec.newBuilder() + .setUrn(getUrn()) + .setPayload( + SchemaAwareTransformPayload.newBuilder() + .setExpansionSchema(expansionSchema) + .setExpansionPayload(ByteString.copyFrom(os.toByteArray())) + .build() + .toByteString()) + .build(); + } + + @Override + public Row toConfigRow(ReadRows transform) { + + Map<String, Object> fieldValues = new HashMap<>(); + + if (transform.getCatalogConfig() != null) { + fieldValues.put("catalog_config", toByteArray(transform.getCatalogConfig())); + } + if (transform.getTableIdentifier() != null) { + TableIdentifier identifier = transform.getTableIdentifier(); + List<String> identifierParts = + Arrays.stream(identifier.namespace().levels()).collect(Collectors.toList()); + identifierParts.add(identifier.name()); + fieldValues.put("table_identifier", String.join(".", identifierParts)); + } + + return Row.withSchema(READ_SCHEMA).withFieldValues(fieldValues).build(); + } + + @Override + public ReadRows fromConfigRow(Row configRow, PipelineOptions options) { + try { + ReadRows.Builder builder = new AutoValue_IcebergIO_ReadRows.Builder(); Review Comment: I was expecting that we could build the transform using the schema-aware transform Row constructor instead of using a builder here.
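One way to read that suggestion: rebuild the configuration from the Row through Beam's schema machinery rather than the generated AutoValue builder. A sketch using `SchemaRegistry` (the generic `configClass` parameter stands in for whatever schema-annotated configuration type the transform exposes):

```java
import org.apache.beam.sdk.schemas.NoSuchSchemaException;
import org.apache.beam.sdk.schemas.SchemaRegistry;
import org.apache.beam.sdk.values.Row;

class RowToConfigExample {
  // Hypothetical helper: turn a config Row back into a schema-annotated
  // config object (e.g. an @DefaultSchema(AutoValueSchema.class) class),
  // letting the registered schema do the field mapping.
  static <T> T fromConfigRow(Row configRow, Class<T> configClass) {
    try {
      SchemaRegistry registry = SchemaRegistry.createDefault();
      // getFromRowFunction uses the class's registered schema provider.
      return registry.getFromRowFunction(configClass).apply(configRow);
    } catch (NoSuchSchemaException e) {
      throw new RuntimeException(e);
    }
  }
}
```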
