ahmedabu98 commented on code in PR #30910:
URL: https://github.com/apache/beam/pull/30910#discussion_r1573140471
##########
sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergSchemaTransformTranslation.java:
##########
@@ -51,38 +53,47 @@ static class IcebergReadSchemaTransformTranslator
@Override
public String getUrn() {
- return READ_PROVIDER.identifier();
+ return BeamUrns.getUrn(SCHEMA_TRANSFORM);
}
@Override
public @Nullable FunctionSpec translate(
AppliedPTransform<?, ?, IcebergReadSchemaTransform> application,
SdkComponents components)
throws IOException {
- SchemaApi.Schema expansionSchema = SchemaTranslation.schemaToProto(READ_SCHEMA, true);
+ Schema snakeCaseSchema = READ_SCHEMA.toSnakeCase();
+ SchemaApi.Schema expansionSchema = SchemaTranslation.schemaToProto(snakeCaseSchema, true);
Row configRow = toConfigRow(application.getTransform());
ByteArrayOutputStream os = new ByteArrayOutputStream();
- RowCoder.of(READ_SCHEMA).encode(configRow, os);
+ RowCoder.of(snakeCaseSchema).encode(configRow, os);
return FunctionSpec.newBuilder()
.setUrn(getUrn())
.setPayload(
SchemaTransformPayload.newBuilder()
+ .setIdentifier(READ_PROVIDER.identifier())
.setConfigurationSchema(expansionSchema)
.setConfigurationRow(ByteString.copyFrom(os.toByteArray()))
- .setIdentifier(getUrn())
.build()
.toByteString())
.build();
}
@Override
public Row toConfigRow(IcebergReadSchemaTransform transform) {
- return transform.getConfigurationRow();
+ // Will retrieve a Row with snake_case naming convention.
+ // Transform expects camelCase convention, so convert back
+ // TODO(https://github.com/apache/beam/issues/31061): Remove conversion when
+ // TypedSchemaTransformProvider starts generating with snake_case convention
+ return transform.getConfigurationRow().toSnakeCase();
}
@Override
public IcebergReadSchemaTransform fromConfigRow(Row configRow, PipelineOptions options) {
Review Comment:
I made a few small changes to TypedSchemaTransformProvider:
- It now expects a snake_case Row, which it converts to camelCase to
generate a ConfigT object (AutoValueSchema expects camelCase field names).
- When configurationSchema() is called, it generates a camelCase schema from
ConfigT, converts it to snake_case, and returns that.

As a result, all TypedSchemaTransformProvider implementations will now expose
snake_case configuration schemas.
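
To make the round trip concrete, here is a minimal plain-Java sketch of the naming conversion. It is an illustration only, not the Beam implementation: the class `ConfigNameCaseSketch`, its methods, and the field names `table_identifier` and `catalog_name` are all hypothetical, and a plain `Map` stands in for a Beam `Row`.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper, NOT part of Beam: a plain-Java stand-in that mimics
// converting configuration field names between snake_case and camelCase.
final class ConfigNameCaseSketch {

  // Converts a camelCase name to snake_case by inserting '_' before each
  // upper-case letter and lower-casing it.
  static String toSnakeCase(String camel) {
    StringBuilder sb = new StringBuilder();
    for (int i = 0; i < camel.length(); i++) {
      char c = camel.charAt(i);
      if (Character.isUpperCase(c)) {
        if (i > 0) {
          sb.append('_');
        }
        sb.append(Character.toLowerCase(c));
      } else {
        sb.append(c);
      }
    }
    return sb.toString();
  }

  // Converts a snake_case name to camelCase by dropping each '_' and
  // upper-casing the character that follows it.
  static String toCamelCase(String snake) {
    StringBuilder sb = new StringBuilder();
    boolean upperNext = false;
    for (int i = 0; i < snake.length(); i++) {
      char c = snake.charAt(i);
      if (c == '_') {
        upperNext = true;
      } else if (upperNext) {
        sb.append(Character.toUpperCase(c));
        upperNext = false;
      } else {
        sb.append(c);
      }
    }
    return sb.toString();
  }

  // Renames every key of a config map; values are left untouched.
  static Map<String, Object> renameKeys(Map<String, Object> config, boolean toSnake) {
    Map<String, Object> out = new LinkedHashMap<>();
    for (Map.Entry<String, Object> e : config.entrySet()) {
      String key = toSnake ? toSnakeCase(e.getKey()) : toCamelCase(e.getKey());
      out.put(key, e.getValue());
    }
    return out;
  }

  public static void main(String[] args) {
    // External (snake_case) configuration, as a caller would supply it.
    Map<String, Object> external = new LinkedHashMap<>();
    external.put("table_identifier", "db.table"); // hypothetical field
    external.put("catalog_name", "my_catalog");   // hypothetical field

    // Internal (camelCase) view, as a config object would expect it.
    Map<String, Object> internal = renameKeys(external, false);
    System.out.println(internal);

    // Converting back should reproduce the external names.
    System.out.println(renameKeys(internal, true));
  }
}
```

The sketch only handles simple lower-camel names; acronyms or digits in field names would need more care.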
I updated the translation logic to reflect this, and also updated the existing
Python wrapper logic to work with snake_case parameters.
@robertwb I wonder whether Beam YAML needs updating too, for example the
renaming that happens in this file:
https://github.com/apache/beam/blob/93aa62c2900b210ee5e26849949154f19db51f4c/sdks/python/apache_beam/yaml/standard_io.yaml#L25-L40