rdblue commented on a change in pull request #1177:
URL: https://github.com/apache/iceberg/pull/1177#discussion_r464520655



##########
File path: core/src/main/java/org/apache/iceberg/SchemaUpdate.java
##########
@@ -308,6 +314,162 @@ public UpdateSchema moveAfter(String name, String 
afterName) {
     return this;
   }
 
+  @Override
+  public UpdateSchema upsertSchema(Schema newSchema) {
+    TypeUtil.visit(newSchema, new ApplyUpdates(this, schema));
+    return this;
+  }
+
+  private static class ApplyUpdates extends TypeUtil.SchemaVisitor<Void> {
+    private static final Joiner DOT = Joiner.on(".");
+    private final Deque<String> fieldNames = Lists.newLinkedList();
+    private NestedField currentField = null;
+
+    private final Schema baseSchema;
+    private final UpdateSchema api;
+    private final Map<String, Integer> indexByName;
+
+    private ApplyUpdates(UpdateSchema api, Schema baseSchema) {
+      this.api = api;
+      this.baseSchema = baseSchema;
+      this.indexByName = TypeUtil.indexByName(baseSchema.asStruct());
+    }
+
+    @Override
+    public void beforeListElement(NestedField elementField) {
+      beforeField(elementField);
+    }
+
+    @Override
+    public void afterListElement(NestedField elementField) {
+      afterField(elementField);
+    }
+
+    @Override
+    public void beforeMapKey(Types.NestedField keyField) {
+      beforeField(keyField);
+    }
+
+    @Override
+    public void afterMapKey(Types.NestedField keyField) {
+      afterField(keyField);
+    }
+
+    @Override
+    public void beforeMapValue(Types.NestedField valueField) {
+      beforeField(valueField);
+    }
+
+    @Override
+    public void afterMapValue(Types.NestedField valueField) {
+      afterField(valueField);
+    }
+
+    @Override
+    public void beforeField(NestedField field) {
+      fieldNames.push(field.name()); // we don't expect `element` to show up - 
it breaks
+      currentField = field;
+    }
+
+    @Override
+    public void afterField(NestedField field) {
+      fieldNames.pop();
+    }
+
+    @Override
+    public Void field(NestedField field, Void fieldResult) {
+      return super.field(field, fieldResult);
+    }
+
+    @Override
+    public Void list(ListType list, Void elementResult) {
+      String fullName = DOT.join(fieldNames.descendingIterator());
+      Types.NestedField field = baseSchema.findField(fullName);
+      if (field == null) {
+        addColumn(fieldNames.peekFirst(), Types.ListType.ofOptional(0,   
list.elementType()), ancestors());
+      } else if (!field.type().isListType()) {
+        throw new IllegalArgumentException(
+            "Cannot update existing field: " + fullName + " of type: " + field
+                .type() + " to type list");
+      }
+      return null;
+    }
+
+    @Override
+    public Void map(MapType map, Void keyResult, Void valueResult) {
+      String fullName = DOT.join(fieldNames.descendingIterator());
+      Types.NestedField field = baseSchema.findField(fullName);
+      if (field == null) {
+        addColumn(fieldNames.peekFirst(), Types.MapType.ofOptional(0, 1, 
map.keyType(), map.valueType()), ancestors());
+      } else if (!field.type().isMapType()) {
+        throw new IllegalArgumentException(
+            "Cannot update existing field: " + fullName + " of type: " + field
+                .type() + " to type map");
+      }
+      return null;
+    }
+
+    @Override
+    public Void struct(Types.StructType struct, List<Void> fieldResults) {
+      if (fieldNames.isEmpty()) {
+        return null; // this is the root struct
+      }
+      String fullName = DOT.join(fieldNames.descendingIterator());
+      Types.NestedField field = baseSchema.findField(fullName);
+      if (field == null) {
+        addColumn(fieldNames.peekFirst(), 
Types.StructType.of(struct.fields()), ancestors());
+      } else if (!field.type().isStructType()) {
+        throw new IllegalArgumentException("Cannot update existing field: " + 
fullName + " of type: " + field.type() +
+                " to type struct");
+      }
+      return null;
+    }
+
+    @Override
+    public Void primitive(PrimitiveType primitive) {
+      String fullName = DOT.join(fieldNames.descendingIterator());
+      Types.NestedField field = baseSchema.findField(fullName);
+      PrimitiveType newFieldType = 
Types.fromPrimitiveString(primitive.toString());
+      if (field == null) {
+        addColumn(currentField.name(), 
Types.fromPrimitiveString(primitive.toString()), ancestors());
+      } else if (!field.type().isPrimitiveType()) {
+        throw new IllegalArgumentException("Cannot update existing field: " + 
field.name() + " of type: " +
+                field.type() + " to primitive type: " + 
primitive.typeId().name());
+      } else if (!newFieldType.equals(field.type())) {
+        updateColumn(field.type().asPrimitiveType(), fullName, field.doc(), 
newFieldType, currentField.doc());
+      }
+      return null;
+    }
+
+    private String ancestors() {
+      if (fieldNames.isEmpty()) {
+        return "";
+      }
+      String head = fieldNames.removeFirst();
+      String join = DOT.join(fieldNames.descendingIterator());
+      fieldNames.addFirst(head);
+      return join;
+    }
+
+    private void updateColumn(PrimitiveType fieldType, String fullName, String 
fieldDoc, PrimitiveType newFieldType,
+                              String newDoc) {
+      if (!fieldType.equals(newFieldType)) {
+        api.updateColumn(fullName, newFieldType.asPrimitiveType());
+      } else if (newDoc != null && !newDoc.equals(fieldDoc)) {
+        api.updateColumnDoc(fullName, newDoc);
+      }
+    }
+
+    private void addColumn(String name, Type type, String ancestors) {
+      if (ancestors.isEmpty()) {
+        api.addColumn(null, name, type);
+      } else if (indexByName.containsKey(ancestors)) {
+        api.addColumn(ancestors, name, type);
+      }
+      // At this point the parent of this column hasn't been added to the 
schema, not yet visited

Review comment:
       I had a hard time following the code and ended up using a debugger to 
confirm that it wasn't trying to add columns to missing structs. It was good to 
see that there are test cases for this, but I think we can avoid calling 
`addColumn` for a column that should not be added by using the visitor's return 
values.
   
   I prototyped a change that returns a Boolean that indicates whether a column 
is missing. That way, each method checks whether its equivalent is in the other 
struct. If it is not, the method simply returns true because the whole type 
should be added. If it is present, then containers may update the type, docs, 
and required boolean. I think that's easier to follow, so I'll post the changes 
for you.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]



---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to