TheNeuralBit commented on a change in pull request #14960:
URL: https://github.com/apache/beam/pull/14960#discussion_r648506759



##########
File path: 
sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/transforms/RenameFields.java
##########
@@ -64,64 +70,95 @@
     return new Inner<>();
   }
 
-  // Describes a single renameSchema rule.
-  private static class RenamePair implements Serializable {
+  // Describes a single renameSchema rule
+  @AutoValue
+  abstract static class RenamePair implements Serializable {
     // The FieldAccessDescriptor describing the field to renameSchema. Must 
reference a singleton
     // field.
-    private final FieldAccessDescriptor fieldAccessDescriptor;
+    abstract FieldAccessDescriptor getFieldAccessDescriptor();
     // The new name for the field.
-    private final String newName;
+    abstract String getNewName();
 
-    RenamePair(FieldAccessDescriptor fieldAccessDescriptor, String newName) {
-      this.fieldAccessDescriptor = fieldAccessDescriptor;
-      this.newName = newName;
+    static RenamePair of(FieldAccessDescriptor fieldAccessDescriptor, String 
newName) {
+      return new AutoValue_RenameFields_RenamePair(fieldAccessDescriptor, 
newName);
     }
 
     RenamePair resolve(Schema schema) {
-      FieldAccessDescriptor resolved = fieldAccessDescriptor.resolve(schema);
+      FieldAccessDescriptor resolved = 
getFieldAccessDescriptor().resolve(schema);
       if (!resolved.referencesSingleField()) {
         throw new IllegalArgumentException(resolved + " references multiple 
fields.");
       }
-      return new RenamePair(resolved, newName);
+      return RenamePair.of(resolved, getNewName());
     }
   }
 
-  private static FieldType renameFieldType(FieldType inputType, 
Collection<RenamePair> renames) {
+  private static FieldType renameFieldType(
+      FieldType inputType,
+      Collection<RenamePair> renames,
+      Map<UUID, Schema> renamedSchemasMap,
+      Map<UUID, BitSet> nestedFieldRenamedMap) {
+    if (renames.isEmpty()) {
+      return inputType;
+    }
+
     switch (inputType.getTypeName()) {
       case ROW:
-        return FieldType.row(renameSchema(inputType.getRowSchema(), renames));
+        renameSchema(inputType.getRowSchema(), renames, renamedSchemasMap, 
nestedFieldRenamedMap);
+        return 
FieldType.row(renamedSchemasMap.get(inputType.getRowSchema().getUUID()));
       case ARRAY:
-        return 
FieldType.array(renameFieldType(inputType.getCollectionElementType(), renames));
+        return FieldType.array(
+            renameFieldType(
+                inputType.getCollectionElementType(),
+                renames,
+                renamedSchemasMap,
+                nestedFieldRenamedMap));
       case ITERABLE:
-        return 
FieldType.iterable(renameFieldType(inputType.getCollectionElementType(), 
renames));
+        return FieldType.iterable(
+            renameFieldType(
+                inputType.getCollectionElementType(),
+                renames,
+                renamedSchemasMap,
+                nestedFieldRenamedMap));
       case MAP:
         return FieldType.map(
-            renameFieldType(inputType.getMapKeyType(), renames),
-            renameFieldType(inputType.getMapValueType(), renames));
+            renameFieldType(
+                inputType.getMapKeyType(), renames, renamedSchemasMap, 
nestedFieldRenamedMap),
+            renameFieldType(
+                inputType.getMapValueType(), renames, renamedSchemasMap, 
nestedFieldRenamedMap));
+      case LOGICAL_TYPE:
+        throw new RuntimeException("RenameFields does not support renaming 
logical types.");
       default:
         return inputType;
     }
   }
 
   // Apply the user-specified renames to the input schema.
-  private static Schema renameSchema(Schema inputSchema, 
Collection<RenamePair> renames) {
+  @VisibleForTesting

Review comment:
       Why not just test through the public interface, rather than exposing 
`renameSchema` and `renameRows`?

##########
File path: 
sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/transforms/RenameFieldsTest.java
##########
@@ -258,4 +265,65 @@ public void renameNestedInMapFields() {
     PAssert.that(renamed).containsInAnyOrder(expectedRows);
     pipeline.run();
   }
+
+  @Test
+  public void testRenameRow() {
+    Schema nestedSchema = 
Schema.builder().addStringField("field1").addInt32Field("field2").build();
+    Schema schema =
+        Schema.builder().addStringField("field1").addRowField("nested", 
nestedSchema).build();
+
+    Schema expectedNestedSchema =
+        
Schema.builder().addStringField("bottom1").addInt32Field("bottom2").build();
+    Schema expectedSchema =
+        Schema.builder()
+            .addStringField("top1")
+            .addRowField("top_nested", expectedNestedSchema)

Review comment:
       optional super nitty nit: I think this would be easier to grok if the 
nested schemas were defined inline.

##########
File path: 
sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/transforms/RenameFields.java
##########
@@ -164,29 +198,112 @@ private Inner(List<RenamePair> renames) {
       List<RenamePair> newList =
           ImmutableList.<RenamePair>builder()
               .addAll(renames)
-              .add(new RenamePair(field, newName))
+              .add(RenamePair.of(field, newName))
               .build();
 
       return new Inner<>(newList);
     }
 
     @Override
     public PCollection<Row> expand(PCollection<T> input) {
-      Schema inputSchema = input.getSchema();
+      final Map<UUID, Schema> renamedSchemasMap = Maps.newHashMap();
+      final Map<UUID, BitSet> nestedFieldRenamedMap = Maps.newHashMap();
 
-      List<RenamePair> pairs =
-          renames.stream().map(r -> 
r.resolve(inputSchema)).collect(Collectors.toList());
-      final Schema outputSchema = renameSchema(inputSchema, pairs);
+      List<RenamePair> resolvedRenames =
+          renames.stream().map(r -> 
r.resolve(input.getSchema())).collect(Collectors.toList());
+      renameSchema(input.getSchema(), resolvedRenames, renamedSchemasMap, 
nestedFieldRenamedMap);
+      final Schema outputSchema = 
renamedSchemasMap.get(input.getSchema().getUUID());
+      final BitSet nestedRenames = 
nestedFieldRenamedMap.get(input.getSchema().getUUID());
       return input
           .apply(
               ParDo.of(
                   new DoFn<T, Row>() {
                     @ProcessElement
                     public void processElement(@Element Row row, 
OutputReceiver<Row> o) {
-                      
o.output(Row.withSchema(outputSchema).attachValues(row.getValues()));
+                      o.output(
+                          renameRow(
+                              row,
+                              outputSchema,
+                              nestedRenames,
+                              renamedSchemasMap,
+                              nestedFieldRenamedMap));
                     }
                   }))
           .setRowSchema(outputSchema);
     }
   }
+
+  // TODO(reuvenlax): For better performance, we should reuse functionality in

Review comment:
       Consider filing a Jira issue to track this TODO.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


Reply via email to