This is an automated email from the ASF dual-hosted git repository.

blue pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iceberg.git


The following commit(s) were added to refs/heads/master by this push:
     new e58d28d  Core: Add row identifier to schema (#2465)
e58d28d is described below

commit e58d28d500241e6a36a896993b56a80c49bf6118
Author: Jack Ye <[email protected]>
AuthorDate: Sun May 2 13:52:56 2021 -0700

    Core: Add row identifier to schema (#2465)
---
 api/src/main/java/org/apache/iceberg/Schema.java   |  79 ++++++-
 .../main/java/org/apache/iceberg/UpdateSchema.java |  22 ++
 .../main/java/org/apache/iceberg/SchemaParser.java |  24 ++-
 .../main/java/org/apache/iceberg/SchemaUpdate.java |  86 ++++++--
 .../java/org/apache/iceberg/TableMetadata.java     |   6 +-
 .../java/org/apache/iceberg/util/JsonUtil.java     |  84 +++++++-
 .../java/org/apache/iceberg/TestSchemaUpdate.java  | 235 +++++++++++++++++++++
 .../java/org/apache/iceberg/TestTableMetadata.java |  27 +++
 core/src/test/resources/TableMetadataV2Valid.json  |   6 +-
 9 files changed, 530 insertions(+), 39 deletions(-)

diff --git a/api/src/main/java/org/apache/iceberg/Schema.java 
b/api/src/main/java/org/apache/iceberg/Schema.java
index f753a2b..1b80a2d 100644
--- a/api/src/main/java/org/apache/iceberg/Schema.java
+++ b/api/src/main/java/org/apache/iceberg/Schema.java
@@ -32,9 +32,12 @@ import 
org.apache.iceberg.relocated.com.google.common.base.Preconditions;
 import org.apache.iceberg.relocated.com.google.common.collect.BiMap;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableBiMap;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet;
 import org.apache.iceberg.relocated.com.google.common.collect.Sets;
+import org.apache.iceberg.relocated.com.google.common.primitives.Ints;
 import org.apache.iceberg.types.Type;
 import org.apache.iceberg.types.TypeUtil;
+import org.apache.iceberg.types.Types;
 import org.apache.iceberg.types.Types.NestedField;
 import org.apache.iceberg.types.Types.StructType;
 
@@ -51,29 +54,47 @@ public class Schema implements Serializable {
 
   private final StructType struct;
   private final int schemaId;
+  private final int[] identifierFieldIds;
+
   private transient BiMap<String, Integer> aliasToId = null;
   private transient Map<Integer, NestedField> idToField = null;
   private transient Map<String, Integer> nameToId = null;
   private transient Map<String, Integer> lowerCaseNameToId = null;
   private transient Map<Integer, Accessor<StructLike>> idToAccessor = null;
   private transient Map<Integer, String> idToName = null;
+  private transient Set<Integer> identifierFieldIdSet = null;
 
   public Schema(List<NestedField> columns, Map<String, Integer> aliases) {
-    this.schemaId = DEFAULT_SCHEMA_ID;
-    this.struct = StructType.of(columns);
-    this.aliasToId = aliases != null ? ImmutableBiMap.copyOf(aliases) : null;
+    this(columns, aliases, ImmutableSet.of());
+  }
 
-    // validate the schema through IndexByName visitor
-    lazyIdToName();
+  public Schema(List<NestedField> columns, Map<String, Integer> aliases, 
Set<Integer> identifierFieldIds) {
+    this(DEFAULT_SCHEMA_ID, columns, aliases, identifierFieldIds);
   }
 
   public Schema(List<NestedField> columns) {
-    this(DEFAULT_SCHEMA_ID, columns);
+    this(columns, ImmutableSet.of());
+  }
+
+  public Schema(List<NestedField> columns, Set<Integer> identifierFieldIds) {
+    this(DEFAULT_SCHEMA_ID, columns, identifierFieldIds);
   }
 
   public Schema(int schemaId, List<NestedField> columns) {
+    this(schemaId, columns, ImmutableSet.of());
+  }
+
+  public Schema(int schemaId, List<NestedField> columns, Set<Integer> 
identifierFieldIds) {
+    this(schemaId, columns, null, identifierFieldIds);
+  }
+
+  private Schema(int schemaId, List<NestedField> columns, Map<String, Integer> 
aliases,
+                 Set<Integer> identifierFieldIds) {
     this.schemaId = schemaId;
     this.struct = StructType.of(columns);
+    this.aliasToId = aliases != null ? ImmutableBiMap.copyOf(aliases) : null;
+    this.identifierFieldIds = identifierFieldIds != null ? 
Ints.toArray(identifierFieldIds) : new int[0];
+
     lazyIdToName();
   }
 
@@ -120,6 +141,13 @@ public class Schema implements Serializable {
     return idToAccessor;
   }
 
+  private Set<Integer> lazyIdentifierFieldIdSet() {
+    if (identifierFieldIdSet == null) {
+      identifierFieldIdSet = 
ImmutableSet.copyOf(Ints.asList(identifierFieldIds));
+    }
+    return identifierFieldIdSet;
+  }
+
   /**
    * Returns the schema ID for this schema.
    * <p>
@@ -159,6 +187,29 @@ public class Schema implements Serializable {
   }
 
   /**
+   * The set of identifier field IDs.
+   * <p>
+   * Identifier is a concept similar to primary key in a relational database 
system.
+   * It consists of a unique set of primitive fields in the schema.
+   * An identifier field must be at root, or nested in a chain of structs (no 
maps or lists).
+   * A row should be unique in a table based on the values of the identifier 
fields.
+   * However, Iceberg identifier differs from primary key in the following 
ways:
+   * <ul>
+   * <li>Iceberg does not enforce the uniqueness of a row based on this 
identifier information.
+   * It is used for operations like upsert to define the default upsert 
key.</li>
+   * <li>NULL can be used as value of an identifier field. Iceberg ensures 
null-safe equality check.</li>
+   * <li>A nested field in a struct can be used as an identifier. For example, 
if there is a "last_name" field
+   * inside a "user" struct in a schema, field "user.last_name" can be set as 
a part of the identifier field.</li>
+   * </ul>
+   * <p>
+   *
+   * @return the set of identifier field IDs in this schema.
+   */
+  public Set<Integer> identifierFieldIds() {
+    return lazyIdentifierFieldIdSet();
+  }
+
+  /**
    * Returns the {@link Type} of a sub-field identified by the field name.
    *
    * @param name a field name
@@ -331,6 +382,16 @@ public class Schema implements Serializable {
     return internalSelect(names, false);
   }
 
+  /**
+   * Checks whether this schema is equivalent to another schema while ignoring 
the schema ID.
+   * @param anotherSchema another schema
+   * @return true if this schema is equivalent to the given schema
+   */
+  public boolean sameSchema(Schema anotherSchema) {
+    return asStruct().equals(anotherSchema.asStruct()) &&
+        identifierFieldIds().equals(anotherSchema.identifierFieldIds());
+  }
+
   private Schema internalSelect(Collection<String> names, boolean 
caseSensitive) {
     if (names.contains(ALL_COLUMNS)) {
       return this;
@@ -353,11 +414,15 @@ public class Schema implements Serializable {
     return TypeUtil.select(this, selected);
   }
 
+  private String identifierFieldToString(Types.NestedField field) {
+    return "  " + field + (identifierFieldIds().contains(field.fieldId()) ? " 
(id)" : "");
+  }
+
   @Override
   public String toString() {
     return String.format("table {\n%s\n}",
         NEWLINE.join(struct.fields().stream()
-            .map(f -> "  " + f)
+            .map(this::identifierFieldToString)
             .collect(Collectors.toList())));
   }
 }
diff --git a/api/src/main/java/org/apache/iceberg/UpdateSchema.java 
b/api/src/main/java/org/apache/iceberg/UpdateSchema.java
index b372758..32c3092 100644
--- a/api/src/main/java/org/apache/iceberg/UpdateSchema.java
+++ b/api/src/main/java/org/apache/iceberg/UpdateSchema.java
@@ -19,7 +19,9 @@
 
 package org.apache.iceberg;
 
+import java.util.Set;
 import org.apache.iceberg.exceptions.CommitFailedException;
+import org.apache.iceberg.relocated.com.google.common.collect.Sets;
 import org.apache.iceberg.types.Type;
 
 /**
@@ -384,4 +386,24 @@ public interface UpdateSchema extends 
PendingUpdate<Schema> {
    *                                  with other additions, renames, or 
updates.
    */
   UpdateSchema unionByNameWith(Schema newSchema);
+
+  /**
+   * Set the identifier fields given a set of field names.
+   * See {@link Schema#identifierFieldIds()} to learn more about Iceberg 
identifier.
+   *
+   * @param names names of the columns to set as identifier fields
+   * @return this for method chaining
+   */
+  UpdateSchema setIdentifierFields(Set<String> names);
+
+  /**
+   * Set the identifier fields given some field names.
+   * See {@link UpdateSchema#setIdentifierFields(Set)} for more details.
+   *
+   * @param names names of the columns to set as identifier fields
+   * @return this for method chaining
+   */
+  default UpdateSchema setIdentifierFields(String... names) {
+    return setIdentifierFields(Sets.newHashSet(names));
+  }
 }
diff --git a/core/src/main/java/org/apache/iceberg/SchemaParser.java 
b/core/src/main/java/org/apache/iceberg/SchemaParser.java
index 534e085..0a4f6f3 100644
--- a/core/src/main/java/org/apache/iceberg/SchemaParser.java
+++ b/core/src/main/java/org/apache/iceberg/SchemaParser.java
@@ -27,6 +27,7 @@ import java.io.IOException;
 import java.io.StringWriter;
 import java.util.Iterator;
 import java.util.List;
+import java.util.Set;
 import org.apache.iceberg.exceptions.RuntimeIOException;
 import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
 import org.apache.iceberg.relocated.com.google.common.collect.Lists;
@@ -40,6 +41,7 @@ public class SchemaParser {
   }
 
   private static final String SCHEMA_ID = "schema-id";
+  private static final String IDENTIFIER_FIELD_IDS = "identifier-field-ids";
   private static final String TYPE = "type";
   private static final String STRUCT = "struct";
   private static final String LIST = "list";
@@ -59,10 +61,11 @@ public class SchemaParser {
   private static final String VALUE_REQUIRED = "value-required";
 
   private static void toJson(Types.StructType struct, JsonGenerator generator) 
throws IOException {
-    toJson(struct, null, generator);
+    toJson(struct, null, null, generator);
   }
 
-  private static void toJson(Types.StructType struct, Integer schemaId, 
JsonGenerator generator) throws IOException {
+  private static void toJson(Types.StructType struct, Integer schemaId, 
Set<Integer> identifierFieldIds,
+                             JsonGenerator generator) throws IOException {
     generator.writeStartObject();
 
     generator.writeStringField(TYPE, STRUCT);
@@ -70,6 +73,14 @@ public class SchemaParser {
       generator.writeNumberField(SCHEMA_ID, schemaId);
     }
 
+    if (identifierFieldIds != null && !identifierFieldIds.isEmpty()) {
+      generator.writeArrayFieldStart(IDENTIFIER_FIELD_IDS);
+      for (int id : identifierFieldIds) {
+        generator.writeNumber(id);
+      }
+      generator.writeEndArray();
+    }
+
     generator.writeArrayFieldStart(FIELDS);
     for (Types.NestedField field : struct.fields()) {
       generator.writeStartObject();
@@ -144,7 +155,7 @@ public class SchemaParser {
   }
 
   public static void toJson(Schema schema, JsonGenerator generator) throws 
IOException {
-    toJson(schema.asStruct(), schema.schemaId(), generator);
+    toJson(schema.asStruct(), schema.schemaId(), schema.identifierFieldIds(), 
generator);
   }
 
   public static String toJson(Schema schema) {
@@ -158,7 +169,7 @@ public class SchemaParser {
       if (pretty) {
         generator.useDefaultPrettyPrinter();
       }
-      toJson(schema.asStruct(), generator);
+      toJson(schema.asStruct(), schema.schemaId(), 
schema.identifierFieldIds(), generator);
       generator.flush();
       return writer.toString();
 
@@ -247,11 +258,12 @@ public class SchemaParser {
     Preconditions.checkArgument(type.isNestedType() && 
type.asNestedType().isStructType(),
         "Cannot create schema, not a struct type: %s", type);
     Integer schemaId = JsonUtil.getIntOrNull(SCHEMA_ID, json);
+    Set<Integer> identifierFieldIds = 
JsonUtil.getIntegerSetOrNull(IDENTIFIER_FIELD_IDS, json);
 
     if (schemaId == null) {
-      return new Schema(type.asNestedType().asStructType().fields());
+      return new Schema(type.asNestedType().asStructType().fields(), 
identifierFieldIds);
     } else {
-      return new Schema(schemaId, type.asNestedType().asStructType().fields());
+      return new Schema(schemaId, type.asNestedType().asStructType().fields(), 
identifierFieldIds);
     }
   }
 
diff --git a/core/src/main/java/org/apache/iceberg/SchemaUpdate.java 
b/core/src/main/java/org/apache/iceberg/SchemaUpdate.java
index e4f2593..1fb21a0 100644
--- a/core/src/main/java/org/apache/iceberg/SchemaUpdate.java
+++ b/core/src/main/java/org/apache/iceberg/SchemaUpdate.java
@@ -24,6 +24,8 @@ import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
+import java.util.Set;
+import java.util.stream.Collectors;
 import org.apache.iceberg.mapping.MappingUtil;
 import org.apache.iceberg.mapping.NameMapping;
 import org.apache.iceberg.mapping.NameMappingParser;
@@ -33,6 +35,7 @@ import 
org.apache.iceberg.relocated.com.google.common.collect.Lists;
 import org.apache.iceberg.relocated.com.google.common.collect.Maps;
 import org.apache.iceberg.relocated.com.google.common.collect.Multimap;
 import org.apache.iceberg.relocated.com.google.common.collect.Multimaps;
+import org.apache.iceberg.relocated.com.google.common.collect.Sets;
 import org.apache.iceberg.schema.UnionByNameVisitor;
 import org.apache.iceberg.types.Type;
 import org.apache.iceberg.types.TypeUtil;
@@ -59,25 +62,32 @@ class SchemaUpdate implements UpdateSchema {
   private final Multimap<Integer, Move> moves = 
Multimaps.newListMultimap(Maps.newHashMap(), Lists::newArrayList);
   private int lastColumnId;
   private boolean allowIncompatibleChanges = false;
-
+  private Set<String> identifierFieldNames;
 
   SchemaUpdate(TableOperations ops) {
-    this.ops = ops;
-    this.base = ops.current();
-    this.schema = base.schema();
-    this.lastColumnId = base.lastColumnId();
-    this.idToParent = 
Maps.newHashMap(TypeUtil.indexParents(schema.asStruct()));
+    this(ops, ops.current());
   }
 
   /**
    * For testing only.
    */
   SchemaUpdate(Schema schema, int lastColumnId) {
-    this.ops = null;
-    this.base = null;
+    this(null, null, schema, lastColumnId);
+  }
+
+  private SchemaUpdate(TableOperations ops, TableMetadata base) {
+    this(ops, base, base.schema(), base.lastColumnId());
+  }
+
+  private SchemaUpdate(TableOperations ops, TableMetadata base, Schema schema, 
int lastColumnId) {
+    this.ops = ops;
+    this.base = base;
     this.schema = schema;
     this.lastColumnId = lastColumnId;
     this.idToParent = 
Maps.newHashMap(TypeUtil.indexParents(schema.asStruct()));
+    this.identifierFieldNames = schema.identifierFieldIds().stream()
+        .map(id -> schema.findField(id).name())
+        .collect(Collectors.toSet());
   }
 
   @Override
@@ -170,7 +180,6 @@ class SchemaUpdate implements UpdateSchema {
         "Cannot delete a column that has additions: %s", name);
     Preconditions.checkArgument(!updates.containsKey(field.fieldId()),
         "Cannot delete a column that has updates: %s", name);
-
     deletes.add(field.fieldId());
 
     return this;
@@ -193,6 +202,11 @@ class SchemaUpdate implements UpdateSchema {
       updates.put(fieldId, Types.NestedField.of(fieldId, field.isOptional(), 
newName, field.type(), field.doc()));
     }
 
+    if (identifierFieldNames.contains(name)) {
+      identifierFieldNames.remove(name);
+      identifierFieldNames.add(newName);
+    }
+
     return this;
   }
 
@@ -317,6 +331,12 @@ class SchemaUpdate implements UpdateSchema {
     return this;
   }
 
+  @Override
+  public UpdateSchema setIdentifierFields(Set<String> names) {
+    this.identifierFieldNames = Sets.newHashSet(names);
+    return this;
+  }
+
   private Integer findForMove(String name) {
     Types.NestedField field = schema.findField(name);
     if (field != null) {
@@ -359,7 +379,7 @@ class SchemaUpdate implements UpdateSchema {
    */
   @Override
   public Schema apply() {
-    Schema newSchema = applyChanges(schema, deletes, updates, adds, moves);
+    Schema newSchema = applyChanges(schema, deletes, updates, adds, moves, 
identifierFieldNames);
 
     // Validate the metrics if we have existing properties.
     if (base != null && base.properties() != null) {
@@ -408,11 +428,53 @@ class SchemaUpdate implements UpdateSchema {
   private static Schema applyChanges(Schema schema, List<Integer> deletes,
                                      Map<Integer, Types.NestedField> updates,
                                      Multimap<Integer, Types.NestedField> adds,
-                                     Multimap<Integer, Move> moves) {
+                                     Multimap<Integer, Move> moves,
+                                     Set<String> identifierFieldNames) {
+    // validate existing identifier fields are not deleted
+    for (String name : identifierFieldNames) {
+      Types.NestedField field = schema.findField(name);
+      if (field != null) {
+        Preconditions.checkArgument(!deletes.contains(field.fieldId()),
+            "Cannot delete identifier field %s. To force deletion, " +
+                "also call setIdentifierFields to update identifier fields.", 
field);
+      }
+    }
+
+    // apply schema changes
     Types.StructType struct = TypeUtil
         .visit(schema, new ApplyChanges(deletes, updates, adds, moves))
         .asNestedType().asStructType();
-    return new Schema(struct.fields());
+
+    // validate identifier requirements based on the latest schema
+    Map<String, Integer> nameToId = TypeUtil.indexByName(struct);
+    Set<Integer> freshIdentifierFieldIds = Sets.newHashSet();
+    for (String name : identifierFieldNames) {
+      Preconditions.checkArgument(nameToId.containsKey(name),
+          "Cannot add field %s as an identifier field: not found in current 
schema or added columns");
+      freshIdentifierFieldIds.add(nameToId.get(name));
+    }
+
+    Map<Integer, Integer> idToParent = 
TypeUtil.indexParents(schema.asStruct());
+    Map<Integer, Types.NestedField> idToField = TypeUtil.indexById(struct);
+    freshIdentifierFieldIds.forEach(id -> validateIdentifierField(id, 
idToField, idToParent));
+
+    return new Schema(struct.fields(), freshIdentifierFieldIds);
+  }
+
+  private static void validateIdentifierField(int fieldId, Map<Integer, 
Types.NestedField> idToField,
+                                              Map<Integer, Integer> 
idToParent) {
+    Types.NestedField field = idToField.get(fieldId);
+    Preconditions.checkArgument(field.type().isPrimitiveType(),
+        "Cannot add field %s as an identifier field: not a primitive type 
field", field.name());
+
+    // check whether the nested field is in a chain of struct fields
+    Integer parentId = idToParent.get(field.fieldId());
+    while (parentId != null) {
+      Types.NestedField parent = idToField.get(parentId);
+      Preconditions.checkArgument(parent.type().isStructType(),
+          "Cannot add field %s as an identifier field: must not be nested in 
%s", field.name(), parent);
+      parentId = idToParent.get(parent.fieldId());
+    }
   }
 
   private static class ApplyChanges extends TypeUtil.SchemaVisitor<Type> {
diff --git a/core/src/main/java/org/apache/iceberg/TableMetadata.java 
b/core/src/main/java/org/apache/iceberg/TableMetadata.java
index 1e33c54..532519e 100644
--- a/core/src/main/java/org/apache/iceberg/TableMetadata.java
+++ b/core/src/main/java/org/apache/iceberg/TableMetadata.java
@@ -491,7 +491,7 @@ public class TableMetadata implements Serializable {
 
     ImmutableList.Builder<Schema> builder = 
ImmutableList.<Schema>builder().addAll(schemas);
     if (!schemasById.containsKey(newSchemaId)) {
-      builder.add(new Schema(newSchemaId, newSchema.columns()));
+      builder.add(new Schema(newSchemaId, newSchema.columns(), 
newSchema.identifierFieldIds()));
     }
 
     return new TableMetadata(null, formatVersion, uuid, location,
@@ -760,7 +760,7 @@ public class TableMetadata implements Serializable {
     ImmutableList.Builder<Schema> schemasBuilder = 
ImmutableList.<Schema>builder().addAll(schemas);
 
     if (!schemasById.containsKey(freshSchemaId)) {
-      schemasBuilder.add(new Schema(freshSchemaId, freshSchema.columns()));
+      schemasBuilder.add(new Schema(freshSchemaId, freshSchema.columns(), 
freshSchema.identifierFieldIds()));
     }
 
     return new TableMetadata(null, formatVersion, uuid, newLocation,
@@ -916,7 +916,7 @@ public class TableMetadata implements Serializable {
     // if the schema already exists, use its id; otherwise use the highest id 
+ 1
     int newSchemaId = currentSchemaId;
     for (Schema schema : schemas) {
-      if (schema.asStruct().equals(newSchema.asStruct())) {
+      if (schema.sameSchema(newSchema)) {
         newSchemaId = schema.schemaId();
         break;
       } else if (schema.schemaId() >= newSchemaId) {
diff --git a/core/src/main/java/org/apache/iceberg/util/JsonUtil.java 
b/core/src/main/java/org/apache/iceberg/util/JsonUtil.java
index 99b82a4..bcde959 100644
--- a/core/src/main/java/org/apache/iceberg/util/JsonUtil.java
+++ b/core/src/main/java/org/apache/iceberg/util/JsonUtil.java
@@ -25,9 +25,11 @@ import com.fasterxml.jackson.databind.ObjectMapper;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet;
 
 public class JsonUtil {
 
@@ -117,18 +119,80 @@ public class JsonUtil {
 
   public static List<String> getStringList(String property, JsonNode node) {
     Preconditions.checkArgument(node.has(property), "Cannot parse missing list 
%s", property);
-    JsonNode pNode = node.get(property);
-    Preconditions.checkArgument(pNode != null && !pNode.isNull() && 
pNode.isArray(),
-        "Cannot parse %s from non-array value: %s", property, pNode);
+    return ImmutableList.<String>builder()
+        .addAll(new JsonStringArrayIterator(property, node))
+        .build();
+  }
+
+  public static Set<Integer> getIntegerSetOrNull(String property, JsonNode 
node) {
+    if (!node.has(property)) {
+      return null;
+    }
+
+    return ImmutableSet.<Integer>builder()
+        .addAll(new JsonIntegerArrayIterator(property, node))
+        .build();
+  }
+
+  abstract static class JsonArrayIterator<T> implements Iterator<T> {
 
-    ImmutableList.Builder<String> builder = ImmutableList.builder();
-    Iterator<JsonNode> elements = pNode.elements();
-    while (elements.hasNext()) {
+    private final Iterator<JsonNode> elements;
+
+    JsonArrayIterator(String property, JsonNode node) {
+      JsonNode pNode = node.get(property);
+      Preconditions.checkArgument(pNode != null && !pNode.isNull() && 
pNode.isArray(),
+          "Cannot parse %s from non-array value: %s", property, pNode);
+      this.elements = pNode.elements();
+    }
+
+    @Override
+    public boolean hasNext() {
+      return elements.hasNext();
+    }
+
+    @Override
+    public T next() {
       JsonNode element = elements.next();
-      Preconditions.checkArgument(element.isTextual(),
-          "Cannot parse string from non-text value: %s", element);
-      builder.add(element.asText());
+      validate(element);
+      return convert(element);
+    }
+
+    abstract T convert(JsonNode element);
+
+    abstract void validate(JsonNode element);
+  }
+
+  static class JsonStringArrayIterator extends JsonArrayIterator<String> {
+
+    JsonStringArrayIterator(String property, JsonNode node) {
+      super(property, node);
+    }
+
+    @Override
+    String convert(JsonNode element) {
+      return element.asText();
+    }
+
+    @Override
+    void validate(JsonNode element) {
+      Preconditions.checkArgument(element.isTextual(), "Cannot parse string 
from non-text value: %s", element);
+    }
+  }
+
+  static class JsonIntegerArrayIterator extends JsonArrayIterator<Integer> {
+
+    JsonIntegerArrayIterator(String property, JsonNode node) {
+      super(property, node);
+    }
+
+    @Override
+    Integer convert(JsonNode element) {
+      return element.asInt();
+    }
+
+    @Override
+    void validate(JsonNode element) {
+      Preconditions.checkArgument(element.isInt(), "Cannot parse integer from 
non-int value: %s", element);
     }
-    return builder.build();
   }
 }
diff --git a/core/src/test/java/org/apache/iceberg/TestSchemaUpdate.java 
b/core/src/test/java/org/apache/iceberg/TestSchemaUpdate.java
index ecd29b8..6b8380e 100644
--- a/core/src/test/java/org/apache/iceberg/TestSchemaUpdate.java
+++ b/core/src/test/java/org/apache/iceberg/TestSchemaUpdate.java
@@ -1223,4 +1223,239 @@ public class TestSchemaUpdate {
                 .moveBefore("s2.x", "s1.a")
                 .apply());
   }
+
+  @Test
+  public void testAddExistingIdentifierFields() {
+    Schema newSchema = new SchemaUpdate(SCHEMA, SCHEMA_LAST_COLUMN_ID)
+        .setIdentifierFields("id")
+        .apply();
+
+    Assert.assertEquals("add an existing field as identifier field should 
succeed",
+        Sets.newHashSet(newSchema.findField("id").fieldId()),
+        newSchema.identifierFieldIds());
+  }
+
+  @Test
+  public void testAddNewIdentifierFieldColumns() {
+    Schema newSchema = new SchemaUpdate(SCHEMA, SCHEMA_LAST_COLUMN_ID)
+        .addColumn("new_field", Types.StringType.get())
+        .setIdentifierFields("id", "new_field")
+        .apply();
+
+    Assert.assertEquals("add column then set as identifier should succeed",
+        Sets.newHashSet(newSchema.findField("id").fieldId(), 
newSchema.findField("new_field").fieldId()),
+        newSchema.identifierFieldIds());
+
+    newSchema = new SchemaUpdate(SCHEMA, SCHEMA_LAST_COLUMN_ID)
+        .setIdentifierFields("id", "new_field")
+        .addColumn("new_field", Types.StringType.get())
+        .apply();
+
+    Assert.assertEquals("set identifier then add column should succeed",
+        Sets.newHashSet(newSchema.findField("id").fieldId(), 
newSchema.findField("new_field").fieldId()),
+        newSchema.identifierFieldIds());
+  }
+
+  @Test
+  public void testAddNestedIdentifierFieldColumns() {
+    Schema newSchema = new SchemaUpdate(SCHEMA, SCHEMA_LAST_COLUMN_ID)
+        .setIdentifierFields("preferences.feature1")
+        .apply();
+
+    Assert.assertEquals("set existing nested field as identifier should 
succeed",
+        Sets.newHashSet(newSchema.findField("preferences.feature1").fieldId()),
+        newSchema.identifierFieldIds());
+
+    newSchema = new SchemaUpdate(SCHEMA, SCHEMA_LAST_COLUMN_ID)
+        .addColumn("new", Types.StructType.of(
+            Types.NestedField.optional(SCHEMA_LAST_COLUMN_ID + 1, "field", 
Types.StringType.get())
+        ))
+        .setIdentifierFields("new.field")
+        .apply();
+
+    Assert.assertEquals("set newly added nested field as identifier should 
succeed",
+        Sets.newHashSet(newSchema.findField("new.field").fieldId()),
+        newSchema.identifierFieldIds());
+
+    newSchema = new SchemaUpdate(SCHEMA, SCHEMA_LAST_COLUMN_ID)
+        .addColumn("new", Types.StructType.of(
+            Types.NestedField.optional(SCHEMA_LAST_COLUMN_ID + 1, "field", 
Types.StructType.of(
+                Types.NestedField.optional(SCHEMA_LAST_COLUMN_ID + 2, 
"nested", Types.StringType.get())))))
+        .setIdentifierFields("new.field.nested")
+        .apply();
+
+    Assert.assertEquals("set newly added multi-layer nested field as 
identifier should succeed",
+        Sets.newHashSet(newSchema.findField("new.field.nested").fieldId()),
+        newSchema.identifierFieldIds());
+  }
+
+  @Test
+  public void testAddDottedIdentifierFieldColumns() {
+    Schema newSchema = new SchemaUpdate(SCHEMA, SCHEMA_LAST_COLUMN_ID)
+        .addColumn(null, "dot.field", Types.StringType.get())
+        .setIdentifierFields("id", "dot.field")
+        .apply();
+
+    Assert.assertEquals("add a field with dot as identifier should succeed",
+        Sets.newHashSet(newSchema.findField("id").fieldId(), 
newSchema.findField("dot.field").fieldId()),
+        newSchema.identifierFieldIds());
+  }
+
+  @Test
+  public void testRemoveIdentifierFields() {
+    Schema newSchema = new SchemaUpdate(SCHEMA, SCHEMA_LAST_COLUMN_ID)
+        .addColumn("new_field", Types.StringType.get())
+        .addColumn("new_field2", Types.StringType.get())
+        .setIdentifierFields("id", "new_field", "new_field2")
+        .apply();
+
+    newSchema = new SchemaUpdate(newSchema, SCHEMA_LAST_COLUMN_ID)
+        .setIdentifierFields("new_field", "new_field2")
+        .apply();
+
+    Assert.assertEquals("remove an identifier field should succeed",
+        Sets.newHashSet(newSchema.findField("new_field").fieldId(), 
newSchema.findField("new_field2").fieldId()),
+        newSchema.identifierFieldIds());
+
+    newSchema = new SchemaUpdate(newSchema, SCHEMA_LAST_COLUMN_ID)
+        .setIdentifierFields(Sets.newHashSet())
+        .apply();
+
+    Assert.assertEquals("remove all identifier fields should succeed",
+        Sets.newHashSet(),
+        newSchema.identifierFieldIds());
+  }
+
+  @Test
+  public void testSetIdentifierFieldsFails() {
+    AssertHelpers.assertThrows("add a field with name not exist should fail",
+        IllegalArgumentException.class,
+        "not found in current schema or added columns",
+        () -> new SchemaUpdate(SCHEMA, SCHEMA_LAST_COLUMN_ID)
+            .setIdentifierFields("unknown")
+            .apply());
+
+    AssertHelpers.assertThrows("add a field of non-primitive type should fail",
+        IllegalArgumentException.class,
+        "not a primitive type field",
+        () -> new SchemaUpdate(SCHEMA, SCHEMA_LAST_COLUMN_ID)
+            .setIdentifierFields("locations")
+            .apply());
+
+    AssertHelpers.assertThrows("add a map key nested field should fail",
+        IllegalArgumentException.class,
+        "must not be nested in " + SCHEMA.findField("locations"),
+        () -> new SchemaUpdate(SCHEMA, SCHEMA_LAST_COLUMN_ID)
+            .setIdentifierFields("locations.key.zip")
+            .apply());
+
+    AssertHelpers.assertThrows("add a map value nested field should fail",
+        IllegalArgumentException.class,
+        "must not be nested in " + SCHEMA.findField("locations"),
+        () -> new SchemaUpdate(SCHEMA, SCHEMA_LAST_COLUMN_ID)
+            .setIdentifierFields("locations.value.lat")
+            .apply());
+
+    AssertHelpers.assertThrows("add a nested field in list should fail",
+        IllegalArgumentException.class,
+        "must not be nested in " + SCHEMA.findField("points"),
+        () -> new SchemaUpdate(SCHEMA, SCHEMA_LAST_COLUMN_ID)
+            .setIdentifierFields("points.element.x")
+            .apply());
+
+    Schema newSchema = new SchemaUpdate(SCHEMA, SCHEMA_LAST_COLUMN_ID)
+        .addColumn("new", Types.StructType.of(
+            Types.NestedField.optional(SCHEMA_LAST_COLUMN_ID + 1, "fields", 
Types.ListType.ofOptional(
+                SCHEMA_LAST_COLUMN_ID + 2, Types.StructType.of(
+                    Types.NestedField.optional(SCHEMA_LAST_COLUMN_ID + 3, 
"nested", Types.StringType.get())
+                ))
+            )
+        ))
+        .apply();
+
+    AssertHelpers.assertThrows("add a nested field in struct of a map should 
fail",
+        IllegalArgumentException.class,
+        "must not be nested in " + newSchema.findField("new.fields"),
+        () -> new SchemaUpdate(newSchema, SCHEMA_LAST_COLUMN_ID + 3)
+            .setIdentifierFields("new.fields.element.nested")
+            .apply());
+  }
+
+  @Test
+  public void testDeleteIdentifierFieldColumns() {
+    Schema schemaWithIdentifierFields = new SchemaUpdate(SCHEMA, 
SCHEMA_LAST_COLUMN_ID)
+        .setIdentifierFields("id")
+        .apply();
+
+    Assert.assertEquals("delete column and then reset identifier field should 
succeed",
+        Sets.newHashSet(),
+        new SchemaUpdate(schemaWithIdentifierFields, SCHEMA_LAST_COLUMN_ID)
+            .deleteColumn("id").setIdentifierFields(Sets.newHashSet()).apply()
+            .identifierFieldIds());
+
+    Assert.assertEquals("delete reset identifier field and then delete column 
should succeed",
+        Sets.newHashSet(),
+        new SchemaUpdate(schemaWithIdentifierFields, SCHEMA_LAST_COLUMN_ID)
+            .setIdentifierFields(Sets.newHashSet()).deleteColumn("id").apply()
+            .identifierFieldIds());
+  }
+
+  @Test
+  public void testDeleteIdentifierFieldColumnsFails() {
+    Schema schemaWithIdentifierFields = new SchemaUpdate(SCHEMA, 
SCHEMA_LAST_COLUMN_ID)
+        .setIdentifierFields("id")
+        .apply();
+
+    AssertHelpers.assertThrows("delete an identifier column without setting 
identifier fields should fail",
+        IllegalArgumentException.class,
+        "Cannot delete identifier field 1: id: required int. To force 
deletion, " +
+            "also call setIdentifierFields to update identifier fields.",
+        () -> new SchemaUpdate(schemaWithIdentifierFields, 
SCHEMA_LAST_COLUMN_ID).deleteColumn("id").apply());
+  }
+
+  @Test
+  public void testRenameIdentifierFields() {
+    Schema schemaWithIdentifierFields = new SchemaUpdate(SCHEMA, 
SCHEMA_LAST_COLUMN_ID)
+        .setIdentifierFields("id")
+        .apply();
+
+    Schema newSchema = new SchemaUpdate(schemaWithIdentifierFields, 
SCHEMA_LAST_COLUMN_ID)
+        .renameColumn("id", "id2")
+        .apply();
+
+    Assert.assertEquals("rename should not affect identifier fields",
+        Sets.newHashSet(SCHEMA.findField("id").fieldId()),
+        newSchema.identifierFieldIds());
+  }
+
+  @Test
+  public void testMoveIdentifierFields() {
+    Schema schemaWithIdentifierFields = new SchemaUpdate(SCHEMA, 
SCHEMA_LAST_COLUMN_ID)
+        .setIdentifierFields("id")
+        .apply();
+
+    Schema newSchema = new SchemaUpdate(schemaWithIdentifierFields, 
SCHEMA_LAST_COLUMN_ID)
+        .moveAfter("id", "locations")
+        .apply();
+
+    Assert.assertEquals("move after should not affect identifier fields",
+        Sets.newHashSet(SCHEMA.findField("id").fieldId()),
+        newSchema.identifierFieldIds());
+
+    newSchema = new SchemaUpdate(schemaWithIdentifierFields, 
SCHEMA_LAST_COLUMN_ID)
+        .moveBefore("id", "locations")
+        .apply();
+
+    Assert.assertEquals("move before should not affect identifier fields",
+        Sets.newHashSet(SCHEMA.findField("id").fieldId()),
+        newSchema.identifierFieldIds());
+
+    newSchema = new SchemaUpdate(schemaWithIdentifierFields, 
SCHEMA_LAST_COLUMN_ID)
+        .moveFirst("id")
+        .apply();
+
+    Assert.assertEquals("move first should not affect identifier fields",
+        Sets.newHashSet(SCHEMA.findField("id").fieldId()),
+        newSchema.identifierFieldIds());
+  }
 }
diff --git a/core/src/test/java/org/apache/iceberg/TestTableMetadata.java 
b/core/src/test/java/org/apache/iceberg/TestTableMetadata.java
index a7b30a6..c44ca63 100644
--- a/core/src/test/java/org/apache/iceberg/TestTableMetadata.java
+++ b/core/src/test/java/org/apache/iceberg/TestTableMetadata.java
@@ -647,6 +647,33 @@ public class TestTableMetadata {
   }
 
   @Test
+  public void testParseSchemaIdentifierFields() throws Exception {
+    String data = readTableMetadataInputFile("TableMetadataV2Valid.json");
+    TableMetadata parsed = TableMetadataParser.fromJson(
+        ops.io(), null, JsonUtil.mapper().readValue(data, JsonNode.class));
+    Assert.assertEquals(Sets.newHashSet(), 
parsed.schemasById().get(0).identifierFieldIds());
+    Assert.assertEquals(Sets.newHashSet(1, 2), 
parsed.schemasById().get(1).identifierFieldIds());
+  }
+
+  @Test
+  public void testUpdateSchemaIdentifierFields() {
+    Schema schema = new Schema(
+        Types.NestedField.required(10, "x", Types.StringType.get())
+    );
+
+    TableMetadata meta = TableMetadata.newTableMetadata(
+        schema, PartitionSpec.unpartitioned(), null, ImmutableMap.of());
+
+    Schema newSchema = new Schema(
+        Lists.newArrayList(Types.NestedField.required(1, "x", 
Types.StringType.get())),
+        Sets.newHashSet(1)
+    );
+    TableMetadata newMeta = meta.updateSchema(newSchema, 1);
+    Assert.assertEquals(2, newMeta.schemas().size());
+    Assert.assertEquals(Sets.newHashSet(1), 
newMeta.schema().identifierFieldIds());
+  }
+
+  @Test
   public void testUpdateSchema() {
     Schema schema = new Schema(0,
         Types.NestedField.required(1, "y", Types.LongType.get(), "comment")
diff --git a/core/src/test/resources/TableMetadataV2Valid.json 
b/core/src/test/resources/TableMetadataV2Valid.json
index cf492f5..d43e0a2 100644
--- a/core/src/test/resources/TableMetadataV2Valid.json
+++ b/core/src/test/resources/TableMetadataV2Valid.json
@@ -22,6 +22,10 @@
     {
       "type": "struct",
       "schema-id": 1,
+      "identifier-field-ids": [
+        1,
+        2
+      ],
       "fields": [
         {
           "id": 1,
@@ -85,4 +89,4 @@
   "snapshots": [],
   "snapshot-log": [],
   "metadata-log": []
-}
\ No newline at end of file
+}

Reply via email to