ahmedabu98 commented on code in PR #30910:
URL: https://github.com/apache/beam/pull/30910#discussion_r1573140471


##########
sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergSchemaTransformTranslation.java:
##########
@@ -51,38 +53,47 @@ static class IcebergReadSchemaTransformTranslator
 
     @Override
     public String getUrn() {
-      return READ_PROVIDER.identifier();
+      return BeamUrns.getUrn(SCHEMA_TRANSFORM);
     }
 
     @Override
     public @Nullable FunctionSpec translate(
         AppliedPTransform<?, ?, IcebergReadSchemaTransform> application, SdkComponents components)
         throws IOException {
-      SchemaApi.Schema expansionSchema = SchemaTranslation.schemaToProto(READ_SCHEMA, true);
+      Schema snakeCaseSchema = READ_SCHEMA.toSnakeCase();
+      SchemaApi.Schema expansionSchema = SchemaTranslation.schemaToProto(snakeCaseSchema, true);
       Row configRow = toConfigRow(application.getTransform());
       ByteArrayOutputStream os = new ByteArrayOutputStream();
-      RowCoder.of(READ_SCHEMA).encode(configRow, os);
+      RowCoder.of(snakeCaseSchema).encode(configRow, os);
 
       return FunctionSpec.newBuilder()
           .setUrn(getUrn())
           .setPayload(
               SchemaTransformPayload.newBuilder()
+                  .setIdentifier(READ_PROVIDER.identifier())
                   .setConfigurationSchema(expansionSchema)
                   .setConfigurationRow(ByteString.copyFrom(os.toByteArray()))
-                  .setIdentifier(getUrn())
                   .build()
                   .toByteString())
           .build();
     }
 
     @Override
     public Row toConfigRow(IcebergReadSchemaTransform transform) {
-      return transform.getConfigurationRow();
+      // Will retrieve a Row with snake_case naming convention.
+      // Transform expects camelCase convention, so convert back
+      // TODO(https://github.com/apache/beam/issues/31061): Remove conversion when
+      // TypedSchemaTransformProvider starts generating with snake_case convention
+      return transform.getConfigurationRow().toSnakeCase();
     }
 
     @Override
     public IcebergReadSchemaTransform fromConfigRow(Row configRow, PipelineOptions options) {

Review Comment:
   I made some minor changes to TypedSchemaTransformProvider:
   - It now expects a snake_case Row, which it converts to camelCase to generate a ConfigT object (AutoValueSchema expects camelCase field names).
   - When configurationSchema() is called, it generates a camelCase schema from ConfigT, converts it to snake_case, and returns the snake_case schema.
   
   Effectively, all TypedSchemaTransformProvider implementations will now have snake_case configuration schemas.
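   
   To make the round trip concrete, here is a minimal sketch of the naming conversion in plain Java. It is not Beam's implementation: the class CaseConversionSketch, its methods, and the field names catalog_name and table_identifier are hypothetical, and a flat map stands in for a Row.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.UnaryOperator;

// Hypothetical helper, not part of Beam: illustrates the naming round trip only.
final class CaseConversionSketch {

  // camelCase -> snake_case: prefix each upper-case letter with '_' and lower-case it.
  static String toSnakeCase(String camel) {
    StringBuilder out = new StringBuilder();
    for (int i = 0; i < camel.length(); i++) {
      char c = camel.charAt(i);
      if (Character.isUpperCase(c)) {
        if (i > 0) {
          out.append('_');
        }
        out.append(Character.toLowerCase(c));
      } else {
        out.append(c);
      }
    }
    return out.toString();
  }

  // snake_case -> camelCase: drop each '_' and upper-case the character after it.
  static String toCamelCase(String snake) {
    StringBuilder out = new StringBuilder();
    boolean upperNext = false;
    for (int i = 0; i < snake.length(); i++) {
      char c = snake.charAt(i);
      if (c == '_') {
        upperNext = true;
      } else if (upperNext) {
        out.append(Character.toUpperCase(c));
        upperNext = false;
      } else {
        out.append(c);
      }
    }
    return out.toString();
  }

  // Renames every key of a flat config map, preserving field order and values.
  static Map<String, Object> renameFields(
      Map<String, Object> config, UnaryOperator<String> rename) {
    Map<String, Object> renamed = new LinkedHashMap<>();
    for (Map.Entry<String, Object> entry : config.entrySet()) {
      renamed.put(rename.apply(entry.getKey()), entry.getValue());
    }
    return renamed;
  }

  public static void main(String[] args) {
    // External (snake_case) configuration, as a caller would supply it.
    Map<String, Object> external = new LinkedHashMap<>();
    external.put("catalog_name", "my_catalog");
    external.put("table_identifier", "db.table");

    // Internal (camelCase) view, as a ConfigT-style object would expect.
    Map<String, Object> internal = renameFields(external, CaseConversionSketch::toCamelCase);
    System.out.println(internal);

    // Converting back yields the snake_case field names again.
    System.out.println(renameFields(internal, CaseConversionSketch::toSnakeCase));
  }
}
```

   Note that this naive conversion does not round-trip names with consecutive capitals (an acronym such as URL would be split letter by letter), so it should be read as an illustration of the convention rather than a drop-in utility.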
   
   I updated the translation logic to reflect this, and also updated the existing Python wrapper logic to work with snake_case parameters.
   
   @robertwb I wonder if we need to update Beam YAML too, for example the renaming that happens in this file:
   https://github.com/apache/beam/blob/93aa62c2900b210ee5e26849949154f19db51f4c/sdks/python/apache_beam/yaml/standard_io.yaml#L25-L40



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
