This is an automated email from the ASF dual-hosted git repository.

dbecker pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git

commit 73171cb7164573349bd53a996a51bb7058b778e0
Author: Gabor Kaszab <gaborkas...@cloudera.com>
AuthorDate: Wed Feb 7 12:33:04 2024 +0100

    IMPALA-12729: Allow creating primary keys for Iceberg tables
    
    There are writer engines (e.g. Flink, NiFi) that use the
    identifier-field-ids from the Iceberg schema to identify the columns
    to be written into equality delete files. So far Impala wasn't able
    to populate these identifier-field-ids. This patch introduces support
    for NOT ENFORCED primary keys for Iceberg tables, where the primary
    key is used to set the identifier-field-ids during Iceberg schema
    creation.
    
    Example syntax:
    CREATE TABLE ice_tbl (
      i int NOT NULL,
      j int,
      s string NOT NULL,
      primary key(i, s) not enforced)
    PARTITIONED BY SPEC (truncate(10, s))
    STORED AS ICEBERG;
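
    The created table's Iceberg schema then carries the field ids of the
    key columns. For the table above, the schema metadata would contain
    something like the following (a sketch; the new tests verify this
    via DESCRIBE FORMATTED):
    "identifier-field-ids": [1, 3]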
    
    Following the behavior of Flink, there are some constraints on
    primary keys (PK); see the negative example below:
     - Only NOT NULL columns can be part of the PK.
     - A PK is not allowed at the column definition level, e.g.
       'i int NOT NULL PRIMARY KEY'.
     - If the table is partitioned, the partition columns have to be
       part of the PK.
     - Float and double columns are not allowed in the PK.
     - A column that is part of the PK cannot be dropped.
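
    For instance, a statement like the following is rejected because the
    partition column 'j' is not part of the PK (this mirrors one of the
    added negative tests; the table name is illustrative):
    CREATE TABLE ice_part (
      i int NOT NULL,
      j int NOT NULL,
      primary key(i) not enforced)
    PARTITIONED BY SPEC (j)
    STORED AS ICEBERG;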
    
    Testing:
     - New E2E tests added for different table creation scenarios.
     - Manual test using NiFi to write into a table with a PK.
    
    Change-Id: I7bea787acdabd8cb04661f4ddb5c3309af0364a6
    Reviewed-on: http://gerrit.cloudera.org:8080/21149
    Reviewed-by: Daniel Becker <daniel.bec...@cloudera.com>
    Tested-by: Impala Public Jenkins <impala-public-jenk...@cloudera.com>
---
 fe/src/main/cup/sql-parser.cup                     |   6 +-
 .../java/org/apache/impala/analysis/ColumnDef.java |  11 ++
 .../impala/analysis/CreateTableAsSelectStmt.java   |  11 +-
 .../org/apache/impala/analysis/SlotDescriptor.java |   6 +-
 .../java/org/apache/impala/analysis/TableDef.java  |  66 +++++++----
 .../org/apache/impala/catalog/FeIcebergTable.java  |   6 +
 .../org/apache/impala/catalog/IcebergColumn.java   |   7 ++
 .../impala/catalog/iceberg/IcebergCtasTarget.java  |  11 +-
 .../catalog/iceberg/IcebergMetadataTable.java      |   4 +-
 .../apache/impala/service/CatalogOpExecutor.java   |  14 ++-
 .../impala/service/IcebergCatalogOpExecutor.java   |  23 +++-
 .../apache/impala/util/IcebergSchemaConverter.java |  30 ++++-
 fe/src/main/jflex/sql-scanner.flex                 |   1 +
 .../org/apache/impala/analysis/AnalyzeDDLTest.java |   6 +-
 .../queries/QueryTest/iceberg-create.test          |  85 +++++++++++++-
 .../queries/QueryTest/iceberg-metadata-tables.test | 126 ++++++++++-----------
 .../queries/QueryTest/iceberg-negative.test        |  87 ++++++++++++++
 tests/custom_cluster/test_lineage.py               |   9 +-
 18 files changed, 389 insertions(+), 120 deletions(-)

diff --git a/fe/src/main/cup/sql-parser.cup b/fe/src/main/cup/sql-parser.cup
index fb1b9b25f..b05baac44 100755
--- a/fe/src/main/cup/sql-parser.cup
+++ b/fe/src/main/cup/sql-parser.cup
@@ -308,7 +308,7 @@ terminal
  KW_COMPUTE, KW_CONSTRAINT, KW_CONVERT, KW_COPY, KW_CREATE, KW_CROSS, KW_CUBE, KW_CURRENT, KW_DATA,
  KW_DATABASE, KW_DATABASES, KW_DATE, KW_DATETIME, KW_DECIMAL, KW_DEFAULT, KW_DELETE,
  KW_DELIMITED, KW_DESC, KW_DESCRIBE, KW_DISABLE, KW_DISTINCT, KW_DIV, KW_DOUBLE,
-  KW_DROP, KW_ELSE, KW_ENABLE, KW_ENCODING, KW_END, KW_ESCAPED, KW_EXCEPT, KW_EXECUTE,
+  KW_DROP, KW_ELSE, KW_ENABLE, KW_ENCODING, KW_END, KW_ENFORCED, KW_ESCAPED, KW_EXCEPT, KW_EXECUTE,
  KW_EXISTS, KW_EXPLAIN, KW_EXTENDED, KW_EXTERNAL, KW_FALSE, KW_FIELDS, KW_FILEFORMAT,
  KW_FILES, KW_FINALIZE_FN, KW_FIRST, KW_FLOAT, KW_FOLLOWING, KW_FOR, KW_FOREIGN,
  KW_FORMAT, KW_FORMATTED, KW_FROM, KW_FULL, KW_FUNCTION, KW_FUNCTIONS, KW_GRANT,
@@ -1677,6 +1677,8 @@ primary_keys ::=
   {: RESULT = new Pair<List<String>, Boolean>(col_names, true); :}
   | KW_NON KW_UNIQUE KW_PRIMARY key_ident LPAREN ident_list:col_names RPAREN
   {: RESULT = new Pair<List<String>, Boolean>(col_names, false); :}
+  | KW_PRIMARY key_ident LPAREN ident_list:col_names RPAREN KW_NOT KW_ENFORCED
+  {: RESULT = new Pair<List<String>, Boolean>(col_names, false); :}
   ;
 
 rely_spec ::=
@@ -4307,6 +4309,8 @@ word ::=
   {: RESULT = r.toString(); :}
   | KW_END:r
   {: RESULT = r.toString(); :}
+  | KW_ENFORCED:r
+  {: RESULT = r.toString(); :}
   | KW_ESCAPED:r
   {: RESULT = r.toString(); :}
   | KW_EXCEPT:r
diff --git a/fe/src/main/java/org/apache/impala/analysis/ColumnDef.java b/fe/src/main/java/org/apache/impala/analysis/ColumnDef.java
index 5ba145f30..b4e3e9b1d 100644
--- a/fe/src/main/java/org/apache/impala/analysis/ColumnDef.java
+++ b/fe/src/main/java/org/apache/impala/analysis/ColumnDef.java
@@ -181,6 +181,17 @@ public class ColumnDef {
   public boolean hasIcebergOptions() {
     return isNullabilitySet();
   }
+  // Returns true if the column has options that are not supported for Iceberg tables.
+  public boolean hasIncompatibleIcebergOptions() {
+    return isPrimaryKey() || hasEncoding() || hasCompression() || hasDefaultValue()
+        || hasBlockSize();
+  }
+  // Returns true if the column has options that are not supported for Kudu tables.
+  public boolean hasIncompatibleKuduOptions() {
+    // This always returns false as currently only 'isNullable' is a valid Iceberg
+    // option that is also valid for Kudu.
+    return false;
+  }
   public boolean hasEncoding() { return encodingVal_ != null; }
   public boolean hasCompression() { return compressionVal_ != null; }
   public boolean hasBlockSize() { return blockSize_ != null; }
diff --git a/fe/src/main/java/org/apache/impala/analysis/CreateTableAsSelectStmt.java b/fe/src/main/java/org/apache/impala/analysis/CreateTableAsSelectStmt.java
index c6b33e644..87a62f1cf 100644
--- a/fe/src/main/java/org/apache/impala/analysis/CreateTableAsSelectStmt.java
+++ b/fe/src/main/java/org/apache/impala/analysis/CreateTableAsSelectStmt.java
@@ -21,6 +21,7 @@ import java.util.ArrayList;
 import java.util.Collections;
 import java.util.EnumSet;
 import java.util.List;
+import java.util.Set;
 
 import org.apache.impala.authorization.Privilege;
 import org.apache.impala.catalog.FeDb;
@@ -39,6 +40,7 @@ import org.apache.impala.service.CatalogOpExecutor;
 import org.apache.impala.thrift.THdfsFileFormat;
 
 import com.google.common.base.Preconditions;
+import com.google.common.collect.Sets;
 
 /**
  * Represents a CREATE TABLE AS SELECT (CTAS) statement
@@ -186,6 +188,8 @@ public class CreateTableAsSelectStmt extends StatementBase {
 
     // Add the columns from the select statement to the create statement.
     int colCnt = tmpQueryStmt.getColLabels().size();
+    Set<String> hashedPrimaryKeyColNames =
+        Sets.newHashSet(createStmt_.getTblPrimaryKeyColumnNames());
     for (int i = 0; i < colCnt; ++i) {
       ColumnDef colDef = new ColumnDef(tmpQueryStmt.getColLabels().get(i), null,
           Collections.<ColumnDef.Option, Object>emptyMap());
@@ -195,6 +199,11 @@ public class CreateTableAsSelectStmt extends StatementBase {
             "for column '%s'. Use cast() to explicitly specify the column type 
for " +
             "column '%s'.", colDef.getColName(), colDef.getColName()));
       }
+      if (createStmt_.getFileFormat() == THdfsFileFormat.ICEBERG &&
+          hashedPrimaryKeyColNames.contains(colDef.getColName())) {
+        // Iceberg tables require NOT NULL column config for primary key columns.
+        colDef.setNullable(false);
+      }
       createStmt_.getColumnDefs().add(colDef);
     }
     createStmt_.analyze(analyzer);
@@ -239,7 +248,7 @@ public class CreateTableAsSelectStmt extends StatementBase {
           partSpec = createStmt_.getIcebergPartitionSpecs().get(0);
         }
         tmpTable = new IcebergCtasTarget(db, msTbl, createStmt_.getColumnDefs(),
-            partSpec);
+            createStmt_.getTblPrimaryKeyColumnNames(), partSpec);
       } else if (HdfsFileFormat.isHdfsInputFormatClass(msTbl.getSd().getInputFormat())) {
         tmpTable = db.createFsCtasTarget(msTbl);
       }
diff --git a/fe/src/main/java/org/apache/impala/analysis/SlotDescriptor.java b/fe/src/main/java/org/apache/impala/analysis/SlotDescriptor.java
index 73ba4a6a9..939249c01 100644
--- a/fe/src/main/java/org/apache/impala/analysis/SlotDescriptor.java
+++ b/fe/src/main/java/org/apache/impala/analysis/SlotDescriptor.java
@@ -27,6 +27,7 @@ import java.util.Set;
 import org.apache.impala.catalog.Column;
 import org.apache.impala.catalog.ColumnStats;
 import org.apache.impala.catalog.FeKuduTable;
+import org.apache.impala.catalog.IcebergColumn;
 import org.apache.impala.catalog.KuduColumn;
 import org.apache.impala.catalog.Type;
 import org.apache.impala.thrift.TSlotDescriptor;
@@ -170,10 +171,13 @@ public class SlotDescriptor {
     type_ = path_.destType();
     label_ = Joiner.on(".").join(path.getRawPath());
 
-    // Set nullability, if this refers to a KuduColumn.
+    // Set nullability based on column type.
     if (path_.destColumn() instanceof KuduColumn) {
       KuduColumn kuduColumn = (KuduColumn)path_.destColumn();
       isNullable_ = kuduColumn.isNullable();
+    } else if (path_.destColumn() instanceof IcebergColumn) {
+      IcebergColumn icebergColumn = (IcebergColumn)path_.destColumn();
+      isNullable_ = icebergColumn.isNullable();
     }
   }
 
diff --git a/fe/src/main/java/org/apache/impala/analysis/TableDef.java b/fe/src/main/java/org/apache/impala/analysis/TableDef.java
index 428de0a0c..fce1039f6 100644
--- a/fe/src/main/java/org/apache/impala/analysis/TableDef.java
+++ b/fe/src/main/java/org/apache/impala/analysis/TableDef.java
@@ -57,6 +57,7 @@ import org.apache.thrift.TException;
 import com.google.common.base.Preconditions;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
 
 /**
  * Represents the table parameters in a CREATE TABLE statement. These parameters
@@ -85,9 +86,12 @@ class TableDef {
   // mean no primary keys were specified as the columnDefs_ could contain primary keys.
   private final List<String> primaryKeyColNames_ = new ArrayList<>();
 
-  // If true, the primary key is unique. If not, an auto-incrementing column will be
-  // added automatically by Kudu engine. This extra key column helps produce a unique
-  // composite primary key (primary keys + auto-incrementing construct).
+  // If true, the primary key is unique. If not, and the table is a Kudu table then an
+  // auto-incrementing column will be added automatically by Kudu engine. This extra key
+  // column helps produce a unique composite primary key (primary keys +
+  // auto-incrementing construct).
+  // This is also used for Iceberg table and set to false if "NOT ENFORCED" is provided
+  // for the primary key.
   private boolean isPrimaryKeyUnique_;
 
   // If true, the table's data will be preserved if dropped.
@@ -478,28 +482,20 @@ class TableDef {
   }
 
   /**
-   * Kudu and Iceberg table has their own column option, this function will return false
-   * if we use column option incorrectly, such as use primary key for Parquet format.
+   * Kudu and Iceberg tables have their own column options, this function will return
+   * false if we use column options incorrectly, e.g. primary key column option for an
+   * Iceberg table.
    */
   private boolean analyzeColumnOption(ColumnDef columnDef) {
-    boolean flag = true;
-    // Kudu option and Iceberg option have overlap
-    if (columnDef.hasKuduOptions() && columnDef.hasIcebergOptions()) {
-      if (!isKuduTable() && !isIcebergTable()) {
-        flag = false;
-      }
-    } else if (columnDef.hasKuduOptions()) {
-      if (!isKuduTable()) {
-        flag = false;
-      }
-    } else if (columnDef.hasIcebergOptions()) {
-      // Currently Iceberg option only contains 'nullable', so we won't run into this
-      // situation.
-      if (!isIcebergTable()) {
-        flag = false;
-      }
-    }
-    return flag;
+    if (isKuduTable()) {
+      if (columnDef.hasIncompatibleKuduOptions()) return false;
+    } else if (isIcebergTable()) {
+      if (columnDef.hasIncompatibleIcebergOptions()) return false;
+    } else if (columnDef.hasKuduOptions() || columnDef.hasIcebergOptions()) {
+      // If the table is neither Kudu or Iceberg but has some incompatible column options.
+      return false;
+    }
+    return true;
   }
 
   /**
@@ -574,9 +570,29 @@ class TableDef {
           "(col1, col2, ...) syntax at the end of the column definition.",
           primaryKeyString, primaryKeyString, primaryKeyString));
     } else if (!primaryKeyColNames_.isEmpty() && !isPrimaryKeyUnique()
-        && !isKuduTable()) {
-      throw new AnalysisException(primaryKeyString + " is only supported for Kudu.");
+        && !isKuduTable() && !isIcebergTable()) {
+      throw new AnalysisException(primaryKeyString +
+          " is only supported for Kudu and Iceberg.");
+    }
+
+    if (isIcebergTable() && isPrimaryKeyUnique_) {
+      throw new AnalysisException(
+          "Iceberg tables only support NOT ENFORCED primary keys.");
+    }
+
+    Set<String> hashedPKColNames = Sets.newHashSet(primaryKeyColNames_);
+    List<IcebergPartitionSpec> icebergPartitionSpecs = getIcebergPartitionSpecs();
+    if (!icebergPartitionSpecs.isEmpty()) {
+      Preconditions.checkState(icebergPartitionSpecs.size() == 1);
+      IcebergPartitionSpec partSpec = icebergPartitionSpecs.get(0);
+      for (IcebergPartitionField partField : partSpec.getIcebergPartitionFields()) {
+        if (!hashedPKColNames.contains(partField.getFieldName())) {
+          throw new AnalysisException("Partition columns have to be part of 
the " +
+              "primary key for Iceberg tables.");
+        }
+      }
     }
+
     Map<String, ColumnDef> colDefsByColName = ColumnDef.mapByColumnNames(columnDefs_);
     int keySeq = 1;
     String constraintName = null;
diff --git a/fe/src/main/java/org/apache/impala/catalog/FeIcebergTable.java b/fe/src/main/java/org/apache/impala/catalog/FeIcebergTable.java
index 633fffc9d..61768cd5c 100644
--- a/fe/src/main/java/org/apache/impala/catalog/FeIcebergTable.java
+++ b/fe/src/main/java/org/apache/impala/catalog/FeIcebergTable.java
@@ -92,6 +92,7 @@ import org.apache.impala.util.IcebergSchemaConverter;
 import org.apache.impala.util.IcebergUtil;
 import org.apache.impala.util.ListMap;
 import org.apache.impala.util.TResultRowBuilder;
+import org.apache.thrift.TException;
 
 import org.json.simple.JSONValue;
 import org.slf4j.Logger;
@@ -199,6 +200,11 @@ public interface FeIcebergTable extends FeFsTable {
     return getIcebergApiTable().schema();
   }
 
+  @Override
+  default List<String> getPrimaryKeyColumnNames() throws TException {
+    return Lists.newArrayList(getIcebergSchema().identifierFieldNames());
+  }
+
   @Override
   default boolean isCacheable() {
     return getFeFsTable().isCacheable();
diff --git a/fe/src/main/java/org/apache/impala/catalog/IcebergColumn.java b/fe/src/main/java/org/apache/impala/catalog/IcebergColumn.java
index 1b6f953d3..a5483d1b8 100644
--- a/fe/src/main/java/org/apache/impala/catalog/IcebergColumn.java
+++ b/fe/src/main/java/org/apache/impala/catalog/IcebergColumn.java
@@ -48,6 +48,13 @@ public class IcebergColumn extends Column {
     isNullable_ = isNullable;
   }
 
+  public static IcebergColumn cloneWithNullability(IcebergColumn source,
+      boolean isNullable) {
+    return new IcebergColumn(source.name_, source.type_, source.comment_,
+        source.position_, source.fieldId_, source.fieldMapKeyId_, source.fieldMapValueId_,
+        isNullable);
+  }
+
   public int getFieldId() {
     return fieldId_;
   }
diff --git a/fe/src/main/java/org/apache/impala/catalog/iceberg/IcebergCtasTarget.java b/fe/src/main/java/org/apache/impala/catalog/iceberg/IcebergCtasTarget.java
index 5ceeefd07..5cb4d163f 100644
--- a/fe/src/main/java/org/apache/impala/catalog/iceberg/IcebergCtasTarget.java
+++ b/fe/src/main/java/org/apache/impala/catalog/iceberg/IcebergCtasTarget.java
@@ -89,11 +89,11 @@ public class IcebergCtasTarget extends CtasTargetTable implements FeIcebergTable
   private HdfsStorageDescriptor hdfsSd_;
 
   public IcebergCtasTarget(FeDb db, org.apache.hadoop.hive.metastore.api.Table msTbl,
-      List<ColumnDef> columnDefs, IcebergPartitionSpec partSpec)
-      throws CatalogException, ImpalaRuntimeException {
+        List<ColumnDef> columnDefs, List<String> primaryKeyNames,
+        IcebergPartitionSpec partSpec) throws CatalogException, ImpalaRuntimeException {
     super(msTbl, db, msTbl.getTableName(), msTbl.getOwner());
     createFsTable(db, msTbl);
-    createIcebergSchema(columnDefs);
+    createIcebergSchema(columnDefs, primaryKeyNames);
     createPartitionSpec(partSpec);
     icebergCatalog_ = IcebergUtil.getTIcebergCatalog(msTbl);
     setLocations();
@@ -105,13 +105,14 @@ public class IcebergCtasTarget extends CtasTargetTable implements FeIcebergTable
     hdfsSd_ = HdfsStorageDescriptor.fromStorageDescriptor(name_, msTable_.getSd());
   }
 
-  private void createIcebergSchema(List<ColumnDef> columnDefs) throws CatalogException {
+  private void createIcebergSchema(List<ColumnDef> columnDefs,
+      List<String> primaryKeyNames) throws CatalogException {
     List<TColumn> tcols = new ArrayList<>();
     for (ColumnDef col : columnDefs) {
       tcols.add(col.toThrift());
     }
     try {
-      iceSchema_ = IcebergSchemaConverter.genIcebergSchema(tcols);
+      iceSchema_ = IcebergSchemaConverter.genIcebergSchema(tcols, primaryKeyNames);
       // In genIcebergSchema() we did our best to assign correct field ids to columns,
       // but to be sure, let's use Iceberg's API function to assign field ids.
       iceSchema_ = TypeUtil.assignIncreasingFreshIds(iceSchema_);
diff --git a/fe/src/main/java/org/apache/impala/catalog/iceberg/IcebergMetadataTable.java b/fe/src/main/java/org/apache/impala/catalog/iceberg/IcebergMetadataTable.java
index 877b31a46..3d417eef9 100644
--- a/fe/src/main/java/org/apache/impala/catalog/iceberg/IcebergMetadataTable.java
+++ b/fe/src/main/java/org/apache/impala/catalog/iceberg/IcebergMetadataTable.java
@@ -31,6 +31,7 @@ import org.apache.impala.catalog.Column;
 import org.apache.impala.catalog.FeCatalogUtils;
 import org.apache.impala.catalog.FeIcebergTable;
 import org.apache.impala.catalog.FeTable;
+import org.apache.impala.catalog.IcebergColumn;
 import org.apache.impala.catalog.VirtualTable;
 import org.apache.impala.common.ImpalaRuntimeException;
 import org.apache.impala.thrift.TColumnDescriptor;
@@ -73,7 +74,8 @@ public class IcebergMetadataTable extends VirtualTable {
         metadataTableSchema)) {
       LOG.trace("Adding column: \"{}\" with type: \"{}\" to metadata table.",
           col.getName(), col.getType());
-      addColumn(col);
+      addColumn(IcebergColumn.cloneWithNullability(
+          (IcebergColumn)col, true /*isNullable*/));
     }
   }
 
diff --git a/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java b/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java
index e8be33988..8fcb54d1b 100644
--- a/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java
+++ b/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java
@@ -3560,7 +3560,8 @@ public class CatalogOpExecutor {
     } else if (IcebergTable.isIcebergTable(tbl)) {
       return createIcebergTable(tbl, wantMinimalResult, response, catalogTimeline,
           params.if_not_exists, params.getColumns(), params.getPartition_spec(),
-          params.getTable_properties(), params.getComment());
+          params.getPrimary_key_column_names(), params.getTable_properties(),
+          params.getComment());
     }
     Preconditions.checkState(params.getColumns().size() > 0,
         "Empty column list given as argument to Catalog.createTable");
@@ -3934,7 +3935,8 @@ public class CatalogOpExecutor {
   private boolean createIcebergTable(org.apache.hadoop.hive.metastore.api.Table newTable,
       boolean wantMinimalResult, TDdlExecResponse response, EventSequence catalogTimeline,
       boolean ifNotExists, List<TColumn> columns, TIcebergPartitionSpec partitionSpec,
-      Map<String, String> tableProperties, String tblComment) throws ImpalaException {
+      List<String> primaryKeyColumnNames, Map<String, String> tableProperties,
+      String tblComment) throws ImpalaException {
     Preconditions.checkState(IcebergTable.isIcebergTable(newTable));
 
     acquireMetastoreDdlLock(catalogTimeline);
@@ -3969,7 +3971,8 @@ public class CatalogOpExecutor {
             }
             String tableLoc = IcebergCatalogOpExecutor.createTable(catalog,
                 IcebergUtil.getIcebergTableIdentifier(newTable), location, 
columns,
-                partitionSpec, newTable.getOwner(), 
tableProperties).location();
+                partitionSpec, primaryKeyColumnNames, newTable.getOwner(),
+                tableProperties).location();
             newTable.getSd().setLocation(tableLoc);
             catalogTimeline.markEvent(CREATED_ICEBERG_TABLE + catalog.name());
           } else {
@@ -4196,8 +4199,9 @@ public class CatalogOpExecutor {
       TIcebergPartitionSpec partitionSpec = srcIceTable.getDefaultPartitionSpec()
           .toThrift();
       createIcebergTable(tbl, wantMinimalResult, response, catalogTimeline,
-          params.if_not_exists, columns, partitionSpec, tableProperties,
-          params.getComment());
+          params.if_not_exists, columns, partitionSpec,
+          Lists.newArrayList(srcIceTable.getIcebergSchema().identifierFieldNames()),
+          tableProperties, params.getComment());
     } else if (srcTable instanceof KuduTable && KuduTable.isKuduTable(tbl)) {
       TCreateTableParams createTableParams =
           extractKuduCreateTableParams(params, tblName, (KuduTable) srcTable, tbl);
diff --git a/fe/src/main/java/org/apache/impala/service/IcebergCatalogOpExecutor.java b/fe/src/main/java/org/apache/impala/service/IcebergCatalogOpExecutor.java
index f126bf16e..404050471 100644
--- a/fe/src/main/java/org/apache/impala/service/IcebergCatalogOpExecutor.java
+++ b/fe/src/main/java/org/apache/impala/service/IcebergCatalogOpExecutor.java
@@ -18,10 +18,10 @@
 package org.apache.impala.service;
 
 import java.nio.ByteBuffer;
+import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.Set;
 import java.util.Collections;
 
 import org.apache.iceberg.AppendFiles;
@@ -71,6 +71,7 @@ import org.apache.impala.thrift.TIcebergPartitionSpec;
 import org.apache.impala.thrift.TRollbackType;
 import org.apache.impala.util.IcebergSchemaConverter;
 import org.apache.impala.util.IcebergUtil;
+import org.apache.thrift.TException;
 
 import com.google.common.base.Preconditions;
 import org.slf4j.Logger;
@@ -90,9 +91,10 @@ public class IcebergCatalogOpExecutor {
    */
   public static Table createTable(TIcebergCatalog catalog, TableIdentifier identifier,
       String location, List<TColumn> columns, TIcebergPartitionSpec partitionSpec,
-      String owner, Map<String, String> tableProperties) throws ImpalaRuntimeException {
+      List<String> primaryKeyColumnNames, String owner,
+      Map<String, String> tableProperties) throws ImpalaRuntimeException {
     // Each table id increase from zero
-    Schema schema = createIcebergSchema(columns);
+    Schema schema = createIcebergSchema(columns, primaryKeyColumnNames);
     PartitionSpec spec = IcebergUtil.createIcebergPartition(schema, partitionSpec);
     IcebergCatalog icebergCatalog = IcebergUtil.getIcebergCatalog(catalog, location);
     if (icebergCatalog instanceof IcebergHiveCatalog) {
@@ -185,6 +187,15 @@ public class IcebergCatalogOpExecutor {
   public static void alterTableSetPartitionSpec(FeIcebergTable feTable,
       TIcebergPartitionSpec partSpec, Transaction transaction)
       throws ImpalaRuntimeException {
+    try {
+      if (!feTable.getPrimaryKeyColumnNames().isEmpty()) {
+        throw new ImpalaRuntimeException("Not allowed to do partition 
evolution on " +
+            "Iceberg tables with primary keys.");
+      }
+    } catch (TException tEx) {
+      throw new ImpalaRuntimeException(tEx.getMessage());
+    }
+
     BaseTable iceTable = (BaseTable)feTable.getIcebergApiTable();
     UpdatePartitionSpec updatePartitionSpec = transaction.updateSpec();
     iceTable.spec().fields().forEach(partitionField -> updatePartitionSpec.removeField(
@@ -297,9 +308,9 @@ public class IcebergCatalogOpExecutor {
   /**
    * Build iceberg schema by parameters.
    */
-  private static Schema createIcebergSchema(List<TColumn> columns)
-      throws ImpalaRuntimeException {
-    return IcebergSchemaConverter.genIcebergSchema(columns);
+  private static Schema createIcebergSchema(List<TColumn> columns,
+      List<String> primaryKeyColumnNames) throws ImpalaRuntimeException {
+    return IcebergSchemaConverter.genIcebergSchema(columns, primaryKeyColumnNames);
   }
 
   /**
diff --git a/fe/src/main/java/org/apache/impala/util/IcebergSchemaConverter.java b/fe/src/main/java/org/apache/impala/util/IcebergSchemaConverter.java
index 78d8b4458..ecee330ab 100644
--- a/fe/src/main/java/org/apache/impala/util/IcebergSchemaConverter.java
+++ b/fe/src/main/java/org/apache/impala/util/IcebergSchemaConverter.java
@@ -18,7 +18,11 @@
 package org.apache.impala.util;
 
 import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
+import java.util.Map;
+import java.util.Set;
 
 import org.apache.commons.collections.CollectionUtils;
 import org.apache.hadoop.hive.metastore.api.FieldSchema;
@@ -168,16 +172,32 @@ public class IcebergSchemaConverter {
 
   /**
   * Generates Iceberg schema from given columns. It also assigns a unique 'field id' for
-   * each schema element, although Iceberg will reassign the ids.
+   * each schema element, although Iceberg will reassign the ids. 'primaryKeyColumnNames'
+   * are used for populating 'identifier-field-ids' in the Schema.
    */
-  public static Schema genIcebergSchema(List<TColumn> columns)
-      throws ImpalaRuntimeException {
+  public static Schema genIcebergSchema(List<TColumn> columns,
+      List<String> primaryKeyColumnNames) throws ImpalaRuntimeException {
     iThreadLocal.set(1);
     List<Types.NestedField> fields = new ArrayList<Types.NestedField>();
+    Map<String, Integer> colNameToFieldId = new HashMap<>();
     for (TColumn column : columns) {
-      fields.add(createIcebergField(column));
+      Types.NestedField icebergField = createIcebergField(column);
+      fields.add(icebergField);
+      colNameToFieldId.put(icebergField.name(), icebergField.fieldId());
+    }
+
+    if (primaryKeyColumnNames == null || primaryKeyColumnNames.isEmpty()) {
+      return new Schema(fields);
+    }
+
+    Set<Integer> identifierFieldIds = new HashSet<>();
+    for (String pkColName : primaryKeyColumnNames) {
+      if (!colNameToFieldId.containsKey(pkColName)) {
+        throw new ImpalaRuntimeException("Invalid primary key column name: " + 
pkColName);
+      }
+      identifierFieldIds.add(colNameToFieldId.get(pkColName));
     }
-    return new Schema(fields);
+    return new Schema(fields, identifierFieldIds);
   }
 
   /**
diff --git a/fe/src/main/jflex/sql-scanner.flex b/fe/src/main/jflex/sql-scanner.flex
index a679897b8..50ba0b52a 100644
--- a/fe/src/main/jflex/sql-scanner.flex
+++ b/fe/src/main/jflex/sql-scanner.flex
@@ -125,6 +125,7 @@ import org.apache.impala.thrift.TReservedWordsVersion;
     keywordMap.put("enable", SqlParserSymbols.KW_ENABLE);
     keywordMap.put("encoding", SqlParserSymbols.KW_ENCODING);
     keywordMap.put("end", SqlParserSymbols.KW_END);
+    keywordMap.put("enforced", SqlParserSymbols.KW_ENFORCED);
     keywordMap.put("escaped", SqlParserSymbols.KW_ESCAPED);
     keywordMap.put("except", SqlParserSymbols.KW_EXCEPT);
     keywordMap.put("exists", SqlParserSymbols.KW_EXISTS);
diff --git a/fe/src/test/java/org/apache/impala/analysis/AnalyzeDDLTest.java b/fe/src/test/java/org/apache/impala/analysis/AnalyzeDDLTest.java
index a1ea8cd9a..c36492456 100644
--- a/fe/src/test/java/org/apache/impala/analysis/AnalyzeDDLTest.java
+++ b/fe/src/test/java/org/apache/impala/analysis/AnalyzeDDLTest.java
@@ -2536,9 +2536,9 @@ public class AnalyzeDDLTest extends FrontendTestBase {
         " stored by kudu as select id, bool_col, tinyint_col, smallint_col, " +
         "int_col, bigint_col, float_col, double_col, date_string_col, 
string_col " +
         "from functional.alltypestiny");
-    AnalyzesOk("create table t primary key (id) stored by iceberg as select 
id, " +
-        "bool_col, int_col, float_col, double_col, date_string_col, string_col 
" +
-        "from functional.alltypestiny");
+    AnalyzesOk("create table t primary key (id) not enforced " +
+        "stored by iceberg as select id, bool_col, int_col, float_col, 
double_col, " +
+        "date_string_col, string_col from functional.alltypestiny");
 
     // IMPALA-9822 Row Format Delimited is valid only for Text Files
     String[] fileFormats = {"TEXTFILE", "PARQUET", "ICEBERG"};
diff --git a/testdata/workloads/functional-query/queries/QueryTest/iceberg-create.test b/testdata/workloads/functional-query/queries/QueryTest/iceberg-create.test
index c8187fd81..23b834407 100644
--- a/testdata/workloads/functional-query/queries/QueryTest/iceberg-create.test
+++ b/testdata/workloads/functional-query/queries/QueryTest/iceberg-create.test
@@ -615,4 +615,87 @@ describe formatted ice_hadoop_tbl;
 '','external.table.purge','TRUE                '
 ---- TYPES
 string, string, string
-====
\ No newline at end of file
+====
+---- QUERY
+# Create an Iceberg table with PKs and check if 'identifier-field-ids' are populated.
+create table ice_tbl_with_pk
+    (i int not null, s string not null, d date not null, primary key(i, d) not enforced)
+    stored as iceberg;
+describe formatted ice_tbl_with_pk;
+---- RESULTS: VERIFY_IS_SUBSET
+row_regex:'','current-schema.*','.*identifier-field-ids\\\\":\[1,3\].*'
+---- TYPES
+string, string, string
+====
+---- QUERY
+# Similar as above, but the table is also partitioned.
+create table ice_tbl_with_pk_partitioned
+    (i int not null, s string not null, d date, primary key(i, s) not enforced)
+    partitioned by spec (s)
+    stored as iceberg;
+describe formatted ice_tbl_with_pk_partitioned;
+---- RESULTS: VERIFY_IS_SUBSET
+row_regex:'','current-schema.*','.*identifier-field-ids\\\\":\[1,2\].*'
+---- TYPES
+string, string, string
+====
+---- QUERY
+# Primary key on partition columns with partition transform.
+create table ice_tbl_with_pk_partition_transform
+    (i int not null, s string not null, d date not null, primary key(s, d) not enforced)
+    partitioned by spec (day(d), truncate(3,s), bucket(10, s))
+    stored as iceberg;
+describe formatted ice_tbl_with_pk_partition_transform;
+---- RESULTS: VERIFY_IS_SUBSET
+row_regex:'','current-schema.*','.*identifier-field-ids\\\\":\[2,3\].*'
+---- TYPES
+string, string, string
+====
+---- QUERY
+# When source table has primary keys the CTAS by default won't use it for the target
+# table.
+create table ctas_pk_in_source_not_used
+  stored as iceberg
+  as select * from ice_tbl_with_pk;
+describe formatted ctas_pk_in_source_not_used;
+---- RESULTS: VERIFY_IS_SUBSET
+row_regex:'.*','current-schema.*','.*'
+---- RESULTS: VERIFY_IS_NOT_IN
+row_regex:'.*','current-schema.*','.*identifier-field-ids.*'
+---- TYPES
+STRING,STRING,STRING
+====
+---- QUERY
+# Primary key provided to a CTAS statement.
+create table ctas_pk_unpartitioned
+  primary key(d) not enforced
+  stored as iceberg
+  as select * from ice_tbl_with_pk;
+describe formatted ctas_pk_unpartitioned;
+---- RESULTS: VERIFY_IS_SUBSET
+row_regex:'.*','current-schema.*','.*identifier-field-ids\\\\":\[3\].*'
+---- TYPES
+STRING,STRING,STRING
+====
+---- QUERY
+# Similar as above but the table is partitioned.
+create table ctas_pk_partitioned
+  primary key(s, d) not enforced
+  partitioned by spec (d)
+  stored as iceberg
+  as select * from ice_tbl_with_pk;
+describe formatted ctas_pk_partitioned;
+---- RESULTS: VERIFY_IS_SUBSET
+row_regex:'.*','current-schema.*','.*identifier-field-ids\\\\":\[2,3\].*'
+---- TYPES
+STRING,STRING,STRING
+====
+---- QUERY
+# Create table like inherits the 'identifier-field-ids' from the source table.
+create table ice_like_pk like ice_tbl_with_pk_partitioned;
+describe formatted ice_like_pk;
+---- RESULTS: VERIFY_IS_SUBSET
+row_regex:'.*','current-schema.*','.*identifier-field-ids\\\\":\[1,2\].*'
+---- TYPES
+STRING,STRING,STRING
+====
diff --git a/testdata/workloads/functional-query/queries/QueryTest/iceberg-metadata-tables.test b/testdata/workloads/functional-query/queries/QueryTest/iceberg-metadata-tables.test
index 9918adcd7..b1cb6bad0 100644
--- a/testdata/workloads/functional-query/queries/QueryTest/iceberg-metadata-tables.test
+++ b/testdata/workloads/functional-query/queries/QueryTest/iceberg-metadata-tables.test
@@ -718,8 +718,8 @@ STRING
 ---- QUERY
 describe functional_parquet.iceberg_query_metadata.snapshots;
 ---- RESULTS
-'committed_at','timestamp','','false'
-'snapshot_id','bigint','','false'
+'committed_at','timestamp','','true'
+'snapshot_id','bigint','','true'
 'parent_id','bigint','','true'
 'operation','string','','true'
 'manifest_list','string','','true'
@@ -731,11 +731,11 @@ STRING,STRING,STRING,STRING
 describe functional_parquet.iceberg_query_metadata.`files`;
 ---- RESULTS
 'content','int','Contents of the file: 0=data, 1=position deletes, 2=equality deletes','true'
-'file_path','string','Location URI with FS scheme','false'
-'file_format','string','File format name: avro, orc, or parquet','false'
+'file_path','string','Location URI with FS scheme','true'
+'file_format','string','File format name: avro, orc, or parquet','true'
 'spec_id','int','Partition spec ID','true'
-'record_count','bigint','Number of records in the file','false'
-'file_size_in_bytes','bigint','Total file size in bytes','false'
+'record_count','bigint','Number of records in the file','true'
+'file_size_in_bytes','bigint','Total file size in bytes','true'
 'column_sizes','map<int,bigint>','Map of column id to total size on disk','true'
 'value_counts','map<int,bigint>','Map of column id to total count, including null and NaN','true'
 'null_value_counts','map<int,bigint>','Map of column id to null value count','true'
@@ -754,11 +754,11 @@ STRING,STRING,STRING,STRING
 describe functional_parquet.iceberg_query_metadata.data_files;
 ---- RESULTS
 'content','int','Contents of the file: 0=data, 1=position deletes, 2=equality deletes','true'
-'file_path','string','Location URI with FS scheme','false'
-'file_format','string','File format name: avro, orc, or parquet','false'
+'file_path','string','Location URI with FS scheme','true'
+'file_format','string','File format name: avro, orc, or parquet','true'
 'spec_id','int','Partition spec ID','true'
-'record_count','bigint','Number of records in the file','false'
-'file_size_in_bytes','bigint','Total file size in bytes','false'
+'record_count','bigint','Number of records in the file','true'
+'file_size_in_bytes','bigint','Total file size in bytes','true'
 'column_sizes','map<int,bigint>','Map of column id to total size on disk','true'
 'value_counts','map<int,bigint>','Map of column id to total count, including null and NaN','true'
 'null_value_counts','map<int,bigint>','Map of column id to null value count','true'
@@ -777,11 +777,11 @@ STRING,STRING,STRING,STRING
 describe functional_parquet.iceberg_query_metadata.delete_files;
 ---- RESULTS
 'content','int','Contents of the file: 0=data, 1=position deletes, 2=equality deletes','true'
-'file_path','string','Location URI with FS scheme','false'
-'file_format','string','File format name: avro, orc, or parquet','false'
+'file_path','string','Location URI with FS scheme','true'
+'file_format','string','File format name: avro, orc, or parquet','true'
 'spec_id','int','Partition spec ID','true'
-'record_count','bigint','Number of records in the file','false'
-'file_size_in_bytes','bigint','Total file size in bytes','false'
+'record_count','bigint','Number of records in the file','true'
+'file_size_in_bytes','bigint','Total file size in bytes','true'
 'column_sizes','map<int,bigint>','Map of column id to total size on disk','true'
 'value_counts','map<int,bigint>','Map of column id to total count, including null and NaN','true'
 'null_value_counts','map<int,bigint>','Map of column id to null value count','true'
@@ -799,18 +799,18 @@ STRING,STRING,STRING,STRING
 ---- QUERY
 describe functional_parquet.iceberg_query_metadata.history;
 ---- RESULTS
-'made_current_at','timestamp','','false'
-'snapshot_id','bigint','','false'
+'made_current_at','timestamp','','true'
+'snapshot_id','bigint','','true'
 'parent_id','bigint','','true'
-'is_current_ancestor','boolean','','false'
+'is_current_ancestor','boolean','','true'
 ---- TYPES
 STRING,STRING,STRING,STRING
 ====
 ---- QUERY
 describe functional_parquet.iceberg_query_metadata.metadata_log_entries;
 ---- RESULTS
-'timestamp','timestamp','','false'
-'file','string','','false'
+'timestamp','timestamp','','true'
+'file','string','','true'
 'latest_snapshot_id','bigint','','true'
 'latest_schema_id','int','','true'
 'latest_sequence_number','bigint','','true'
@@ -820,8 +820,8 @@ STRING,STRING,STRING,STRING
 ---- QUERY
 describe functional_parquet.iceberg_query_metadata.snapshots;
 ---- RESULTS
-'committed_at','timestamp','','false'
-'snapshot_id','bigint','','false'
+'committed_at','timestamp','','true'
+'snapshot_id','bigint','','true'
 'parent_id','bigint','','true'
 'operation','string','','true'
 'manifest_list','string','','true'
@@ -832,9 +832,9 @@ STRING,STRING,STRING,STRING
 ---- QUERY
 describe functional_parquet.iceberg_query_metadata.refs;
 ---- RESULTS
-'name','string','','false'
-'type','string','','false'
-'snapshot_id','bigint','','false'
+'name','string','','true'
+'type','string','','true'
+'snapshot_id','bigint','','true'
 'max_reference_age_in_ms','bigint','','true'
 'min_snapshots_to_keep','int','','true'
 'max_snapshot_age_in_ms','bigint','','true'
@@ -844,30 +844,30 @@ STRING,STRING,STRING,STRING
 ---- QUERY
 describe functional_parquet.iceberg_query_metadata.manifests;
 ---- RESULTS
-'content','int','','false'
-'path','string','','false'
-'length','bigint','','false'
-'partition_spec_id','int','','false'
-'added_snapshot_id','bigint','','false'
-'added_data_files_count','int','','false'
-'existing_data_files_count','int','','false'
-'deleted_data_files_count','int','','false'
-'added_delete_files_count','int','','false'
-'existing_delete_files_count','int','','false'
-'deleted_delete_files_count','int','','false'
-'partition_summaries','array<struct<\n  contains_null:boolean,\n  contains_nan:boolean,\n  lower_bound:string,\n  upper_bound:string\n>>','','false'
+'content','int','','true'
+'path','string','','true'
+'length','bigint','','true'
+'partition_spec_id','int','','true'
+'added_snapshot_id','bigint','','true'
+'added_data_files_count','int','','true'
+'existing_data_files_count','int','','true'
+'deleted_data_files_count','int','','true'
+'added_delete_files_count','int','','true'
+'existing_delete_files_count','int','','true'
+'deleted_delete_files_count','int','','true'
+'partition_summaries','array<struct<\n  contains_null:boolean,\n  contains_nan:boolean,\n  lower_bound:string,\n  upper_bound:string\n>>','','true'
 ---- TYPES
 STRING,STRING,STRING,STRING
 ====
 ---- QUERY
 describe functional_parquet.iceberg_query_metadata.`partitions`;
 ---- RESULTS
-'record_count','bigint','Count of records in data files','false'
-'file_count','int','Count of data files','false'
-'position_delete_record_count','bigint','Count of records in position delete files','false'
-'position_delete_file_count','int','Count of position delete files','false'
-'equality_delete_record_count','bigint','Count of records in equality delete files','false'
-'equality_delete_file_count','int','Count of equality delete files','false'
+'record_count','bigint','Count of records in data files','true'
+'file_count','int','Count of data files','true'
+'position_delete_record_count','bigint','Count of records in position delete files','true'
+'position_delete_file_count','int','Count of position delete files','true'
+'equality_delete_record_count','bigint','Count of records in equality delete files','true'
+'equality_delete_file_count','int','Count of equality delete files','true'
 ---- TYPES
 STRING,STRING,STRING,STRING
 ====
@@ -875,11 +875,11 @@ STRING,STRING,STRING,STRING
 describe functional_parquet.iceberg_query_metadata.all_data_files;
 ---- RESULTS
 'content','int','Contents of the file: 0=data, 1=position deletes, 2=equality deletes','true'
-'file_path','string','Location URI with FS scheme','false'
-'file_format','string','File format name: avro, orc, or parquet','false'
+'file_path','string','Location URI with FS scheme','true'
+'file_format','string','File format name: avro, orc, or parquet','true'
 'spec_id','int','Partition spec ID','true'
-'record_count','bigint','Number of records in the file','false'
-'file_size_in_bytes','bigint','Total file size in bytes','false'
+'record_count','bigint','Number of records in the file','true'
+'file_size_in_bytes','bigint','Total file size in bytes','true'
 'column_sizes','map<int,bigint>','Map of column id to total size on disk','true'
 'value_counts','map<int,bigint>','Map of column id to total count, including null and NaN','true'
 'null_value_counts','map<int,bigint>','Map of column id to null value count','true'
@@ -898,11 +898,11 @@ STRING,STRING,STRING,STRING
 describe functional_parquet.iceberg_query_metadata.all_delete_files;
 ---- RESULTS
 'content','int','Contents of the file: 0=data, 1=position deletes, 2=equality deletes','true'
-'file_path','string','Location URI with FS scheme','false'
-'file_format','string','File format name: avro, orc, or parquet','false'
+'file_path','string','Location URI with FS scheme','true'
+'file_format','string','File format name: avro, orc, or parquet','true'
 'spec_id','int','Partition spec ID','true'
-'record_count','bigint','Number of records in the file','false'
-'file_size_in_bytes','bigint','Total file size in bytes','false'
+'record_count','bigint','Number of records in the file','true'
+'file_size_in_bytes','bigint','Total file size in bytes','true'
 'column_sizes','map<int,bigint>','Map of column id to total size on disk','true'
 'value_counts','map<int,bigint>','Map of column id to total count, including null and NaN','true'
 'null_value_counts','map<int,bigint>','Map of column id to null value count','true'
@@ -921,11 +921,11 @@ STRING,STRING,STRING,STRING
 describe functional_parquet.iceberg_query_metadata.all_files;
 ---- RESULTS
 'content','int','Contents of the file: 0=data, 1=position deletes, 2=equality deletes','true'
-'file_path','string','Location URI with FS scheme','false'
-'file_format','string','File format name: avro, orc, or parquet','false'
+'file_path','string','Location URI with FS scheme','true'
+'file_format','string','File format name: avro, orc, or parquet','true'
 'spec_id','int','Partition spec ID','true'
-'record_count','bigint','Number of records in the file','false'
-'file_size_in_bytes','bigint','Total file size in bytes','false'
+'record_count','bigint','Number of records in the file','true'
+'file_size_in_bytes','bigint','Total file size in bytes','true'
 'column_sizes','map<int,bigint>','Map of column id to total size on disk','true'
 'value_counts','map<int,bigint>','Map of column id to total count, including null and NaN','true'
 'null_value_counts','map<int,bigint>','Map of column id to null value count','true'
@@ -943,30 +943,30 @@ STRING,STRING,STRING,STRING
 ---- QUERY
 describe functional_parquet.iceberg_query_metadata.all_manifests;
 ---- RESULTS
-'content','int','','false'
-'path','string','','false'
-'length','bigint','','false'
+'content','int','','true'
+'path','string','','true'
+'length','bigint','','true'
 'partition_spec_id','int','','true'
 'added_snapshot_id','bigint','','true'
 'added_data_files_count','int','','true'
 'existing_data_files_count','int','','true'
 'deleted_data_files_count','int','','true'
-'added_delete_files_count','int','','false'
-'existing_delete_files_count','int','','false'
-'deleted_delete_files_count','int','','false'
+'added_delete_files_count','int','','true'
+'existing_delete_files_count','int','','true'
+'deleted_delete_files_count','int','','true'
 'partition_summaries','array<struct<\n  contains_null:boolean,\n  contains_nan:boolean,\n  lower_bound:string,\n  upper_bound:string\n>>','','true'
-'reference_snapshot_id','bigint','','false'
+'reference_snapshot_id','bigint','','true'
 ---- TYPES
 STRING,STRING,STRING,STRING
 ====
 ---- QUERY
 describe functional_parquet.iceberg_query_metadata.all_entries;
 ---- RESULTS
-'status','int','','false'
+'status','int','','true'
 'snapshot_id','bigint','','true'
 'sequence_number','bigint','','true'
 'file_sequence_number','bigint','','true'
-'data_file','struct<\n  content:int comment ''contents of the file: 0=data, 1=position deletes, 2=equality deletes'',\n  file_path:string comment ''location uri with fs scheme'',\n  file_format:string comment ''file format name: avro, orc, or parquet'',\n  spec_id:int comment ''partition spec id'',\n  record_count:bigint comment ''number of records in the file'',\n  file_size_in_bytes:bigint comment ''total file size in bytes'',\n  column_sizes:map<int,bigint> comment ''map of column id  [...]
+'data_file','struct<\n  content:int comment ''contents of the file: 0=data, 1=position deletes, 2=equality deletes'',\n  file_path:string comment ''location uri with fs scheme'',\n  file_format:string comment ''file format name: avro, orc, or parquet'',\n  spec_id:int comment ''partition spec id'',\n  record_count:bigint comment ''number of records in the file'',\n  file_size_in_bytes:bigint comment ''total file size in bytes'',\n  column_sizes:map<int,bigint> comment ''map of column id  [...]
 'readable_metrics','struct<\n  i:struct<\n    column_size:bigint comment ''total size on disk'',\n    value_count:bigint comment ''total count, including null and nan'',\n    null_value_count:bigint comment ''null value count'',\n    nan_value_count:bigint comment ''nan value count'',\n    lower_bound:int comment ''lower bound'',\n    upper_bound:int comment ''upper bound''\n  > comment ''metrics for column i''\n>','Column metrics in readable form','true'
 ---- TYPES
 STRING,STRING,STRING,STRING
diff --git a/testdata/workloads/functional-query/queries/QueryTest/iceberg-negative.test b/testdata/workloads/functional-query/queries/QueryTest/iceberg-negative.test
index 7fca94d3d..96d0b78ea 100644
--- a/testdata/workloads/functional-query/queries/QueryTest/iceberg-negative.test
+++ b/testdata/workloads/functional-query/queries/QueryTest/iceberg-negative.test
@@ -854,3 +854,90 @@ alter table functional_parquet.iceberg_v2_delete_equality_partitioned drop colum
 ---- CATCH
 Failed to ALTER table 'iceberg_v2_delete_equality_partitioned': Cannot delete identifier field 2: s: required string.
 ====
+---- QUERY
+create table ice_pk_column_def (i int not null primary key) stored as iceberg;
+---- CATCH
+AnalysisException: Unsupported column options for file format 'ICEBERG': 'i INT PRIMARY KEY NOT NULL'
+====
+---- QUERY
+create table ice_pk_column_def (i int primary key) stored as iceberg;
+---- CATCH
+AnalysisException: Unsupported column options for file format 'ICEBERG': 'i INT PRIMARY KEY'
+====
+---- QUERY
+create table ice_tbl_with_pk (i int not null, primary key(i)) stored as iceberg;
+---- CATCH
+AnalysisException: Iceberg tables only support NOT ENFORCED primary keys.
+====
+---- QUERY
+create table ice_tbl_with_pk (i int, j int, primary key(i) not enforced) stored as iceberg;
+---- CATCH
+Cannot add field i as an identifier field: not a required field
+====
+---- QUERY
+create table ice_tbl_with_pk (i int not null, j int, primary key(i, j) not enforced)
+    partitioned by spec (j)
+    stored as iceberg;
+---- CATCH
+Cannot add field j as an identifier field: not a required field
+====
+---- QUERY
+create table ice_tbl_float_pk (f float not null, s string not null, d date not null, primary key(f) not enforced) stored as iceberg;
+---- CATCH
+IllegalArgumentException: Cannot add field f as an identifier field: must not be float or double field
+====
+---- QUERY
+create table ice_tbl_double_pk (db double not null, s string not null, d date not null, primary key(db) not enforced) stored as iceberg;
+---- CATCH
+IllegalArgumentException: Cannot add field db as an identifier field: must not be float or double field
+====
+---- QUERY
+create table ice_tbl_with_pk (i int, j int, primary key(i) not enforced)
+  partitioned by spec (j)
+  stored as iceberg;
+---- CATCH
+AnalysisException: Partition columns have to be part of the primary key for Iceberg tables.
+====
+---- QUERY
+create table ice_tbl_with_pk (i int not null, j int not null, k int,  primary key(i, j) not enforced)
+  partitioned by spec (j, k)
+  stored as iceberg;
+---- CATCH
+AnalysisException: Partition columns have to be part of the primary key for Iceberg tables.
+====
+---- QUERY
+create table ice_tbl_with_pk_partition_transform
+    (i int not null, s string not null, d date not null, primary key(s) not enforced)
+    partitioned by spec (day(d), truncate(3,s), bucket(10, s))
+    stored as iceberg;
+---- CATCH
+AnalysisException: Partition columns have to be part of the primary key for Iceberg tables.
+====
+---- QUERY
+create table ice_tbl_with_pk (i int not null, j int not null, primary key(i) not enforced) stored as iceberg;
+alter table ice_tbl_with_pk drop column i;
+---- CATCH
+Cannot delete identifier field 1: i: required int
+====
+---- QUERY
+create table ctas_pk
+  primary key(int_col)
+  stored as iceberg
+  as select * from functional_parquet.alltypes;
+---- CATCH
+AnalysisException: Iceberg tables only support NOT ENFORCED primary keys.
+====
+---- QUERY
+create table ctas_pk
+  primary key(int_col) not enforced
+  partitioned by spec (string_col)
+  stored as iceberg
+  as select * from functional_parquet.alltypes;
+---- CATCH
+AnalysisException: Partition columns have to be part of the primary key for Iceberg tables.
+====
+---- QUERY
+alter table ice_tbl_with_pk set partition spec (j)
+---- CATCH
+Not allowed to do partition evolution on Iceberg tables with primary keys.
+====
diff --git a/tests/custom_cluster/test_lineage.py b/tests/custom_cluster/test_lineage.py
index a670b428f..42e4cbee8 100644
--- a/tests/custom_cluster/test_lineage.py
+++ b/tests/custom_cluster/test_lineage.py
@@ -98,9 +98,12 @@ class TestLineage(CustomClusterTestSuite):
   def run_test_create_table_timestamp(self, unique_database, table_format):
     """Test that 'createTableTime' in the lineage graph are populated with 
valid value
        from HMS."""
-    query = "create table {0}.lineage_test_tbl_{1} primary key (int_col) 
stored as {1} " \
-            "as select int_col, bigint_col from functional.alltypes".format(
-                unique_database, table_format)
+    not_enforced = ""
+    if table_format == "iceberg":
+      not_enforced = " NOT ENFORCED"
+    query = "create table {0}.lineage_test_tbl_{1} primary key (int_col) {2} " 
\
+        "stored as {1} as select int_col, bigint_col from 
functional.alltypes".format(
+            unique_database, table_format, not_enforced)
     result = self.execute_query_expect_success(self.client, query)
     profile_query_id = re.search("Query \(id=(.*)\):", result.runtime_profile).group(1)
 
