chamikaramj commented on code in PR #30910:
URL: https://github.com/apache/beam/pull/30910#discussion_r1573857432
##########
sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/FileWriteResult.java:
##########
@@ -92,6 +92,12 @@ private static Schema getDataFileAvroSchema(FileWriteResult fileWriteResult) {
@Override
public void encode(FileWriteResult value, OutputStream outStream)
throws CoderException, IOException {
+ // "version" of this coder.
+ // If breaking changes are introduced (e.g. from Beam, Iceberg, Avro,
etc..),
+ // then update this version and create a fork in decode() below for the
new decode logic.
+ // This helps keep the pipeline update-compatible
+ outStream.write(0);
Review Comment:
Let's make sure we have unit tests that exercise reading from and writing to
this coder, including the version byte.
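For example, a minimal round-trip check inside the coder's test class might look like the sketch below (hedged: the `getTableIdentifier()` accessor and how test values get built are assumptions, not this PR's actual test code):
```java
import static org.junit.Assert.assertEquals;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import org.apache.beam.sdk.coders.Coder;

// Round-trip check parameterized on the coder and a sample value, so it stays
// agnostic to how FileWriteResult instances are constructed in the real tests.
static void assertVersionedRoundTrip(Coder<FileWriteResult> coder, FileWriteResult value)
    throws Exception {
  ByteArrayOutputStream os = new ByteArrayOutputStream();
  coder.encode(value, os);
  byte[] bytes = os.toByteArray();

  // The first byte must be the coder version written by encode().
  assertEquals(0, bytes[0]);

  // Decoding must consume the version byte and yield an equivalent value.
  FileWriteResult decoded = coder.decode(new ByteArrayInputStream(bytes));
  assertEquals(value.getTableIdentifier(), decoded.getTableIdentifier()); // assumed accessor
}
```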
##########
sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergSchemaTransformTranslation.java:
##########
@@ -0,0 +1,162 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.iceberg;
+
+import static org.apache.beam.model.pipeline.v1.ExternalTransforms.ExpansionMethods.Enum.SCHEMA_TRANSFORM;
+import static org.apache.beam.sdk.io.iceberg.IcebergReadSchemaTransformProvider.IcebergReadSchemaTransform;
+import static org.apache.beam.sdk.io.iceberg.IcebergWriteSchemaTransformProvider.IcebergWriteSchemaTransform;
+
+import com.google.auto.service.AutoService;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.util.Map;
+import org.apache.beam.model.pipeline.v1.ExternalTransforms.SchemaTransformPayload;
+import org.apache.beam.model.pipeline.v1.RunnerApi.FunctionSpec;
+import org.apache.beam.model.pipeline.v1.SchemaApi;
+import org.apache.beam.sdk.coders.RowCoder;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.runners.AppliedPTransform;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.schemas.SchemaTranslation;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.util.construction.BeamUrns;
+import org.apache.beam.sdk.util.construction.PTransformTranslation.TransformPayloadTranslator;
+import org.apache.beam.sdk.util.construction.SdkComponents;
+import org.apache.beam.sdk.util.construction.TransformPayloadTranslatorRegistrar;
+import org.apache.beam.sdk.values.Row;
+import org.apache.beam.vendor.grpc.v1p60p1.com.google.protobuf.ByteString;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap;
+import org.checkerframework.checker.nullness.qual.Nullable;
+
+@SuppressWarnings({"rawtypes", "nullness"})
+public class IcebergSchemaTransformTranslation {
+ static class IcebergReadSchemaTransformTranslator
+ implements TransformPayloadTranslator<IcebergReadSchemaTransform> {
+ static final IcebergReadSchemaTransformProvider READ_PROVIDER =
+ new IcebergReadSchemaTransformProvider();
+ static final Schema READ_SCHEMA = READ_PROVIDER.configurationSchema();
+
+ @Override
+ public String getUrn() {
+ return BeamUrns.getUrn(SCHEMA_TRANSFORM);
+ }
+
+ @Override
+ public @Nullable FunctionSpec translate(
+ AppliedPTransform<?, ?, IcebergReadSchemaTransform> application, SdkComponents components)
+ throws IOException {
+ SchemaApi.Schema expansionSchema = SchemaTranslation.schemaToProto(READ_SCHEMA, true);
+ Row configRow = toConfigRow(application.getTransform());
+ ByteArrayOutputStream os = new ByteArrayOutputStream();
+ RowCoder.of(READ_SCHEMA).encode(configRow, os);
+
+ return FunctionSpec.newBuilder()
Review Comment:
Let's also update `TransformTranslation` so that we add the SchemaTransform
identifier as an annotation if the URN is `SCHEMA_TRANSFORM`. See below for an
example (you can get the identifier from the payload and add the annotation
at the same location given below). This will allow us to identify
schema-transforms that we want to manage without parsing the payload.
https://github.com/apache/beam/blob/93aa62c2900b210ee5e26849949154f19db51f4c/sdks/java/core/src/main/java/org/apache/beam/sdk/util/construction/PTransformTranslation.java#L528
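Roughly like this at that spot (a sketch only; the annotation key name is illustrative, not decided anywhere):
```java
import org.apache.beam.model.pipeline.v1.ExternalTransforms.ExpansionMethods;
import org.apache.beam.model.pipeline.v1.ExternalTransforms.SchemaTransformPayload;
import org.apache.beam.model.pipeline.v1.RunnerApi;
import org.apache.beam.sdk.util.construction.BeamUrns;
import org.apache.beam.vendor.grpc.v1p60p1.com.google.protobuf.ByteString;
import org.apache.beam.vendor.grpc.v1p60p1.com.google.protobuf.InvalidProtocolBufferException;

// If the spec carries a SchemaTransformPayload, surface its identifier as a
// transform annotation so tooling can spot schema-transforms without parsing
// the payload.
static void maybeAnnotateSchemaTransform(
    RunnerApi.PTransform.Builder transformBuilder, RunnerApi.FunctionSpec spec)
    throws InvalidProtocolBufferException {
  if (BeamUrns.getUrn(ExpansionMethods.Enum.SCHEMA_TRANSFORM).equals(spec.getUrn())) {
    SchemaTransformPayload payload = SchemaTransformPayload.parseFrom(spec.getPayload());
    transformBuilder.putAnnotations(
        "schematransform_urn", // illustrative key name
        ByteString.copyFromUtf8(payload.getIdentifier()));
  }
}
```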
##########
sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergIO.java:
##########
@@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.iceberg;
+
+import static org.apache.beam.sdk.util.Preconditions.checkStateNotNull;
+
+import com.google.auto.value.AutoValue;
+import java.util.Arrays;
+import java.util.List;
+import org.apache.beam.sdk.io.Read;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.values.PBegin;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.Row;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Predicates;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.checkerframework.checker.nullness.qual.Nullable;
+
+public class IcebergIO {
+
+ public static WriteRows writeRows(IcebergCatalogConfig catalog) {
+ return new AutoValue_IcebergIO_WriteRows.Builder().setCatalogConfig(catalog).build();
Review Comment:
Do we still need the new builders here given that we are directly
initializing schema-transforms in `TransformPayloadTranslator`s?
##########
sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergSchemaTransformTranslation.java:
##########
@@ -0,0 +1,162 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.iceberg;
+
+import static org.apache.beam.model.pipeline.v1.ExternalTransforms.ExpansionMethods.Enum.SCHEMA_TRANSFORM;
+import static org.apache.beam.sdk.io.iceberg.IcebergReadSchemaTransformProvider.IcebergReadSchemaTransform;
+import static org.apache.beam.sdk.io.iceberg.IcebergWriteSchemaTransformProvider.IcebergWriteSchemaTransform;
+
+import com.google.auto.service.AutoService;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.util.Map;
+import org.apache.beam.model.pipeline.v1.ExternalTransforms.SchemaTransformPayload;
+import org.apache.beam.model.pipeline.v1.RunnerApi.FunctionSpec;
+import org.apache.beam.model.pipeline.v1.SchemaApi;
+import org.apache.beam.sdk.coders.RowCoder;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.runners.AppliedPTransform;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.schemas.SchemaTranslation;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.util.construction.BeamUrns;
+import org.apache.beam.sdk.util.construction.PTransformTranslation.TransformPayloadTranslator;
+import org.apache.beam.sdk.util.construction.SdkComponents;
+import org.apache.beam.sdk.util.construction.TransformPayloadTranslatorRegistrar;
+import org.apache.beam.sdk.values.Row;
+import org.apache.beam.vendor.grpc.v1p60p1.com.google.protobuf.ByteString;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap;
+import org.checkerframework.checker.nullness.qual.Nullable;
+
+@SuppressWarnings({"rawtypes", "nullness"})
+public class IcebergSchemaTransformTranslation {
+ static class IcebergReadSchemaTransformTranslator
+ implements TransformPayloadTranslator<IcebergReadSchemaTransform> {
+ static final IcebergReadSchemaTransformProvider READ_PROVIDER =
+ new IcebergReadSchemaTransformProvider();
+ static final Schema READ_SCHEMA = READ_PROVIDER.configurationSchema();
+
+ @Override
+ public String getUrn() {
+ return BeamUrns.getUrn(SCHEMA_TRANSFORM);
Review Comment:
Actually, we use the URN when registering `TransformProvider`s at the
`ExpansionService`.
https://github.com/apache/beam/blob/93aa62c2900b210ee5e26849949154f19db51f4c/sdks/java/expansion-service/src/main/java/org/apache/beam/sdk/expansion/service/ExpansionService.java#L168
Hence, this will require future modifications where we keep track of
`TransformProvider`s using the schema-transform identifier. I think this is
OK but wanted to call it out.
The other option would be to go back to using different URNs for different
schema-aware transforms.
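Concretely, the future lookup could key off the payload identifier, e.g. (a sketch; `providersByIdentifier` is an assumed registry keyed by `SchemaTransformProvider.identifier()`):
```java
import java.util.Map;
import org.apache.beam.model.pipeline.v1.ExpansionApi;
import org.apache.beam.model.pipeline.v1.ExternalTransforms.ExpansionMethods;
import org.apache.beam.model.pipeline.v1.ExternalTransforms.SchemaTransformPayload;
import org.apache.beam.model.pipeline.v1.RunnerApi;
import org.apache.beam.sdk.schemas.transforms.SchemaTransformProvider;
import org.apache.beam.sdk.util.construction.BeamUrns;
import org.apache.beam.vendor.grpc.v1p60p1.com.google.protobuf.InvalidProtocolBufferException;
import org.checkerframework.checker.nullness.qual.Nullable;

// Resolves a provider by the identifier embedded in the SchemaTransformPayload,
// since the generic SCHEMA_TRANSFORM URN is no longer unique per transform.
static @Nullable SchemaTransformProvider resolveProvider(
    ExpansionApi.ExpansionRequest request,
    Map<String, SchemaTransformProvider> providersByIdentifier)
    throws InvalidProtocolBufferException {
  RunnerApi.FunctionSpec spec = request.getTransform().getSpec();
  if (!BeamUrns.getUrn(ExpansionMethods.Enum.SCHEMA_TRANSFORM).equals(spec.getUrn())) {
    return null; // not a schema-transform; fall back to URN-keyed registration
  }
  SchemaTransformPayload payload = SchemaTransformPayload.parseFrom(spec.getPayload());
  return providersByIdentifier.get(payload.getIdentifier());
}
```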
##########
sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergSchemaTransformTranslation.java:
##########
@@ -51,38 +53,47 @@ static class IcebergReadSchemaTransformTranslator
@Override
public String getUrn() {
- return READ_PROVIDER.identifier();
+ return BeamUrns.getUrn(SCHEMA_TRANSFORM);
}
@Override
public @Nullable FunctionSpec translate(
AppliedPTransform<?, ?, IcebergReadSchemaTransform> application, SdkComponents components)
throws IOException {
- SchemaApi.Schema expansionSchema = SchemaTranslation.schemaToProto(READ_SCHEMA, true);
+ Schema snakeCaseSchema = READ_SCHEMA.toSnakeCase();
+ SchemaApi.Schema expansionSchema = SchemaTranslation.schemaToProto(snakeCaseSchema, true);
Row configRow = toConfigRow(application.getTransform());
ByteArrayOutputStream os = new ByteArrayOutputStream();
- RowCoder.of(READ_SCHEMA).encode(configRow, os);
+ RowCoder.of(snakeCaseSchema).encode(configRow, os);
return FunctionSpec.newBuilder()
.setUrn(getUrn())
.setPayload(
SchemaTransformPayload.newBuilder()
+ .setIdentifier(READ_PROVIDER.identifier())
.setConfigurationSchema(expansionSchema)
.setConfigurationRow(ByteString.copyFrom(os.toByteArray()))
- .setIdentifier(getUrn())
.build()
.toByteString())
.build();
}
@Override
public Row toConfigRow(IcebergReadSchemaTransform transform) {
- return transform.getConfigurationRow();
+ // Will retrieve a Row with snake_case naming convention.
+ // Transform expects camelCase convention, so convert back
+ // TODO(https://github.com/apache/beam/issues/31061): Remove conversion when
+ // TypedSchemaTransformProvider starts generating with snake_case convention
+ return transform.getConfigurationRow().toSnakeCase();
}
@Override
public IcebergReadSchemaTransform fromConfigRow(Row configRow, PipelineOptions options) {
Review Comment:
Sounds good. Thanks.
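For reference, the naming round-trip these conversions rely on looks like this (a sketch; assumes `Row#toCamelCase` is the inverse of the `Row#toSnakeCase` used above):
```java
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.values.Row;

public class SnakeCaseRoundTrip {
  public static void main(String[] args) {
    // camelCase row, as produced for/by the transform itself
    Schema camel = Schema.builder().addStringField("catalogName").build();
    Row row = Row.withSchema(camel).addValue("my_catalog").build();

    Row snake = row.toSnakeCase();  // portable payload form: field "catalog_name"
    Row back = snake.toCamelCase(); // back to "catalogName" for the transform

    System.out.println(snake.getSchema().getField(0).getName()); // catalog_name
    System.out.println(back.getSchema().getField(0).getName());  // catalogName
  }
}
```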
##########
sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergSchemaTransformCatalogConfig.java:
##########
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.iceberg;
+
+import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkArgument;
+
+import com.google.auto.value.AutoValue;
+import java.util.Set;
+import org.apache.beam.sdk.schemas.AutoValueSchema;
+import org.apache.beam.sdk.schemas.NoSuchSchemaException;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.schemas.SchemaRegistry;
+import org.apache.beam.sdk.schemas.annotations.DefaultSchema;
+import org.apache.beam.sdk.schemas.annotations.SchemaFieldDescription;
+import org.apache.beam.sdk.util.Preconditions;
+import org.apache.beam.sdk.values.Row;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Strings;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Sets;
+import org.apache.iceberg.CatalogUtil;
+import org.checkerframework.checker.nullness.qual.Nullable;
+
+@DefaultSchema(AutoValueSchema.class)
+@AutoValue
+public abstract class IcebergSchemaTransformCatalogConfig {
+ public static Builder builder() {
+ return new AutoValue_IcebergSchemaTransformCatalogConfig.Builder();
+ }
+
+ public abstract String getCatalogName();
+
+ @SchemaFieldDescription("Valid types are: {hadoop, hive, rest}")
+ public abstract @Nullable String getCatalogType();
+
+ public abstract @Nullable String getCatalogImplementation();
+
+ public abstract @Nullable String getWarehouseLocation();
+
+ @AutoValue.Builder
+ public abstract static class Builder {
+
+ public abstract Builder setCatalogName(String catalogName);
+
+ public abstract Builder setCatalogType(String catalogType);
+
+ public abstract Builder setCatalogImplementation(String catalogImplementation);
+
+ public abstract Builder setWarehouseLocation(String warehouseLocation);
+
+ public abstract IcebergSchemaTransformCatalogConfig build();
+ }
+
+ public static final Schema SCHEMA;
+
+ static {
+ try {
+ SCHEMA =
+ SchemaRegistry.createDefault()
+ .getSchema(IcebergSchemaTransformCatalogConfig.class)
+ .sorted()
+ .toSnakeCase();
Review Comment:
Add a comment on why we use `toSnakeCase()` here and for the other
schema-transform schemas we define in this PR.
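One possible wording (a sketch; the catch clause is illustrative since the excerpt cuts off before it):
```java
static {
  try {
    SCHEMA =
        SchemaRegistry.createDefault()
            .getSchema(IcebergSchemaTransformCatalogConfig.class)
            .sorted()
            // Schema-transform configuration rows travel across SDKs with
            // snake_case field names, so convert the portable schema here.
            // TODO(https://github.com/apache/beam/issues/31061): remove once
            // TypedSchemaTransformProvider generates snake_case directly.
            .toSnakeCase();
  } catch (NoSuchSchemaException e) { // illustrative; not shown in the excerpt
    throw new RuntimeException(e);
  }
}
```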