[
https://issues.apache.org/jira/browse/BEAM-12442?focusedWorklogId=609264&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-609264
]
ASF GitHub Bot logged work on BEAM-12442:
-----------------------------------------
Author: ASF GitHub Bot
Created on: 09/Jun/21 17:08
Start Date: 09/Jun/21 17:08
Worklog Time Spent: 10m
Work Description: TheNeuralBit commented on a change in pull request
#14960:
URL: https://github.com/apache/beam/pull/14960#discussion_r648506759
##########
File path:
sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/transforms/RenameFields.java
##########
@@ -64,64 +70,95 @@
return new Inner<>();
}
- // Describes a single renameSchema rule.
- private static class RenamePair implements Serializable {
+ // Describes a single renameSchema rule
+ @AutoValue
+ abstract static class RenamePair implements Serializable {
// The FieldAccessDescriptor describing the field to renameSchema. Must
reference a singleton
// field.
- private final FieldAccessDescriptor fieldAccessDescriptor;
+ abstract FieldAccessDescriptor getFieldAccessDescriptor();
// The new name for the field.
- private final String newName;
+ abstract String getNewName();
- RenamePair(FieldAccessDescriptor fieldAccessDescriptor, String newName) {
- this.fieldAccessDescriptor = fieldAccessDescriptor;
- this.newName = newName;
+ static RenamePair of(FieldAccessDescriptor fieldAccessDescriptor, String
newName) {
+ return new AutoValue_RenameFields_RenamePair(fieldAccessDescriptor,
newName);
}
RenamePair resolve(Schema schema) {
- FieldAccessDescriptor resolved = fieldAccessDescriptor.resolve(schema);
+ FieldAccessDescriptor resolved =
getFieldAccessDescriptor().resolve(schema);
if (!resolved.referencesSingleField()) {
throw new IllegalArgumentException(resolved + " references multiple
fields.");
}
- return new RenamePair(resolved, newName);
+ return RenamePair.of(resolved, getNewName());
}
}
- private static FieldType renameFieldType(FieldType inputType,
Collection<RenamePair> renames) {
+ private static FieldType renameFieldType(
+ FieldType inputType,
+ Collection<RenamePair> renames,
+ Map<UUID, Schema> renamedSchemasMap,
+ Map<UUID, BitSet> nestedFieldRenamedMap) {
+ if (renames.isEmpty()) {
+ return inputType;
+ }
+
switch (inputType.getTypeName()) {
case ROW:
- return FieldType.row(renameSchema(inputType.getRowSchema(), renames));
+ renameSchema(inputType.getRowSchema(), renames, renamedSchemasMap,
nestedFieldRenamedMap);
+ return
FieldType.row(renamedSchemasMap.get(inputType.getRowSchema().getUUID()));
case ARRAY:
- return
FieldType.array(renameFieldType(inputType.getCollectionElementType(), renames));
+ return FieldType.array(
+ renameFieldType(
+ inputType.getCollectionElementType(),
+ renames,
+ renamedSchemasMap,
+ nestedFieldRenamedMap));
case ITERABLE:
- return
FieldType.iterable(renameFieldType(inputType.getCollectionElementType(),
renames));
+ return FieldType.iterable(
+ renameFieldType(
+ inputType.getCollectionElementType(),
+ renames,
+ renamedSchemasMap,
+ nestedFieldRenamedMap));
case MAP:
return FieldType.map(
- renameFieldType(inputType.getMapKeyType(), renames),
- renameFieldType(inputType.getMapValueType(), renames));
+ renameFieldType(
+ inputType.getMapKeyType(), renames, renamedSchemasMap,
nestedFieldRenamedMap),
+ renameFieldType(
+ inputType.getMapValueType(), renames, renamedSchemasMap,
nestedFieldRenamedMap));
+ case LOGICAL_TYPE:
+ throw new RuntimeException("RenameFields does not support renaming
logical types.");
default:
return inputType;
}
}
// Apply the user-specified renames to the input schema.
- private static Schema renameSchema(Schema inputSchema,
Collection<RenamePair> renames) {
+ @VisibleForTesting
Review comment:
Why not just test through the public interface, rather than exposing
`renameSchema` and `renameRows`?
##########
File path:
sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/transforms/RenameFieldsTest.java
##########
@@ -258,4 +265,65 @@ public void renameNestedInMapFields() {
PAssert.that(renamed).containsInAnyOrder(expectedRows);
pipeline.run();
}
+
+ @Test
+ public void testRenameRow() {
+ Schema nestedSchema =
Schema.builder().addStringField("field1").addInt32Field("field2").build();
+ Schema schema =
+ Schema.builder().addStringField("field1").addRowField("nested",
nestedSchema).build();
+
+ Schema expectedNestedSchema =
+
Schema.builder().addStringField("bottom1").addInt32Field("bottom2").build();
+ Schema expectedSchema =
+ Schema.builder()
+ .addStringField("top1")
+ .addRowField("top_nested", expectedNestedSchema)
Review comment:
optional super nitty nit: I think this would be easier to grok if the
nested schemas were defined inline.
##########
File path:
sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/transforms/RenameFields.java
##########
@@ -164,29 +198,112 @@ private Inner(List<RenamePair> renames) {
List<RenamePair> newList =
ImmutableList.<RenamePair>builder()
.addAll(renames)
- .add(new RenamePair(field, newName))
+ .add(RenamePair.of(field, newName))
.build();
return new Inner<>(newList);
}
@Override
public PCollection<Row> expand(PCollection<T> input) {
- Schema inputSchema = input.getSchema();
+ final Map<UUID, Schema> renamedSchemasMap = Maps.newHashMap();
+ final Map<UUID, BitSet> nestedFieldRenamedMap = Maps.newHashMap();
- List<RenamePair> pairs =
- renames.stream().map(r ->
r.resolve(inputSchema)).collect(Collectors.toList());
- final Schema outputSchema = renameSchema(inputSchema, pairs);
+ List<RenamePair> resolvedRenames =
+ renames.stream().map(r ->
r.resolve(input.getSchema())).collect(Collectors.toList());
+ renameSchema(input.getSchema(), resolvedRenames, renamedSchemasMap,
nestedFieldRenamedMap);
+ final Schema outputSchema =
renamedSchemasMap.get(input.getSchema().getUUID());
+ final BitSet nestedRenames =
nestedFieldRenamedMap.get(input.getSchema().getUUID());
return input
.apply(
ParDo.of(
new DoFn<T, Row>() {
@ProcessElement
public void processElement(@Element Row row,
OutputReceiver<Row> o) {
-
o.output(Row.withSchema(outputSchema).attachValues(row.getValues()));
+ o.output(
+ renameRow(
+ row,
+ outputSchema,
+ nestedRenames,
+ renamedSchemasMap,
+ nestedFieldRenamedMap));
}
}))
.setRowSchema(outputSchema);
}
}
+
+ // TODO(reuvenlax): For better performance, we should reuse functionality in
Review comment:
Consider a jira for this
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
Issue Time Tracking
-------------------
Worklog Id: (was: 609264)
Time Spent: 40m (was: 0.5h)
> RenameFields disregards renames in nested fields with DataflowRunner
> --------------------------------------------------------------------
>
> Key: BEAM-12442
> URL: https://issues.apache.org/jira/browse/BEAM-12442
> Project: Beam
> Issue Type: Bug
> Components: sdk-java-core
> Affects Versions: 2.26.0
> Reporter: Matthew Ouyang
> Assignee: Reuven Lax
> Priority: P2
> Time Spent: 40m
> Remaining Estimate: 0h
>
> Summary - When applying a `RenameFields` followed by
> BigQueryUtils.toTableRow(Row), renames on nested fields were not being
> applied when using DataflowRunner. DirectRunner appeared to be fine. This
> led to a failed BigQueryIO.Write.
> Expected Behaviour - (field0_1.field1_0, nestedStringField) ->
> field0_1.nestedStringField and successful BQ insert
> Actual Behaviour - (field0_1.field1_0, nestedStringField) ->
> field0_1.field1_0 and failed BQ insert because field0_1.field1_0 was not in
> the target BQ table
> Thread from Users mailing list -
> https://lists.apache.org/thread.html/rd33a6f2bd8454cf650d70795bcdbbb873f988a557133d3bdbfc56cfc%40%3Cuser.beam.apache.org%3E
--
This message was sent by Atlassian Jira
(v8.3.4#803005)