openinx commented on a change in pull request #2354:
URL: https://github.com/apache/iceberg/pull/2354#discussion_r601031583
##########
File path: core/src/main/java/org/apache/iceberg/TableMetadata.java
##########
@@ -877,6 +986,26 @@ private static SortOrder freshSortOrder(int orderId,
Schema schema, SortOrder so
return builder.build();
}
+ private static RowKey freshRowKey(int keyId, Schema schema, RowKey rowKey) {
+ RowKey.Builder builder = RowKey.builderFor(schema).withKeyId(keyId);
+
+ for (RowKeyField field : rowKey.fields()) {
+ // look up the name of the source field in the old schema to get the new
schema's id
+ String columnName = rowKey.schema().findColumnName(field.sourceId());
+ Preconditions.checkNotNull(columnName,
+ "Cannot find column in the row key's schema. id: %s, schema: %s",
+ field.sourceId(), rowKey.schema());
+
+ // reassign all row keys with fresh column IDs.
+ Types.NestedField column = schema.findField(columnName);
+ Preconditions.checkNotNull(column,
+ "Cannot find column in the fresh schema. name: %s, schema: %s",
columnName, schema);
+ builder.addField(column.fieldId());
Review comment:
Why not just use `builder.addField(columnName)` here ? The
builder#addField will validate the existence of the column inside, so we don't
have to check it again here.
##########
File path: api/src/main/java/org/apache/iceberg/RowKey.java
##########
@@ -0,0 +1,213 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg;
+
+import java.io.Serializable;
+import java.util.Arrays;
+import java.util.List;
+import org.apache.iceberg.exceptions.ValidationException;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.types.Types;
+
+/**
+ * Row key of a table.
+ * <p>
+ * Row key is a definition of table row uniqueness,
+ * similar to the concept of primary key in a relational database system.
+ * A row should be unique in a table based on the values of each {@link
RowKeyField}.
+ * Iceberg itself does not enforce row uniqueness based on this key.
+ * It is leveraged by operations such as streaming upsert.
+ */
+public class RowKey implements Serializable {
+
+ private static final RowKey NOT_IDENTIFIED = new RowKey(null, 0,
ImmutableList.of());
+
+ private final Schema schema;
+ private final int keyId;
+ private final RowKeyField[] fields;
+
+ private transient volatile List<RowKeyField> fieldList;
+
+ private RowKey(Schema schema, int keyId, List<RowKeyField> fields) {
+ this.schema = schema;
+ this.keyId = keyId;
+ this.fields = fields.toArray(new RowKeyField[0]);
+ }
+
+ /**
+ * Returns the {@link Schema} referenced by the row key
+ */
+ public Schema schema() {
+ return schema;
+ }
+
+ /**
+ * Returns the ID of the row key
+ */
+ public int keyId() {
+ return keyId;
+ }
+
+ /**
+ * Return the list of {@link RowKeyField} in the row key
+ * <p>
+ * Notice that the order of each field matters.
+ * 2 keys with the same set of fields but different order are viewed as
different.
+ * The fields of the key should ideally be ordered based on the importance
of each field
+ * to be leveraged by features like secondary index.
Review comment:
This comment looks great, thanks for the doc.
##########
File path: api/src/main/java/org/apache/iceberg/RowKey.java
##########
@@ -0,0 +1,213 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg;
+
+import java.io.Serializable;
+import java.util.Arrays;
+import java.util.List;
+import org.apache.iceberg.exceptions.ValidationException;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.types.Types;
+
+/**
+ * Row key of a table.
+ * <p>
+ * Row key is a definition of table row uniqueness,
+ * similar to the concept of primary key in a relational database system.
+ * A row should be unique in a table based on the values of each {@link
RowKeyField}.
+ * Iceberg itself does not enforce row uniqueness based on this key.
+ * It is leveraged by operations such as streaming upsert.
+ */
+public class RowKey implements Serializable {
+
+ private static final RowKey NOT_IDENTIFIED = new RowKey(null, 0,
ImmutableList.of());
+
+ private final Schema schema;
+ private final int keyId;
+ private final RowKeyField[] fields;
+
+ private transient volatile List<RowKeyField> fieldList;
+
+ private RowKey(Schema schema, int keyId, List<RowKeyField> fields) {
+ this.schema = schema;
+ this.keyId = keyId;
+ this.fields = fields.toArray(new RowKeyField[0]);
+ }
+
+ /**
+ * Returns the {@link Schema} referenced by the row key
+ */
+ public Schema schema() {
+ return schema;
+ }
+
+ /**
+ * Returns the ID of the row key
+ */
+ public int keyId() {
+ return keyId;
+ }
+
+ /**
+ * Return the list of {@link RowKeyField} in the row key
+ * <p>
+ * Notice that the order of each field matters.
+ * 2 keys with the same set of fields but different order are viewed as
different.
+ * The fields of the key should ideally be ordered based on the importance
of each field
+ * to be leveraged by features like secondary index.
+ *
+ * @return the list of fields in the row key
+ */
+ public List<RowKeyField> fields() {
+ return lazyFieldList();
+ }
+
+ private List<RowKeyField> lazyFieldList() {
+ if (fieldList == null) {
+ synchronized (this) {
+ if (fieldList == null) {
+ this.fieldList = ImmutableList.copyOf(fields);
+ }
+ }
+ }
+
+ return fieldList;
+ }
+
+ /**
+ * Checks whether this row key is equivalent to another ignoring the key ID.
+ *
+ * @param another a different row key
+ * @return true if this row key is equivalent to the given one
+ */
+ public boolean sameRowKey(RowKey another) {
+ return Arrays.equals(fields, another.fields);
+ }
+
+ /**
+ * Returns the initial default row key that has no field
+ */
+ public static RowKey notIdentified() {
+ return NOT_IDENTIFIED;
+ }
+
+ /**
+ * Returns true if the row key is the default one with no field
+ */
+ public boolean isNotIdentified() {
+ return fields.length < 1;
+ }
+
+ @Override
+ public boolean equals(Object other) {
+ if (this == other) {
+ return true;
+ } else if (other == null || getClass() != other.getClass()) {
+ return false;
+ }
+
+ RowKey that = (RowKey) other;
+ return this.keyId == that.keyId && sameRowKey(that);
+ }
+
+ @Override
+ public int hashCode() {
+ return 31 * Integer.hashCode(keyId) + Arrays.hashCode(fields);
+ }
+
+ @Override
+ public String toString() {
+ StringBuilder sb = new StringBuilder();
+ sb.append("[");
+ for (RowKeyField field : fields) {
+ sb.append("\n");
+ sb.append(" ").append(field);
+ }
+ if (fields.length > 0) {
+ sb.append("\n");
+ }
+ sb.append("]");
+ return sb.toString();
+ }
+
+ /**
+ * Creates a new {@link Builder row key builder} for the given {@link
Schema}.
+ *
+ * @param schema a schema
+ * @return a row key builder for the given schema.
+ */
+ public static Builder builderFor(Schema schema) {
+ return new Builder(schema);
+ }
+
+ /**
+ * A builder to create valid {@link RowKey row key}.
+ * <p>
+ * Call {@link #builderFor(Schema)} to create a new builder.
+ */
+ public static class Builder {
+ private final Schema schema;
+ private final List<RowKeyField> fields = Lists.newArrayList();
+ // Default key ID is 1 because 0 is reserved for default
+ private int keyId = 1;
+
+ private Builder(Schema schema) {
+ this.schema = schema;
+ }
+
+ public Builder withKeyId(int id) {
+ ValidationException.check(id >= 0, "Row key id must not be less than 0");
+ this.keyId = id;
+ return this;
+ }
+
+ public Builder addField(String name) {
+ Types.NestedField column = schema.findField(name);
+ ValidationException.check(column != null, "Cannot find column with name
%s in schema", name);
Review comment:
How about appending the schema string at the end of error message so
that we could easily find out what's wrong when encountered the validation
exception ?
##########
File path:
hive-metastore/src/test/java/org/apache/iceberg/hive/TestHiveCatalog.java
##########
@@ -235,6 +236,62 @@ public void testCreateTableCustomSortOrder() {
}
}
+ @Test
+ public void testCreateTableDefaultRowKey() {
+ Schema schema = new Schema(
+ required(1, "id", Types.IntegerType.get(), "unique ID"),
+ required(2, "data", Types.StringType.get())
+ );
+ PartitionSpec spec = PartitionSpec.builderFor(schema)
+ .bucket("data", 16)
+ .build();
Review comment:
Nit: seems we don't have to specify the partitionSpec to test the
default `RowKey` case.
##########
File path: api/src/main/java/org/apache/iceberg/Table.java
##########
@@ -88,6 +88,20 @@ default String name() {
*/
Map<Integer, SortOrder> sortOrders();
+ /**
+ * Return the {@link RowKey row key} for this table.
+ *
+ * @return this table's row key.
+ */
+ RowKey rowKey();
+
+ /**
+ * Return a map of row key versions to {@link RowKey row key} for this table.
+ *
+ * @return this table's row key map.
Review comment:
Nit: this table's row key map -> this table's row keys map
##########
File path:
hive-metastore/src/test/java/org/apache/iceberg/hive/TestHiveCatalog.java
##########
@@ -235,6 +236,62 @@ public void testCreateTableCustomSortOrder() {
}
}
+ @Test
+ public void testCreateTableDefaultRowKey() {
+ Schema schema = new Schema(
+ required(1, "id", Types.IntegerType.get(), "unique ID"),
+ required(2, "data", Types.StringType.get())
+ );
+ PartitionSpec spec = PartitionSpec.builderFor(schema)
+ .bucket("data", 16)
+ .build();
+ TableIdentifier tableIdent = TableIdentifier.of(DB_NAME, "tbl");
+
+ try {
+ Table table = catalog.createTable(tableIdent, schema, spec);
+ RowKey rowKey = table.rowKey();
+ Assert.assertEquals("Row key ID must match", 0, rowKey.keyId());
+ Assert.assertTrue("Row key must be default", rowKey.isNotIdentified());
+ } finally {
+ catalog.dropTable(tableIdent);
+ }
+ }
+
+ @Test
+ public void testCreateTableCustomRowKey() {
+ Schema schema = new Schema(
+ required(1, "id", Types.IntegerType.get(), "unique ID"),
+ required(2, "data", Types.StringType.get())
+ );
+ PartitionSpec spec = PartitionSpec.builderFor(schema)
+ .bucket("data", 16)
+ .build();
+ RowKey key = RowKey.builderFor(schema)
+ .withKeyId(1)
+ .addField("id")
+ .addField("data")
+ .build();
+ TableIdentifier tableIdent = TableIdentifier.of(DB_NAME, "tbl");
+
+ try {
+ Table table = catalog.buildTable(tableIdent, schema)
+ .withPartitionSpec(spec)
+ .withRowKey(key)
+ .create();
+
+ RowKey actualKey = table.rowKey();
+ Assert.assertEquals("Row key ID must match", 1, actualKey.keyId());
+ Assert.assertEquals("Row key must have 2 field", 2,
actualKey.fields().size());
+ Assert.assertEquals("Row key must have the expected field",
Review comment:
To test the logic that refreshing the field id of `RowKey` for keeping
consistency with persisted schema when creating table, I will suggest to use a
different field id in the original
[schema](https://github.com/apache/iceberg/pull/2354/files#diff-3fd28ad4ce0f08ba3a14e230c3b172af90b4bf8e28b8948d12c61e1807e22105R262-R265).
By default the newly created table's schema will be starting from 1, then we
don't know whether those field ids in RowKey has been refreshed or not.
##########
File path: core/src/main/java/org/apache/iceberg/TableMetadata.java
##########
@@ -253,13 +267,17 @@ public String toString() {
int lastAssignedPartitionId,
int defaultSortOrderId,
List<SortOrder> sortOrders,
+ int defaultRowKeyId,
+ List<RowKey> rowKeys,
Map<String, String> properties,
long currentSnapshotId,
List<Snapshot> snapshots,
List<HistoryEntry> snapshotLog,
List<MetadataLogEntry> previousFiles) {
Preconditions.checkArgument(specs != null && !specs.isEmpty(), "Partition
specs cannot be null or empty");
Preconditions.checkArgument(sortOrders != null && !sortOrders.isEmpty(),
"Sort orders cannot be null or empty");
+ Preconditions.checkArgument(rowKeys != null && !rowKeys.isEmpty(),
+ "Row keys cannot be null or empty");
Review comment:
Nit: this don't need to change to a new line.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]