jackye1995 commented on a change in pull request #2010:
URL: https://github.com/apache/iceberg/pull/2010#discussion_r556878173



##########
File path: api/src/main/java/org/apache/iceberg/PrimaryKey.java
##########
@@ -0,0 +1,219 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg;
+
+import java.io.Serializable;
+import java.util.Arrays;
+import java.util.List;
+import org.apache.iceberg.exceptions.ValidationException;
+import org.apache.iceberg.relocated.com.google.common.base.MoreObjects;
+import org.apache.iceberg.relocated.com.google.common.base.Objects;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.types.Type;
+import org.apache.iceberg.types.Types;
+
+/**
+ * A primary key that defines which columns will be unique in this table.
+ */
+public class PrimaryKey implements Serializable {
+
+  private static final PrimaryKey NON_PRIMARY_KEY = new PrimaryKey(null, 0, false, ImmutableList.of());
+
+  private final Schema schema;
+  private final int keyId;
+  private final boolean enforceUniqueness;
+  private final Integer[] sourceIds;
+
+  private transient volatile List<Integer> sourceIdList;
+
+  private PrimaryKey(Schema schema, int keyId, boolean enforceUniqueness, List<Integer> sourceIds) {
+    this.schema = schema;
+    this.keyId = keyId;
+    this.enforceUniqueness = enforceUniqueness;

Review comment:
       I don't think we should have `enforceUniqueness` in the Iceberg API.
Even if a user specifies that the primary key must be enforced, it is hard to
enforce that at the engine level, so a primary key should always be enforced
on a "best effort" basis. If Flink can enforce it through upsert, I feel that
should be a flag in Flink.
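
       One way to read this suggestion, as a minimal sketch: the table-level
key only records which column is the key, and the engine-side write decides
whether to upsert. `KeySpec`, `write` and the `engineUpsert` flag are
hypothetical names for illustration, not Iceberg or Flink APIs.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

final class BestEffortKeySketch {
  // Table-level metadata (hypothetical): names the key column, says nothing about enforcement.
  static final class KeySpec {
    final int keyColumn;

    KeySpec(int keyColumn) {
      this.keyColumn = keyColumn;
    }
  }

  // Engine-side write (hypothetical): the engine option, not the table, controls upsert.
  static List<String[]> write(KeySpec key, List<String[]> rows, boolean engineUpsert) {
    if (!engineUpsert) {
      // Plain append: duplicate keys may remain, uniqueness is best effort only.
      return new ArrayList<>(rows);
    }
    Map<String, String[]> latest = new LinkedHashMap<>();
    for (String[] row : rows) {
      // A later row replaces an earlier row that has the same key value.
      latest.put(row[key.keyColumn], row);
    }
    return new ArrayList<>(latest.values());
  }
}
```

       With this split, the same table metadata can be written by an engine
that upserts and by one that only appends.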

##########
File path: core/src/main/java/org/apache/iceberg/hadoop/HadoopTables.java
##########
@@ -125,17 +126,21 @@ private Table loadMetadataTable(String location, String 
metadataTableName, Metad
    * Create a table using the FileSystem implementation resolve from
    * location.
    *
-   * @param schema iceberg schema used to create the table
-   * @param spec partitioning spec, if null the table will be unpartitioned
+   * @param schema     iceberg schema used to create the table

Review comment:
       nit: unnecessary change in spacing

##########
File path: core/src/main/java/org/apache/iceberg/TableMetadataParser.java
##########
@@ -242,6 +253,7 @@ public static TableMetadata read(FileIO io, InputFile file) {
     }
   }
 
+  @SuppressWarnings("checkstyle:CyclomaticComplexity")

Review comment:
       We should probably refactor this method into methods like
`parsePartitionSpecs`, `parseSortOrders` and `parsePrimaryKeys`, but it is not
urgent; we can do it in another PR.
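
       A rough sketch of the shape such a refactor could take, assuming a
plain `Map` as a stand-in for the JSON node and strings as stand-ins for the
parsed objects. The class name, the `Parsed` holder and the `primary-keys`
field name are assumptions for illustration, not the actual
`TableMetadataParser` code.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;

final class MetadataParserSketch {
  // Hypothetical holder for the parsed sections; not Iceberg's TableMetadata.
  static final class Parsed {
    final List<String> specs;
    final List<String> sortOrders;
    final List<String> primaryKeys;

    Parsed(List<String> specs, List<String> sortOrders, List<String> primaryKeys) {
      this.specs = specs;
      this.sortOrders = sortOrders;
      this.primaryKeys = primaryKeys;
    }
  }

  // The top-level method only delegates, so its branch count stays small.
  static Parsed fromJson(Map<String, List<String>> node) {
    return new Parsed(parsePartitionSpecs(node), parseSortOrders(node), parsePrimaryKeys(node));
  }

  private static List<String> parsePartitionSpecs(Map<String, List<String>> node) {
    return listOrEmpty(node, "partition-specs");
  }

  private static List<String> parseSortOrders(Map<String, List<String>> node) {
    return listOrEmpty(node, "sort-orders");
  }

  private static List<String> parsePrimaryKeys(Map<String, List<String>> node) {
    return listOrEmpty(node, "primary-keys");
  }

  // Each optional section falls back to an empty list when the field is absent.
  private static List<String> listOrEmpty(Map<String, List<String>> node, String field) {
    List<String> values = node.get(field);
    return values == null ? Collections.<String>emptyList() : new ArrayList<>(values);
  }
}
```

       Moving each section's branching into its own helper is what would let
the `@SuppressWarnings` annotation be dropped.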

##########
File path: api/src/main/java/org/apache/iceberg/PrimaryKey.java
##########
@@ -0,0 +1,221 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg;
+
+import java.io.Serializable;
+import java.util.Arrays;
+import java.util.List;
+import org.apache.iceberg.exceptions.ValidationException;
+import org.apache.iceberg.relocated.com.google.common.base.MoreObjects;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.types.Type;
+import org.apache.iceberg.types.Types;
+
+/**
+ * A primary key that defines which columns will be unique in this table.
+ */
+public class PrimaryKey implements Serializable {
+
+  private static final PrimaryKey NON_PRIMARY_KEY = new PrimaryKey(null, 0, false, ImmutableList.of());
+
+  private final Schema schema;
+  private final int keyId;
+  private final boolean enforceUniqueness;
+  private final Integer[] sourceIds;
+
+  private transient volatile List<Integer> sourceIdList;
+
+  private PrimaryKey(Schema schema, int keyId, boolean enforceUniqueness, List<Integer> sourceIds) {
+    this.schema = schema;
+    this.keyId = keyId;
+    this.enforceUniqueness = enforceUniqueness;
+    this.sourceIds = sourceIds.toArray(new Integer[0]);
+  }
+
+  /**
+   * Returns the {@link Schema} for this primary key.
+   */
+  public Schema schema() {
+    return schema;
+  }
+
+  /**
+   * Returns this ID of this primary key.
+   */
+  public int keyId() {
+    return keyId;
+  }
+
+  /**
+   * Returns true if the uniqueness should be guaranteed when writing iceberg table.
+   */
+  public boolean enforceUniqueness() {
+    return enforceUniqueness;
+  }
+
+  /**
+   * Returns the list of source field ids for this primary key.
+   */
+  public List<Integer> sourceIds() {
+    if (sourceIdList == null) {
+      synchronized (this) {
+        if (sourceIdList == null) {
+          this.sourceIdList = ImmutableList.copyOf(sourceIds);
+        }
+      }
+    }
+    return sourceIdList;
+  }
+
+  /**
+   * Returns true if the primary key has no column.
+   */
+  public boolean isNonPrimaryKey() {
+    return sourceIds.length == 0;
+  }
+
+  /**
+   * Returns a dummy primary key that has no column.
+   */
+  public static PrimaryKey nonPrimaryKey() {
+    return NON_PRIMARY_KEY;
+  }
+
+  /**
+   * Checks whether this primary key is equivalent to another primary key while ignoring the primary key id.
+   *
+   * @param other a different primary key.
+   * @return true if this key is equivalent to the given key.
+   */
+  public boolean samePrimaryKey(PrimaryKey other) {
+    return Arrays.equals(sourceIds, other.sourceIds) && enforceUniqueness == other.enforceUniqueness;
+  }
+
+  @Override
+  public boolean equals(Object other) {
+    if (this == other) {
+      return true;
+    } else if (!(other instanceof PrimaryKey)) {
+      return false;
+    }
+
+    PrimaryKey that = (PrimaryKey) other;
+    if (this.keyId != that.keyId) {
+      return false;
+    }
+
+    if (this.enforceUniqueness != that.enforceUniqueness) {
+      return false;
+    }
+
+    return Arrays.equals(sourceIds, that.sourceIds);
+  }
+
+  @Override
+  public int hashCode() {
+    int hash = 31 * keyId;
+    hash = hash + (enforceUniqueness ? 1 : 0);
+    hash += Arrays.hashCode(sourceIds);
+    return hash;
+  }
+
+  @Override
+  public String toString() {
+    return MoreObjects.toStringHelper(this)
+        .add("keyId", keyId)
+        .add("enforceUniqueness", enforceUniqueness)
+        .add("sourceIds", sourceIds())
+        .toString();
+  }
+
+  /**
+   * Creates a new {@link Builder primary key builder} for the given {@link Schema}.
+   *
+   * @param schema a schema
+   * @return a primary key builder for the given schema.
+   */
+  public static Builder builderFor(Schema schema) {
+    return new Builder(schema);
+  }
+
+  /**
+   * A builder to create valid {@link PrimaryKey primary keys}. Call {@link #builderFor(Schema)} to create a new
+   * builder.
+   */
+  public static class Builder {
+    private final Schema schema;
+    private final List<Integer> sourceIds = Lists.newArrayList();
+    // Default ID to 1 as 0 is reserved for non primary key.
+    private int keyId = 1;
+    private boolean enforceUniqueness = false;
+
+    private Builder(Schema schema) {
+      this.schema = schema;
+    }
+
+    public Builder withKeyId(int newKeyId) {
+      this.keyId = newKeyId;
+      return this;
+    }
+
+    public Builder withEnforceUniqueness(boolean enable) {
+      this.enforceUniqueness = enable;
+      return this;
+    }
+
+    public Builder addField(String name) {
+      Types.NestedField column = schema.findField(name);
+
+      Preconditions.checkNotNull(column, "Cannot find source column: %s", name);
+      Preconditions.checkArgument(column.isRequired(), "Cannot add optional source field to primary key: %s", name);
+
+      Type sourceType = column.type();
+      ValidationException.check(sourceType.isPrimitiveType(), "Cannot add non-primitive field: %s", sourceType);
+
+      sourceIds.add(column.fieldId());
+      return this;
+    }
+
+    public Builder addField(int sourceId) {
+      Types.NestedField column = schema.findField(sourceId);
+      Preconditions.checkNotNull(column, "Cannot find source column: %s", sourceId);
+      Preconditions.checkArgument(column.isRequired(), "Cannot add optional source field to primary key: %s", sourceId);

Review comment:
       Agree, the primary key should be non-null.
       nit: this duplicates the logic at L183; can you refactor the checks?
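
       A minimal sketch of one way to share the checks between the two
`addField` overloads. `Field` and `KeyBuilderSketch` are simplified,
hypothetical stand-ins for `Types.NestedField` and the builder, and plain
JDK exceptions replace `Preconditions` and `ValidationException` to keep the
example self-contained.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for Types.NestedField.
final class Field {
  final int id;
  final String name;
  final boolean required;
  final boolean primitive;

  Field(int id, String name, boolean required, boolean primitive) {
    this.id = id;
    this.name = name;
    this.required = required;
    this.primitive = primitive;
  }
}

final class KeyBuilderSketch {
  private final Map<String, Field> byName = new HashMap<>();
  private final Map<Integer, Field> byId = new HashMap<>();
  private final List<Integer> sourceIds = new ArrayList<>();

  KeyBuilderSketch(List<Field> fields) {
    for (Field field : fields) {
      byName.put(field.name, field);
      byId.put(field.id, field);
    }
  }

  KeyBuilderSketch addField(String name) {
    return addValidated(byName.get(name), name);
  }

  KeyBuilderSketch addField(int sourceId) {
    return addValidated(byId.get(sourceId), sourceId);
  }

  // Single home for the checks that both overloads would otherwise repeat.
  private KeyBuilderSketch addValidated(Field column, Object ref) {
    if (column == null) {
      throw new NullPointerException("Cannot find source column: " + ref);
    }
    if (!column.required) {
      throw new IllegalArgumentException("Cannot add optional source field to primary key: " + ref);
    }
    if (!column.primitive) {
      throw new IllegalArgumentException("Cannot add non-primitive field: " + ref);
    }
    sourceIds.add(column.id);
    return this;
  }

  List<Integer> sourceIds() {
    return new ArrayList<>(sourceIds);
  }
}
```

       Each overload then only resolves the column, and every validation
rule lives in one place.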

##########
File path: api/src/main/java/org/apache/iceberg/PrimaryKey.java
##########
@@ -0,0 +1,221 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg;
+
+import java.io.Serializable;
+import java.util.Arrays;
+import java.util.List;
+import org.apache.iceberg.exceptions.ValidationException;
+import org.apache.iceberg.relocated.com.google.common.base.MoreObjects;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.types.Type;
+import org.apache.iceberg.types.Types;
+
+/**
+ * A primary key that defines which columns will be unique in this table.
+ */
+public class PrimaryKey implements Serializable {
+
+  private static final PrimaryKey NON_PRIMARY_KEY = new PrimaryKey(null, 0, false, ImmutableList.of());

Review comment:
       To be consistent with `PartitionSpec` and `SortOrder`, it would be
better to use `new PrimaryKey(new Schema(), ...)` here.
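
       To illustrate why an empty schema is safer than `null`, a small sketch
with hypothetical stand-in types (`SchemaStub` for `Schema`, `KeyStub` for
`PrimaryKey`): callers can dereference the schema of the placeholder key
without a null check.

```java
import java.util.Collections;
import java.util.List;

// Hypothetical stand-in for org.apache.iceberg.Schema.
final class SchemaStub {
  private final List<String> columns;

  SchemaStub() {
    this(Collections.<String>emptyList());
  }

  SchemaStub(List<String> columns) {
    this.columns = columns;
  }

  List<String> columns() {
    return columns;
  }
}

// Hypothetical stand-in for PrimaryKey.
final class KeyStub {
  // The placeholder key carries an empty schema instead of null.
  private static final KeyStub NON_PRIMARY_KEY =
      new KeyStub(new SchemaStub(), 0, Collections.<Integer>emptyList());

  private final SchemaStub schema;
  private final int keyId;
  private final List<Integer> sourceIds;

  private KeyStub(SchemaStub schema, int keyId, List<Integer> sourceIds) {
    this.schema = schema;
    this.keyId = keyId;
    this.sourceIds = sourceIds;
  }

  SchemaStub schema() {
    return schema;
  }

  int keyId() {
    return keyId;
  }

  boolean isNonPrimaryKey() {
    return sourceIds.isEmpty();
  }

  static KeyStub nonPrimaryKey() {
    return NON_PRIMARY_KEY;
  }
}
```

       Here `KeyStub.nonPrimaryKey().schema().columns()` returns an empty
list rather than throwing a `NullPointerException`.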




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]
