fbocse commented on a change in pull request #1177:
URL: https://github.com/apache/iceberg/pull/1177#discussion_r482061859
##########
File path: core/src/main/java/org/apache/iceberg/SchemaUpdate.java
##########
@@ -308,6 +314,162 @@ public UpdateSchema moveAfter(String name, String afterName) {
return this;
}
+ @Override
+ public UpdateSchema upsertSchema(Schema newSchema) {
+ TypeUtil.visit(newSchema, new ApplyUpdates(this, schema));
+ return this;
+ }
+
+ private static class ApplyUpdates extends TypeUtil.SchemaVisitor<Void> {
+ private static final Joiner DOT = Joiner.on(".");
+ private final Deque<String> fieldNames = Lists.newLinkedList();
+ private NestedField currentField = null;
+
+ private final Schema baseSchema;
+ private final UpdateSchema api;
+ private final Map<String, Integer> indexByName;
+
+ private ApplyUpdates(UpdateSchema api, Schema baseSchema) {
+ this.api = api;
+ this.baseSchema = baseSchema;
+ this.indexByName = TypeUtil.indexByName(baseSchema.asStruct());
+ }
+
+ @Override
+ public void beforeListElement(NestedField elementField) {
+ beforeField(elementField);
+ }
+
+ @Override
+ public void afterListElement(NestedField elementField) {
+ afterField(elementField);
+ }
+
+ @Override
+ public void beforeMapKey(Types.NestedField keyField) {
+ beforeField(keyField);
+ }
+
+ @Override
+ public void afterMapKey(Types.NestedField keyField) {
+ afterField(keyField);
+ }
+
+ @Override
+ public void beforeMapValue(Types.NestedField valueField) {
+ beforeField(valueField);
+ }
+
+ @Override
+ public void afterMapValue(Types.NestedField valueField) {
+ afterField(valueField);
+ }
+
+ @Override
+ public void beforeField(NestedField field) {
+ fieldNames.push(field.name()); // we don't expect `element` to show up - it breaks
+ currentField = field;
+ }
+
+ @Override
+ public void afterField(NestedField field) {
+ fieldNames.pop();
+ }
+
+ @Override
+ public Void field(NestedField field, Void fieldResult) {
+ return super.field(field, fieldResult);
+ }
+
+ @Override
+ public Void list(ListType list, Void elementResult) {
+ String fullName = DOT.join(fieldNames.descendingIterator());
+ Types.NestedField field = baseSchema.findField(fullName);
+ if (field == null) {
+ addColumn(fieldNames.peekFirst(), Types.ListType.ofOptional(0,
list.elementType()), ancestors());
+ } else if (!field.type().isListType()) {
+ throw new IllegalArgumentException(
+ "Cannot update existing field: " + fullName + " of type: " + field
+ .type() + " to type list");
+ }
+ return null;
+ }
+
+ @Override
+ public Void map(MapType map, Void keyResult, Void valueResult) {
+ String fullName = DOT.join(fieldNames.descendingIterator());
+ Types.NestedField field = baseSchema.findField(fullName);
+ if (field == null) {
+ addColumn(fieldNames.peekFirst(), Types.MapType.ofOptional(0, 1,
map.keyType(), map.valueType()), ancestors());
+ } else if (!field.type().isMapType()) {
+ throw new IllegalArgumentException(
+ "Cannot update existing field: " + fullName + " of type: " + field
+ .type() + " to type map");
+ }
+ return null;
+ }
+
+ @Override
+ public Void struct(Types.StructType struct, List<Void> fieldResults) {
+ if (fieldNames.isEmpty()) {
+ return null; // this is the root struct
+ }
+ String fullName = DOT.join(fieldNames.descendingIterator());
+ Types.NestedField field = baseSchema.findField(fullName);
+ if (field == null) {
+ addColumn(fieldNames.peekFirst(),
Types.StructType.of(struct.fields()), ancestors());
+ } else if (!field.type().isStructType()) {
+ throw new IllegalArgumentException("Cannot update existing field: " +
fullName + " of type: " + field.type() +
+ " to type struct");
+ }
+ return null;
+ }
+
+ @Override
+ public Void primitive(PrimitiveType primitive) {
+ String fullName = DOT.join(fieldNames.descendingIterator());
+ Types.NestedField field = baseSchema.findField(fullName);
+ PrimitiveType newFieldType =
Types.fromPrimitiveString(primitive.toString());
+ if (field == null) {
+ addColumn(currentField.name(),
Types.fromPrimitiveString(primitive.toString()), ancestors());
+ } else if (!field.type().isPrimitiveType()) {
+ throw new IllegalArgumentException("Cannot update existing field: " +
field.name() + " of type: " +
+ field.type() + " to primitive type: " +
primitive.typeId().name());
+ } else if (!newFieldType.equals(field.type())) {
+ updateColumn(field.type().asPrimitiveType(), fullName, field.doc(),
newFieldType, currentField.doc());
+ }
+ return null;
+ }
+
+ private String ancestors() {
+ if (fieldNames.isEmpty()) {
+ return "";
+ }
+ String head = fieldNames.removeFirst();
+ String join = DOT.join(fieldNames.descendingIterator());
+ fieldNames.addFirst(head);
+ return join;
+ }
+
+ private void updateColumn(PrimitiveType fieldType, String fullName, String
fieldDoc, PrimitiveType newFieldType,
+ String newDoc) {
+ if (!fieldType.equals(newFieldType)) {
+ api.updateColumn(fullName, newFieldType.asPrimitiveType());
+ } else if (newDoc != null && !newDoc.equals(fieldDoc)) {
+ api.updateColumnDoc(fullName, newDoc);
+ }
+ }
+
+ private void addColumn(String name, Type type, String ancestors) {
+ if (ancestors.isEmpty()) {
+ api.addColumn(null, name, type);
+ } else if (indexByName.containsKey(ancestors)) {
+ api.addColumn(ancestors, name, type);
+ }
+ // At this point the parent of this column hasn't been added to the schema, not yet visited
Review comment:
Indeed, this aspect of the implementation will greatly improve the
efficiency for the union of distinct schemas.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]