This is an automated email from the ASF dual-hosted git repository.
blue pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iceberg.git
The following commit(s) were added to refs/heads/master by this push:
new e58d28d Core: Add row identifier to schema (#2465)
e58d28d is described below
commit e58d28d500241e6a36a896993b56a80c49bf6118
Author: Jack Ye <[email protected]>
AuthorDate: Sun May 2 13:52:56 2021 -0700
Core: Add row identifier to schema (#2465)
---
api/src/main/java/org/apache/iceberg/Schema.java | 79 ++++++-
.../main/java/org/apache/iceberg/UpdateSchema.java | 22 ++
.../main/java/org/apache/iceberg/SchemaParser.java | 24 ++-
.../main/java/org/apache/iceberg/SchemaUpdate.java | 86 ++++++--
.../java/org/apache/iceberg/TableMetadata.java | 6 +-
.../java/org/apache/iceberg/util/JsonUtil.java | 84 +++++++-
.../java/org/apache/iceberg/TestSchemaUpdate.java | 235 +++++++++++++++++++++
.../java/org/apache/iceberg/TestTableMetadata.java | 27 +++
core/src/test/resources/TableMetadataV2Valid.json | 6 +-
9 files changed, 530 insertions(+), 39 deletions(-)
diff --git a/api/src/main/java/org/apache/iceberg/Schema.java
b/api/src/main/java/org/apache/iceberg/Schema.java
index f753a2b..1b80a2d 100644
--- a/api/src/main/java/org/apache/iceberg/Schema.java
+++ b/api/src/main/java/org/apache/iceberg/Schema.java
@@ -32,9 +32,12 @@ import
org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.collect.BiMap;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableBiMap;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet;
import org.apache.iceberg.relocated.com.google.common.collect.Sets;
+import org.apache.iceberg.relocated.com.google.common.primitives.Ints;
import org.apache.iceberg.types.Type;
import org.apache.iceberg.types.TypeUtil;
+import org.apache.iceberg.types.Types;
import org.apache.iceberg.types.Types.NestedField;
import org.apache.iceberg.types.Types.StructType;
@@ -51,29 +54,47 @@ public class Schema implements Serializable {
private final StructType struct;
private final int schemaId;
+ private final int[] identifierFieldIds;
+
private transient BiMap<String, Integer> aliasToId = null;
private transient Map<Integer, NestedField> idToField = null;
private transient Map<String, Integer> nameToId = null;
private transient Map<String, Integer> lowerCaseNameToId = null;
private transient Map<Integer, Accessor<StructLike>> idToAccessor = null;
private transient Map<Integer, String> idToName = null;
+ private transient Set<Integer> identifierFieldIdSet = null;
public Schema(List<NestedField> columns, Map<String, Integer> aliases) {
- this.schemaId = DEFAULT_SCHEMA_ID;
- this.struct = StructType.of(columns);
- this.aliasToId = aliases != null ? ImmutableBiMap.copyOf(aliases) : null;
+ this(columns, aliases, ImmutableSet.of());
+ }
- // validate the schema through IndexByName visitor
- lazyIdToName();
+ public Schema(List<NestedField> columns, Map<String, Integer> aliases,
Set<Integer> identifierFieldIds) {
+ this(DEFAULT_SCHEMA_ID, columns, aliases, identifierFieldIds);
}
public Schema(List<NestedField> columns) {
- this(DEFAULT_SCHEMA_ID, columns);
+ this(columns, ImmutableSet.of());
+ }
+
+ public Schema(List<NestedField> columns, Set<Integer> identifierFieldIds) {
+ this(DEFAULT_SCHEMA_ID, columns, identifierFieldIds);
}
public Schema(int schemaId, List<NestedField> columns) {
+ this(schemaId, columns, ImmutableSet.of());
+ }
+
+ public Schema(int schemaId, List<NestedField> columns, Set<Integer>
identifierFieldIds) {
+ this(schemaId, columns, null, identifierFieldIds);
+ }
+
+ private Schema(int schemaId, List<NestedField> columns, Map<String, Integer>
aliases,
+ Set<Integer> identifierFieldIds) {
this.schemaId = schemaId;
this.struct = StructType.of(columns);
+ this.aliasToId = aliases != null ? ImmutableBiMap.copyOf(aliases) : null;
+ this.identifierFieldIds = identifierFieldIds != null ?
Ints.toArray(identifierFieldIds) : new int[0];
+
lazyIdToName();
}
@@ -120,6 +141,13 @@ public class Schema implements Serializable {
return idToAccessor;
}
+ private Set<Integer> lazyIdentifierFieldIdSet() {
+ if (identifierFieldIdSet == null) {
+ identifierFieldIdSet =
ImmutableSet.copyOf(Ints.asList(identifierFieldIds));
+ }
+ return identifierFieldIdSet;
+ }
+
/**
* Returns the schema ID for this schema.
* <p>
@@ -159,6 +187,29 @@ public class Schema implements Serializable {
}
/**
+ * The set of identifier field IDs.
+ * <p>
+ * An identifier is a concept similar to a primary key in a relational database
system.
+ * It consists of a unique set of primitive fields in the schema.
+ * An identifier field must be at root, or nested in a chain of structs (no
maps or lists).
+ * A row should be unique in a table based on the values of the identifier
fields.
+ * However, Iceberg identifier differs from primary key in the following
ways:
+ * <ul>
+ * <li>Iceberg does not enforce the uniqueness of a row based on this
identifier information.
+ * It is used for operations like upsert to define the default upsert
key.</li>
+ * <li>NULL can be used as the value of an identifier field. Iceberg ensures
null-safe equality checks.</li>
+ * <li>A nested field in a struct can be used as an identifier. For example,
if there is a "last_name" field
+ * inside a "user" struct in a schema, field "user.last_name" can be set as
a part of the identifier field.</li>
+ * </ul>
+ * <p>
+ *
+ * @return the set of identifier field IDs in this schema.
+ */
+ public Set<Integer> identifierFieldIds() {
+ return lazyIdentifierFieldIdSet();
+ }
+
+ /**
* Returns the {@link Type} of a sub-field identified by the field name.
*
* @param name a field name
@@ -331,6 +382,16 @@ public class Schema implements Serializable {
return internalSelect(names, false);
}
+ /**
+ * Checks whether this schema is equivalent to another schema while ignoring
the schema ID.
+ * @param anotherSchema another schema
+ * @return true if this schema is equivalent to the given schema
+ */
+ public boolean sameSchema(Schema anotherSchema) {
+ return asStruct().equals(anotherSchema.asStruct()) &&
+ identifierFieldIds().equals(anotherSchema.identifierFieldIds());
+ }
+
private Schema internalSelect(Collection<String> names, boolean
caseSensitive) {
if (names.contains(ALL_COLUMNS)) {
return this;
@@ -353,11 +414,15 @@ public class Schema implements Serializable {
return TypeUtil.select(this, selected);
}
+ private String identifierFieldToString(Types.NestedField field) {
+ return " " + field + (identifierFieldIds().contains(field.fieldId()) ? "
(id)" : "");
+ }
+
@Override
public String toString() {
return String.format("table {\n%s\n}",
NEWLINE.join(struct.fields().stream()
- .map(f -> " " + f)
+ .map(this::identifierFieldToString)
.collect(Collectors.toList())));
}
}
diff --git a/api/src/main/java/org/apache/iceberg/UpdateSchema.java
b/api/src/main/java/org/apache/iceberg/UpdateSchema.java
index b372758..32c3092 100644
--- a/api/src/main/java/org/apache/iceberg/UpdateSchema.java
+++ b/api/src/main/java/org/apache/iceberg/UpdateSchema.java
@@ -19,7 +19,9 @@
package org.apache.iceberg;
+import java.util.Set;
import org.apache.iceberg.exceptions.CommitFailedException;
+import org.apache.iceberg.relocated.com.google.common.collect.Sets;
import org.apache.iceberg.types.Type;
/**
@@ -384,4 +386,24 @@ public interface UpdateSchema extends
PendingUpdate<Schema> {
* with other additions, renames, or
updates.
*/
UpdateSchema unionByNameWith(Schema newSchema);
+
+ /**
+ * Set the identifier fields given a set of field names.
+ * See {@link Schema#identifierFieldIds()} to learn more about Iceberg
identifier.
+ *
+ * @param names names of the columns to set as identifier fields
+ * @return this for method chaining
+ */
+ UpdateSchema setIdentifierFields(Set<String> names);
+
+ /**
+ * Set the identifier fields given some field names.
+ * See {@link UpdateSchema#setIdentifierFields(Set)} for more details.
+ *
+ * @param names names of the columns to set as identifier fields
+ * @return this for method chaining
+ */
+ default UpdateSchema setIdentifierFields(String... names) {
+ return setIdentifierFields(Sets.newHashSet(names));
+ }
}
diff --git a/core/src/main/java/org/apache/iceberg/SchemaParser.java
b/core/src/main/java/org/apache/iceberg/SchemaParser.java
index 534e085..0a4f6f3 100644
--- a/core/src/main/java/org/apache/iceberg/SchemaParser.java
+++ b/core/src/main/java/org/apache/iceberg/SchemaParser.java
@@ -27,6 +27,7 @@ import java.io.IOException;
import java.io.StringWriter;
import java.util.Iterator;
import java.util.List;
+import java.util.Set;
import org.apache.iceberg.exceptions.RuntimeIOException;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
@@ -40,6 +41,7 @@ public class SchemaParser {
}
private static final String SCHEMA_ID = "schema-id";
+ private static final String IDENTIFIER_FIELD_IDS = "identifier-field-ids";
private static final String TYPE = "type";
private static final String STRUCT = "struct";
private static final String LIST = "list";
@@ -59,10 +61,11 @@ public class SchemaParser {
private static final String VALUE_REQUIRED = "value-required";
private static void toJson(Types.StructType struct, JsonGenerator generator)
throws IOException {
- toJson(struct, null, generator);
+ toJson(struct, null, null, generator);
}
- private static void toJson(Types.StructType struct, Integer schemaId,
JsonGenerator generator) throws IOException {
+ private static void toJson(Types.StructType struct, Integer schemaId,
Set<Integer> identifierFieldIds,
+ JsonGenerator generator) throws IOException {
generator.writeStartObject();
generator.writeStringField(TYPE, STRUCT);
@@ -70,6 +73,14 @@ public class SchemaParser {
generator.writeNumberField(SCHEMA_ID, schemaId);
}
+ if (identifierFieldIds != null && !identifierFieldIds.isEmpty()) {
+ generator.writeArrayFieldStart(IDENTIFIER_FIELD_IDS);
+ for (int id : identifierFieldIds) {
+ generator.writeNumber(id);
+ }
+ generator.writeEndArray();
+ }
+
generator.writeArrayFieldStart(FIELDS);
for (Types.NestedField field : struct.fields()) {
generator.writeStartObject();
@@ -144,7 +155,7 @@ public class SchemaParser {
}
public static void toJson(Schema schema, JsonGenerator generator) throws
IOException {
- toJson(schema.asStruct(), schema.schemaId(), generator);
+ toJson(schema.asStruct(), schema.schemaId(), schema.identifierFieldIds(),
generator);
}
public static String toJson(Schema schema) {
@@ -158,7 +169,7 @@ public class SchemaParser {
if (pretty) {
generator.useDefaultPrettyPrinter();
}
- toJson(schema.asStruct(), generator);
+ toJson(schema.asStruct(), schema.schemaId(),
schema.identifierFieldIds(), generator);
generator.flush();
return writer.toString();
@@ -247,11 +258,12 @@ public class SchemaParser {
Preconditions.checkArgument(type.isNestedType() &&
type.asNestedType().isStructType(),
"Cannot create schema, not a struct type: %s", type);
Integer schemaId = JsonUtil.getIntOrNull(SCHEMA_ID, json);
+ Set<Integer> identifierFieldIds =
JsonUtil.getIntegerSetOrNull(IDENTIFIER_FIELD_IDS, json);
if (schemaId == null) {
- return new Schema(type.asNestedType().asStructType().fields());
+ return new Schema(type.asNestedType().asStructType().fields(),
identifierFieldIds);
} else {
- return new Schema(schemaId, type.asNestedType().asStructType().fields());
+ return new Schema(schemaId, type.asNestedType().asStructType().fields(),
identifierFieldIds);
}
}
diff --git a/core/src/main/java/org/apache/iceberg/SchemaUpdate.java
b/core/src/main/java/org/apache/iceberg/SchemaUpdate.java
index e4f2593..1fb21a0 100644
--- a/core/src/main/java/org/apache/iceberg/SchemaUpdate.java
+++ b/core/src/main/java/org/apache/iceberg/SchemaUpdate.java
@@ -24,6 +24,8 @@ import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Objects;
+import java.util.Set;
+import java.util.stream.Collectors;
import org.apache.iceberg.mapping.MappingUtil;
import org.apache.iceberg.mapping.NameMapping;
import org.apache.iceberg.mapping.NameMappingParser;
@@ -33,6 +35,7 @@ import
org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.apache.iceberg.relocated.com.google.common.collect.Multimap;
import org.apache.iceberg.relocated.com.google.common.collect.Multimaps;
+import org.apache.iceberg.relocated.com.google.common.collect.Sets;
import org.apache.iceberg.schema.UnionByNameVisitor;
import org.apache.iceberg.types.Type;
import org.apache.iceberg.types.TypeUtil;
@@ -59,25 +62,32 @@ class SchemaUpdate implements UpdateSchema {
private final Multimap<Integer, Move> moves =
Multimaps.newListMultimap(Maps.newHashMap(), Lists::newArrayList);
private int lastColumnId;
private boolean allowIncompatibleChanges = false;
-
+ private Set<String> identifierFieldNames;
SchemaUpdate(TableOperations ops) {
- this.ops = ops;
- this.base = ops.current();
- this.schema = base.schema();
- this.lastColumnId = base.lastColumnId();
- this.idToParent =
Maps.newHashMap(TypeUtil.indexParents(schema.asStruct()));
+ this(ops, ops.current());
}
/**
* For testing only.
*/
SchemaUpdate(Schema schema, int lastColumnId) {
- this.ops = null;
- this.base = null;
+ this(null, null, schema, lastColumnId);
+ }
+
+ private SchemaUpdate(TableOperations ops, TableMetadata base) {
+ this(ops, base, base.schema(), base.lastColumnId());
+ }
+
+ private SchemaUpdate(TableOperations ops, TableMetadata base, Schema schema,
int lastColumnId) {
+ this.ops = ops;
+ this.base = base;
this.schema = schema;
this.lastColumnId = lastColumnId;
this.idToParent =
Maps.newHashMap(TypeUtil.indexParents(schema.asStruct()));
+ this.identifierFieldNames = schema.identifierFieldIds().stream()
+ .map(id -> schema.findField(id).name())
+ .collect(Collectors.toSet());
}
@Override
@@ -170,7 +180,6 @@ class SchemaUpdate implements UpdateSchema {
"Cannot delete a column that has additions: %s", name);
Preconditions.checkArgument(!updates.containsKey(field.fieldId()),
"Cannot delete a column that has updates: %s", name);
-
deletes.add(field.fieldId());
return this;
@@ -193,6 +202,11 @@ class SchemaUpdate implements UpdateSchema {
updates.put(fieldId, Types.NestedField.of(fieldId, field.isOptional(),
newName, field.type(), field.doc()));
}
+ if (identifierFieldNames.contains(name)) {
+ identifierFieldNames.remove(name);
+ identifierFieldNames.add(newName);
+ }
+
return this;
}
@@ -317,6 +331,12 @@ class SchemaUpdate implements UpdateSchema {
return this;
}
+ @Override
+ public UpdateSchema setIdentifierFields(Set<String> names) {
+ this.identifierFieldNames = Sets.newHashSet(names);
+ return this;
+ }
+
private Integer findForMove(String name) {
Types.NestedField field = schema.findField(name);
if (field != null) {
@@ -359,7 +379,7 @@ class SchemaUpdate implements UpdateSchema {
*/
@Override
public Schema apply() {
- Schema newSchema = applyChanges(schema, deletes, updates, adds, moves);
+ Schema newSchema = applyChanges(schema, deletes, updates, adds, moves,
identifierFieldNames);
// Validate the metrics if we have existing properties.
if (base != null && base.properties() != null) {
@@ -408,11 +428,53 @@ class SchemaUpdate implements UpdateSchema {
private static Schema applyChanges(Schema schema, List<Integer> deletes,
Map<Integer, Types.NestedField> updates,
Multimap<Integer, Types.NestedField> adds,
- Multimap<Integer, Move> moves) {
+ Multimap<Integer, Move> moves,
+ Set<String> identifierFieldNames) {
+ // validate existing identifier fields are not deleted
+ for (String name : identifierFieldNames) {
+ Types.NestedField field = schema.findField(name);
+ if (field != null) {
+ Preconditions.checkArgument(!deletes.contains(field.fieldId()),
+ "Cannot delete identifier field %s. To force deletion, " +
+ "also call setIdentifierFields to update identifier fields.",
field);
+ }
+ }
+
+ // apply schema changes
Types.StructType struct = TypeUtil
.visit(schema, new ApplyChanges(deletes, updates, adds, moves))
.asNestedType().asStructType();
- return new Schema(struct.fields());
+
+ // validate identifier requirements based on the latest schema
+ Map<String, Integer> nameToId = TypeUtil.indexByName(struct);
+ Set<Integer> freshIdentifierFieldIds = Sets.newHashSet();
+ for (String name : identifierFieldNames) {
+ Preconditions.checkArgument(nameToId.containsKey(name),
+ "Cannot add field %s as an identifier field: not found in current
schema or added columns", name);
+ freshIdentifierFieldIds.add(nameToId.get(name));
+ }
+
+ Map<Integer, Integer> idToParent =
TypeUtil.indexParents(schema.asStruct());
+ Map<Integer, Types.NestedField> idToField = TypeUtil.indexById(struct);
+ freshIdentifierFieldIds.forEach(id -> validateIdentifierField(id,
idToField, idToParent));
+
+ return new Schema(struct.fields(), freshIdentifierFieldIds);
+ }
+
+ private static void validateIdentifierField(int fieldId, Map<Integer,
Types.NestedField> idToField,
+ Map<Integer, Integer>
idToParent) {
+ Types.NestedField field = idToField.get(fieldId);
+ Preconditions.checkArgument(field.type().isPrimitiveType(),
+ "Cannot add field %s as an identifier field: not a primitive type
field", field.name());
+
+ // check whether the nested field is in a chain of struct fields
+ Integer parentId = idToParent.get(field.fieldId());
+ while (parentId != null) {
+ Types.NestedField parent = idToField.get(parentId);
+ Preconditions.checkArgument(parent.type().isStructType(),
+ "Cannot add field %s as an identifier field: must not be nested in
%s", field.name(), parent);
+ parentId = idToParent.get(parent.fieldId());
+ }
}
private static class ApplyChanges extends TypeUtil.SchemaVisitor<Type> {
diff --git a/core/src/main/java/org/apache/iceberg/TableMetadata.java
b/core/src/main/java/org/apache/iceberg/TableMetadata.java
index 1e33c54..532519e 100644
--- a/core/src/main/java/org/apache/iceberg/TableMetadata.java
+++ b/core/src/main/java/org/apache/iceberg/TableMetadata.java
@@ -491,7 +491,7 @@ public class TableMetadata implements Serializable {
ImmutableList.Builder<Schema> builder =
ImmutableList.<Schema>builder().addAll(schemas);
if (!schemasById.containsKey(newSchemaId)) {
- builder.add(new Schema(newSchemaId, newSchema.columns()));
+ builder.add(new Schema(newSchemaId, newSchema.columns(),
newSchema.identifierFieldIds()));
}
return new TableMetadata(null, formatVersion, uuid, location,
@@ -760,7 +760,7 @@ public class TableMetadata implements Serializable {
ImmutableList.Builder<Schema> schemasBuilder =
ImmutableList.<Schema>builder().addAll(schemas);
if (!schemasById.containsKey(freshSchemaId)) {
- schemasBuilder.add(new Schema(freshSchemaId, freshSchema.columns()));
+ schemasBuilder.add(new Schema(freshSchemaId, freshSchema.columns(),
freshSchema.identifierFieldIds()));
}
return new TableMetadata(null, formatVersion, uuid, newLocation,
@@ -916,7 +916,7 @@ public class TableMetadata implements Serializable {
// if the schema already exists, use its id; otherwise use the highest id
+ 1
int newSchemaId = currentSchemaId;
for (Schema schema : schemas) {
- if (schema.asStruct().equals(newSchema.asStruct())) {
+ if (schema.sameSchema(newSchema)) {
newSchemaId = schema.schemaId();
break;
} else if (schema.schemaId() >= newSchemaId) {
diff --git a/core/src/main/java/org/apache/iceberg/util/JsonUtil.java
b/core/src/main/java/org/apache/iceberg/util/JsonUtil.java
index 99b82a4..bcde959 100644
--- a/core/src/main/java/org/apache/iceberg/util/JsonUtil.java
+++ b/core/src/main/java/org/apache/iceberg/util/JsonUtil.java
@@ -25,9 +25,11 @@ import com.fasterxml.jackson.databind.ObjectMapper;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
+import java.util.Set;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet;
public class JsonUtil {
@@ -117,18 +119,80 @@ public class JsonUtil {
public static List<String> getStringList(String property, JsonNode node) {
Preconditions.checkArgument(node.has(property), "Cannot parse missing list
%s", property);
- JsonNode pNode = node.get(property);
- Preconditions.checkArgument(pNode != null && !pNode.isNull() &&
pNode.isArray(),
- "Cannot parse %s from non-array value: %s", property, pNode);
+ return ImmutableList.<String>builder()
+ .addAll(new JsonStringArrayIterator(property, node))
+ .build();
+ }
+
+ public static Set<Integer> getIntegerSetOrNull(String property, JsonNode
node) {
+ if (!node.has(property)) {
+ return null;
+ }
+
+ return ImmutableSet.<Integer>builder()
+ .addAll(new JsonIntegerArrayIterator(property, node))
+ .build();
+ }
+
+ abstract static class JsonArrayIterator<T> implements Iterator<T> {
- ImmutableList.Builder<String> builder = ImmutableList.builder();
- Iterator<JsonNode> elements = pNode.elements();
- while (elements.hasNext()) {
+ private final Iterator<JsonNode> elements;
+
+ JsonArrayIterator(String property, JsonNode node) {
+ JsonNode pNode = node.get(property);
+ Preconditions.checkArgument(pNode != null && !pNode.isNull() &&
pNode.isArray(),
+ "Cannot parse %s from non-array value: %s", property, pNode);
+ this.elements = pNode.elements();
+ }
+
+ @Override
+ public boolean hasNext() {
+ return elements.hasNext();
+ }
+
+ @Override
+ public T next() {
JsonNode element = elements.next();
- Preconditions.checkArgument(element.isTextual(),
- "Cannot parse string from non-text value: %s", element);
- builder.add(element.asText());
+ validate(element);
+ return convert(element);
+ }
+
+ abstract T convert(JsonNode element);
+
+ abstract void validate(JsonNode element);
+ }
+
+ static class JsonStringArrayIterator extends JsonArrayIterator<String> {
+
+ JsonStringArrayIterator(String property, JsonNode node) {
+ super(property, node);
+ }
+
+ @Override
+ String convert(JsonNode element) {
+ return element.asText();
+ }
+
+ @Override
+ void validate(JsonNode element) {
+ Preconditions.checkArgument(element.isTextual(), "Cannot parse string
from non-text value: %s", element);
+ }
+ }
+
+ static class JsonIntegerArrayIterator extends JsonArrayIterator<Integer> {
+
+ JsonIntegerArrayIterator(String property, JsonNode node) {
+ super(property, node);
+ }
+
+ @Override
+ Integer convert(JsonNode element) {
+ return element.asInt();
+ }
+
+ @Override
+ void validate(JsonNode element) {
+ Preconditions.checkArgument(element.isInt(), "Cannot parse integer from
non-int value: %s", element);
}
- return builder.build();
}
}
diff --git a/core/src/test/java/org/apache/iceberg/TestSchemaUpdate.java
b/core/src/test/java/org/apache/iceberg/TestSchemaUpdate.java
index ecd29b8..6b8380e 100644
--- a/core/src/test/java/org/apache/iceberg/TestSchemaUpdate.java
+++ b/core/src/test/java/org/apache/iceberg/TestSchemaUpdate.java
@@ -1223,4 +1223,239 @@ public class TestSchemaUpdate {
.moveBefore("s2.x", "s1.a")
.apply());
}
+
+ @Test
+ public void testAddExistingIdentifierFields() {
+ Schema newSchema = new SchemaUpdate(SCHEMA, SCHEMA_LAST_COLUMN_ID)
+ .setIdentifierFields("id")
+ .apply();
+
+ Assert.assertEquals("add an existing field as identifier field should
succeed",
+ Sets.newHashSet(newSchema.findField("id").fieldId()),
+ newSchema.identifierFieldIds());
+ }
+
+ @Test
+ public void testAddNewIdentifierFieldColumns() {
+ Schema newSchema = new SchemaUpdate(SCHEMA, SCHEMA_LAST_COLUMN_ID)
+ .addColumn("new_field", Types.StringType.get())
+ .setIdentifierFields("id", "new_field")
+ .apply();
+
+ Assert.assertEquals("add column then set as identifier should succeed",
+ Sets.newHashSet(newSchema.findField("id").fieldId(),
newSchema.findField("new_field").fieldId()),
+ newSchema.identifierFieldIds());
+
+ newSchema = new SchemaUpdate(SCHEMA, SCHEMA_LAST_COLUMN_ID)
+ .setIdentifierFields("id", "new_field")
+ .addColumn("new_field", Types.StringType.get())
+ .apply();
+
+ Assert.assertEquals("set identifier then add column should succeed",
+ Sets.newHashSet(newSchema.findField("id").fieldId(),
newSchema.findField("new_field").fieldId()),
+ newSchema.identifierFieldIds());
+ }
+
+ @Test
+ public void testAddNestedIdentifierFieldColumns() {
+ Schema newSchema = new SchemaUpdate(SCHEMA, SCHEMA_LAST_COLUMN_ID)
+ .setIdentifierFields("preferences.feature1")
+ .apply();
+
+ Assert.assertEquals("set existing nested field as identifier should
succeed",
+ Sets.newHashSet(newSchema.findField("preferences.feature1").fieldId()),
+ newSchema.identifierFieldIds());
+
+ newSchema = new SchemaUpdate(SCHEMA, SCHEMA_LAST_COLUMN_ID)
+ .addColumn("new", Types.StructType.of(
+ Types.NestedField.optional(SCHEMA_LAST_COLUMN_ID + 1, "field",
Types.StringType.get())
+ ))
+ .setIdentifierFields("new.field")
+ .apply();
+
+ Assert.assertEquals("set newly added nested field as identifier should
succeed",
+ Sets.newHashSet(newSchema.findField("new.field").fieldId()),
+ newSchema.identifierFieldIds());
+
+ newSchema = new SchemaUpdate(SCHEMA, SCHEMA_LAST_COLUMN_ID)
+ .addColumn("new", Types.StructType.of(
+ Types.NestedField.optional(SCHEMA_LAST_COLUMN_ID + 1, "field",
Types.StructType.of(
+ Types.NestedField.optional(SCHEMA_LAST_COLUMN_ID + 2,
"nested", Types.StringType.get())))))
+ .setIdentifierFields("new.field.nested")
+ .apply();
+
+ Assert.assertEquals("set newly added multi-layer nested field as
identifier should succeed",
+ Sets.newHashSet(newSchema.findField("new.field.nested").fieldId()),
+ newSchema.identifierFieldIds());
+ }
+
+ @Test
+ public void testAddDottedIdentifierFieldColumns() {
+ Schema newSchema = new SchemaUpdate(SCHEMA, SCHEMA_LAST_COLUMN_ID)
+ .addColumn(null, "dot.field", Types.StringType.get())
+ .setIdentifierFields("id", "dot.field")
+ .apply();
+
+ Assert.assertEquals("add a field with dot as identifier should succeed",
+ Sets.newHashSet(newSchema.findField("id").fieldId(),
newSchema.findField("dot.field").fieldId()),
+ newSchema.identifierFieldIds());
+ }
+
+ @Test
+ public void testRemoveIdentifierFields() {
+ Schema newSchema = new SchemaUpdate(SCHEMA, SCHEMA_LAST_COLUMN_ID)
+ .addColumn("new_field", Types.StringType.get())
+ .addColumn("new_field2", Types.StringType.get())
+ .setIdentifierFields("id", "new_field", "new_field2")
+ .apply();
+
+ newSchema = new SchemaUpdate(newSchema, SCHEMA_LAST_COLUMN_ID)
+ .setIdentifierFields("new_field", "new_field2")
+ .apply();
+
+ Assert.assertEquals("remove an identifier field should succeed",
+ Sets.newHashSet(newSchema.findField("new_field").fieldId(),
newSchema.findField("new_field2").fieldId()),
+ newSchema.identifierFieldIds());
+
+ newSchema = new SchemaUpdate(newSchema, SCHEMA_LAST_COLUMN_ID)
+ .setIdentifierFields(Sets.newHashSet())
+ .apply();
+
+ Assert.assertEquals("remove all identifier fields should succeed",
+ Sets.newHashSet(),
+ newSchema.identifierFieldIds());
+ }
+
+ @Test
+ public void testSetIdentifierFieldsFails() {
+ AssertHelpers.assertThrows("add a field with name not exist should fail",
+ IllegalArgumentException.class,
+ "not found in current schema or added columns",
+ () -> new SchemaUpdate(SCHEMA, SCHEMA_LAST_COLUMN_ID)
+ .setIdentifierFields("unknown")
+ .apply());
+
+ AssertHelpers.assertThrows("add a field of non-primitive type should fail",
+ IllegalArgumentException.class,
+ "not a primitive type field",
+ () -> new SchemaUpdate(SCHEMA, SCHEMA_LAST_COLUMN_ID)
+ .setIdentifierFields("locations")
+ .apply());
+
+ AssertHelpers.assertThrows("add a map key nested field should fail",
+ IllegalArgumentException.class,
+ "must not be nested in " + SCHEMA.findField("locations"),
+ () -> new SchemaUpdate(SCHEMA, SCHEMA_LAST_COLUMN_ID)
+ .setIdentifierFields("locations.key.zip")
+ .apply());
+
+ AssertHelpers.assertThrows("add a map value nested field should fail",
+ IllegalArgumentException.class,
+ "must not be nested in " + SCHEMA.findField("locations"),
+ () -> new SchemaUpdate(SCHEMA, SCHEMA_LAST_COLUMN_ID)
+ .setIdentifierFields("locations.value.lat")
+ .apply());
+
+ AssertHelpers.assertThrows("add a nested field in list should fail",
+ IllegalArgumentException.class,
+ "must not be nested in " + SCHEMA.findField("points"),
+ () -> new SchemaUpdate(SCHEMA, SCHEMA_LAST_COLUMN_ID)
+ .setIdentifierFields("points.element.x")
+ .apply());
+
+ Schema newSchema = new SchemaUpdate(SCHEMA, SCHEMA_LAST_COLUMN_ID)
+ .addColumn("new", Types.StructType.of(
+ Types.NestedField.optional(SCHEMA_LAST_COLUMN_ID + 1, "fields",
Types.ListType.ofOptional(
+ SCHEMA_LAST_COLUMN_ID + 2, Types.StructType.of(
+ Types.NestedField.optional(SCHEMA_LAST_COLUMN_ID + 3,
"nested", Types.StringType.get())
+ ))
+ )
+ ))
+ .apply();
+
+ AssertHelpers.assertThrows("add a nested field in struct of a map should
fail",
+ IllegalArgumentException.class,
+ "must not be nested in " + newSchema.findField("new.fields"),
+ () -> new SchemaUpdate(newSchema, SCHEMA_LAST_COLUMN_ID + 3)
+ .setIdentifierFields("new.fields.element.nested")
+ .apply());
+ }
+
+ @Test
+ public void testDeleteIdentifierFieldColumns() {
+ Schema schemaWithIdentifierFields = new SchemaUpdate(SCHEMA,
SCHEMA_LAST_COLUMN_ID)
+ .setIdentifierFields("id")
+ .apply();
+
+ Assert.assertEquals("delete column and then reset identifier field should
succeed",
+ Sets.newHashSet(),
+ new SchemaUpdate(schemaWithIdentifierFields, SCHEMA_LAST_COLUMN_ID)
+ .deleteColumn("id").setIdentifierFields(Sets.newHashSet()).apply()
+ .identifierFieldIds());
+
+ Assert.assertEquals("delete reset identifier field and then delete column
should succeed",
+ Sets.newHashSet(),
+ new SchemaUpdate(schemaWithIdentifierFields, SCHEMA_LAST_COLUMN_ID)
+ .setIdentifierFields(Sets.newHashSet()).deleteColumn("id").apply()
+ .identifierFieldIds());
+ }
+
+ @Test
+ public void testDeleteIdentifierFieldColumnsFails() {
+ Schema schemaWithIdentifierFields = new SchemaUpdate(SCHEMA,
SCHEMA_LAST_COLUMN_ID)
+ .setIdentifierFields("id")
+ .apply();
+
+ AssertHelpers.assertThrows("delete an identifier column without setting
identifier fields should fail",
+ IllegalArgumentException.class,
+ "Cannot delete identifier field 1: id: required int. To force
deletion, " +
+ "also call setIdentifierFields to update identifier fields.",
+ () -> new SchemaUpdate(schemaWithIdentifierFields,
SCHEMA_LAST_COLUMN_ID).deleteColumn("id").apply());
+ }
+
+ @Test
+ public void testRenameIdentifierFields() {
+ Schema schemaWithIdentifierFields = new SchemaUpdate(SCHEMA,
SCHEMA_LAST_COLUMN_ID)
+ .setIdentifierFields("id")
+ .apply();
+
+ Schema newSchema = new SchemaUpdate(schemaWithIdentifierFields,
SCHEMA_LAST_COLUMN_ID)
+ .renameColumn("id", "id2")
+ .apply();
+
+ Assert.assertEquals("rename should not affect identifier fields",
+ Sets.newHashSet(SCHEMA.findField("id").fieldId()),
+ newSchema.identifierFieldIds());
+ }
+
+ @Test
+ public void testMoveIdentifierFields() {
+ Schema schemaWithIdentifierFields = new SchemaUpdate(SCHEMA,
SCHEMA_LAST_COLUMN_ID)
+ .setIdentifierFields("id")
+ .apply();
+
+ Schema newSchema = new SchemaUpdate(schemaWithIdentifierFields,
SCHEMA_LAST_COLUMN_ID)
+ .moveAfter("id", "locations")
+ .apply();
+
+ Assert.assertEquals("move after should not affect identifier fields",
+ Sets.newHashSet(SCHEMA.findField("id").fieldId()),
+ newSchema.identifierFieldIds());
+
+ newSchema = new SchemaUpdate(schemaWithIdentifierFields,
SCHEMA_LAST_COLUMN_ID)
+ .moveBefore("id", "locations")
+ .apply();
+
+ Assert.assertEquals("move before should not affect identifier fields",
+ Sets.newHashSet(SCHEMA.findField("id").fieldId()),
+ newSchema.identifierFieldIds());
+
+ newSchema = new SchemaUpdate(schemaWithIdentifierFields,
SCHEMA_LAST_COLUMN_ID)
+ .moveFirst("id")
+ .apply();
+
+ Assert.assertEquals("move first should not affect identifier fields",
+ Sets.newHashSet(SCHEMA.findField("id").fieldId()),
+ newSchema.identifierFieldIds());
+ }
}
diff --git a/core/src/test/java/org/apache/iceberg/TestTableMetadata.java
b/core/src/test/java/org/apache/iceberg/TestTableMetadata.java
index a7b30a6..c44ca63 100644
--- a/core/src/test/java/org/apache/iceberg/TestTableMetadata.java
+++ b/core/src/test/java/org/apache/iceberg/TestTableMetadata.java
@@ -647,6 +647,33 @@ public class TestTableMetadata {
}
@Test
+ public void testParseSchemaIdentifierFields() throws Exception {
+ String data = readTableMetadataInputFile("TableMetadataV2Valid.json");
+ TableMetadata parsed = TableMetadataParser.fromJson(
+ ops.io(), null, JsonUtil.mapper().readValue(data, JsonNode.class));
+ Assert.assertEquals(Sets.newHashSet(),
parsed.schemasById().get(0).identifierFieldIds());
+ Assert.assertEquals(Sets.newHashSet(1, 2),
parsed.schemasById().get(1).identifierFieldIds());
+ }
+
+ @Test
+ public void testUpdateSchemaIdentifierFields() {
+ Schema schema = new Schema(
+ Types.NestedField.required(10, "x", Types.StringType.get())
+ );
+
+ TableMetadata meta = TableMetadata.newTableMetadata(
+ schema, PartitionSpec.unpartitioned(), null, ImmutableMap.of());
+
+ Schema newSchema = new Schema(
+ Lists.newArrayList(Types.NestedField.required(1, "x",
Types.StringType.get())),
+ Sets.newHashSet(1)
+ );
+ TableMetadata newMeta = meta.updateSchema(newSchema, 1);
+ Assert.assertEquals(2, newMeta.schemas().size());
+ Assert.assertEquals(Sets.newHashSet(1),
newMeta.schema().identifierFieldIds());
+ }
+
+ @Test
public void testUpdateSchema() {
Schema schema = new Schema(0,
Types.NestedField.required(1, "y", Types.LongType.get(), "comment")
diff --git a/core/src/test/resources/TableMetadataV2Valid.json
b/core/src/test/resources/TableMetadataV2Valid.json
index cf492f5..d43e0a2 100644
--- a/core/src/test/resources/TableMetadataV2Valid.json
+++ b/core/src/test/resources/TableMetadataV2Valid.json
@@ -22,6 +22,10 @@
{
"type": "struct",
"schema-id": 1,
+ "identifier-field-ids": [
+ 1,
+ 2
+ ],
"fields": [
{
"id": 1,
@@ -85,4 +89,4 @@
"snapshots": [],
"snapshot-log": [],
"metadata-log": []
-}
\ No newline at end of file
+}