This is an automated email from the ASF dual-hosted git repository.

dbecker pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git
commit 73171cb7164573349bd53a996a51bb7058b778e0
Author: Gabor Kaszab <gaborkas...@cloudera.com>
AuthorDate: Wed Feb 7 12:33:04 2024 +0100

    IMPALA-12729: Allow creating primary keys for Iceberg tables

    There are writer engines (Flink, NiFi) that use the identifier-field-ids from
    the Iceberg schema to identify the columns to be written into the equality
    delete files. So far Impala wasn't able to populate these
    identifier-field-ids. This patch introduces support for NOT ENFORCED primary
    keys for Iceberg tables, where the primary key is used to set the
    identifier-field-ids during Iceberg schema creation.

    Example syntax:
    CREATE TABLE ice_tbl (
      i int NOT NULL,
      j int,
      s string NOT NULL,
      primary key(i, s) not enforced)
    PARTITIONED BY SPEC (truncate(10, s))
    STORED AS ICEBERG;

    There are some constraints on primary keys (PK), following the behavior of
    Flink:
    - Only NOT NULL columns can be part of the PK.
    - The PK cannot be declared at the column definition level, e.g.
      'i int NOT NULL PRIMARY KEY'.
    - If the table is partitioned, the partition columns have to be part of
      the PK.
    - Float and double columns are not allowed in the PK.
    - Dropping a column that is part of the PK is not allowed.

    Testing:
    - New E2E tests added for different table creation scenarios.
    - Manual test using NiFi to write into a table with a PK.

    Change-Id: I7bea787acdabd8cb04661f4ddb5c3309af0364a6
    Reviewed-on: http://gerrit.cloudera.org:8080/21149
    Reviewed-by: Daniel Becker <daniel.bec...@cloudera.com>
    Tested-by: Impala Public Jenkins <impala-public-jenk...@cloudera.com>
---
 fe/src/main/cup/sql-parser.cup | 6 +-
 .../java/org/apache/impala/analysis/ColumnDef.java | 11 ++
 .../impala/analysis/CreateTableAsSelectStmt.java | 11 +-
 .../org/apache/impala/analysis/SlotDescriptor.java | 6 +-
 .../java/org/apache/impala/analysis/TableDef.java | 66 +++++++----
 .../org/apache/impala/catalog/FeIcebergTable.java | 6 +
 .../org/apache/impala/catalog/IcebergColumn.java | 7 ++
 .../impala/catalog/iceberg/IcebergCtasTarget.java | 11 +-
 .../catalog/iceberg/IcebergMetadataTable.java | 4 +-
 .../apache/impala/service/CatalogOpExecutor.java | 14 ++-
 .../impala/service/IcebergCatalogOpExecutor.java | 23 +++-
 .../apache/impala/util/IcebergSchemaConverter.java | 30 ++++-
 fe/src/main/jflex/sql-scanner.flex | 1 +
 .../org/apache/impala/analysis/AnalyzeDDLTest.java | 6 +-
 .../queries/QueryTest/iceberg-create.test | 85 +++++++++++++-
 .../queries/QueryTest/iceberg-metadata-tables.test | 126 ++++++++++-----------
 .../queries/QueryTest/iceberg-negative.test | 87 ++++++++++++++
 tests/custom_cluster/test_lineage.py | 9 +-
 18 files changed, 389 insertions(+), 120 deletions(-)

diff --git a/fe/src/main/cup/sql-parser.cup b/fe/src/main/cup/sql-parser.cup index fb1b9b25f..b05baac44 100755 --- a/fe/src/main/cup/sql-parser.cup +++ b/fe/src/main/cup/sql-parser.cup @@ -308,7 +308,7 @@ terminal KW_COMPUTE, KW_CONSTRAINT, KW_CONVERT, KW_COPY, KW_CREATE, KW_CROSS, KW_CUBE, KW_CURRENT, KW_DATA, KW_DATABASE, KW_DATABASES, KW_DATE, KW_DATETIME, KW_DECIMAL, KW_DEFAULT, KW_DELETE, KW_DELIMITED, KW_DESC, KW_DESCRIBE, KW_DISABLE, KW_DISTINCT, KW_DIV, KW_DOUBLE, - KW_DROP, KW_ELSE, KW_ENABLE, KW_ENCODING, KW_END, KW_ESCAPED, KW_EXCEPT, KW_EXECUTE, + KW_DROP, KW_ELSE, KW_ENABLE, KW_ENCODING, KW_END, KW_ENFORCED, KW_ESCAPED, KW_EXCEPT, KW_EXECUTE, KW_EXISTS, KW_EXPLAIN, KW_EXTENDED, KW_EXTERNAL, KW_FALSE, KW_FIELDS, KW_FILEFORMAT, KW_FILES, KW_FINALIZE_FN, KW_FIRST, KW_FLOAT, KW_FOLLOWING, KW_FOR, KW_FOREIGN, KW_FORMAT, KW_FORMATTED, KW_FROM, KW_FULL, KW_FUNCTION, KW_FUNCTIONS,
KW_GRANT, @@ -1677,6 +1677,8 @@ primary_keys ::= {: RESULT = new Pair<List<String>, Boolean>(col_names, true); :} | KW_NON KW_UNIQUE KW_PRIMARY key_ident LPAREN ident_list:col_names RPAREN {: RESULT = new Pair<List<String>, Boolean>(col_names, false); :} + | KW_PRIMARY key_ident LPAREN ident_list:col_names RPAREN KW_NOT KW_ENFORCED + {: RESULT = new Pair<List<String>, Boolean>(col_names, false); :} ; rely_spec ::= @@ -4307,6 +4309,8 @@ word ::= {: RESULT = r.toString(); :} | KW_END:r {: RESULT = r.toString(); :} + | KW_ENFORCED:r + {: RESULT = r.toString(); :} | KW_ESCAPED:r {: RESULT = r.toString(); :} | KW_EXCEPT:r diff --git a/fe/src/main/java/org/apache/impala/analysis/ColumnDef.java b/fe/src/main/java/org/apache/impala/analysis/ColumnDef.java index 5ba145f30..b4e3e9b1d 100644 --- a/fe/src/main/java/org/apache/impala/analysis/ColumnDef.java +++ b/fe/src/main/java/org/apache/impala/analysis/ColumnDef.java @@ -181,6 +181,17 @@ public class ColumnDef { public boolean hasIcebergOptions() { return isNullabilitySet(); } + // Returns true if the column has options that are not supported for Iceberg tables. + public boolean hasIncompatibleIcebergOptions() { + return isPrimaryKey() || hasEncoding() || hasCompression() || hasDefaultValue() + || hasBlockSize(); + } + // Returns true if the column has options that are not supported for Kudu tables. + public boolean hasIncompatibleKuduOptions() { + // This always returns false as currently only 'isNullable' is a valid Iceberg + // option that is also valid for Kudu. + return false; + } public boolean hasEncoding() { return encodingVal_ != null; } public boolean hasCompression() { return compressionVal_ != null; } public boolean hasBlockSize() { return blockSize_ != null; } diff --git a/fe/src/main/java/org/apache/impala/analysis/CreateTableAsSelectStmt.java b/fe/src/main/java/org/apache/impala/analysis/CreateTableAsSelectStmt.java index c6b33e644..87a62f1cf 100644 --- a/fe/src/main/java/org/apache/impala/analysis/CreateTableAsSelectStmt.java +++ b/fe/src/main/java/org/apache/impala/analysis/CreateTableAsSelectStmt.java @@ -21,6 +21,7 @@ import java.util.ArrayList; import java.util.Collections; import java.util.EnumSet; import java.util.List; +import java.util.Set; import org.apache.impala.authorization.Privilege; import org.apache.impala.catalog.FeDb; @@ -39,6 +40,7 @@ import org.apache.impala.service.CatalogOpExecutor; import org.apache.impala.thrift.THdfsFileFormat; import com.google.common.base.Preconditions; +import com.google.common.collect.Sets; /** * Represents a CREATE TABLE AS SELECT (CTAS) statement @@ -186,6 +188,8 @@ public class CreateTableAsSelectStmt extends StatementBase { // Add the columns from the select statement to the create statement. int colCnt = tmpQueryStmt.getColLabels().size(); + Set<String> hashedPrimaryKeyColNames = + Sets.newHashSet(createStmt_.getTblPrimaryKeyColumnNames()); for (int i = 0; i < colCnt; ++i) { ColumnDef colDef = new ColumnDef(tmpQueryStmt.getColLabels().get(i), null, Collections.<ColumnDef.Option, Object>emptyMap()); @@ -195,6 +199,11 @@ public class CreateTableAsSelectStmt extends StatementBase { "for column '%s'. Use cast() to explicitly specify the column type for " + "column '%s'.", colDef.getColName(), colDef.getColName())); } + if (createStmt_.getFileFormat() == THdfsFileFormat.ICEBERG && + hashedPrimaryKeyColNames.contains(colDef.getColName())) { + // Iceberg tables require NOT NULL column config for primary key columns. 
+ colDef.setNullable(false); + } createStmt_.getColumnDefs().add(colDef); } createStmt_.analyze(analyzer); @@ -239,7 +248,7 @@ public class CreateTableAsSelectStmt extends StatementBase { partSpec = createStmt_.getIcebergPartitionSpecs().get(0); } tmpTable = new IcebergCtasTarget(db, msTbl, createStmt_.getColumnDefs(), - partSpec); + createStmt_.getTblPrimaryKeyColumnNames(), partSpec); } else if (HdfsFileFormat.isHdfsInputFormatClass(msTbl.getSd().getInputFormat())) { tmpTable = db.createFsCtasTarget(msTbl); } diff --git a/fe/src/main/java/org/apache/impala/analysis/SlotDescriptor.java b/fe/src/main/java/org/apache/impala/analysis/SlotDescriptor.java index 73ba4a6a9..939249c01 100644 --- a/fe/src/main/java/org/apache/impala/analysis/SlotDescriptor.java +++ b/fe/src/main/java/org/apache/impala/analysis/SlotDescriptor.java @@ -27,6 +27,7 @@ import java.util.Set; import org.apache.impala.catalog.Column; import org.apache.impala.catalog.ColumnStats; import org.apache.impala.catalog.FeKuduTable; +import org.apache.impala.catalog.IcebergColumn; import org.apache.impala.catalog.KuduColumn; import org.apache.impala.catalog.Type; import org.apache.impala.thrift.TSlotDescriptor; @@ -170,10 +171,13 @@ public class SlotDescriptor { type_ = path_.destType(); label_ = Joiner.on(".").join(path.getRawPath()); - // Set nullability, if this refers to a KuduColumn. + // Set nullability based on column type. if (path_.destColumn() instanceof KuduColumn) { KuduColumn kuduColumn = (KuduColumn)path_.destColumn(); isNullable_ = kuduColumn.isNullable(); + } else if (path_.destColumn() instanceof IcebergColumn) { + IcebergColumn icebergColumn = (IcebergColumn)path_.destColumn(); + isNullable_ = icebergColumn.isNullable(); } } diff --git a/fe/src/main/java/org/apache/impala/analysis/TableDef.java b/fe/src/main/java/org/apache/impala/analysis/TableDef.java index 428de0a0c..fce1039f6 100644 --- a/fe/src/main/java/org/apache/impala/analysis/TableDef.java +++ b/fe/src/main/java/org/apache/impala/analysis/TableDef.java @@ -57,6 +57,7 @@ import org.apache.thrift.TException; import com.google.common.base.Preconditions; import com.google.common.collect.ImmutableList; import com.google.common.collect.Lists; +import com.google.common.collect.Sets; /** * Represents the table parameters in a CREATE TABLE statement. These parameters @@ -85,9 +86,12 @@ class TableDef { // mean no primary keys were specified as the columnDefs_ could contain primary keys. private final List<String> primaryKeyColNames_ = new ArrayList<>(); - // If true, the primary key is unique. If not, an auto-incrementing column will be - // added automatically by Kudu engine. This extra key column helps produce a unique - // composite primary key (primary keys + auto-incrementing construct). + // If true, the primary key is unique. If not, and the table is a Kudu table then an + // auto-incrementing column will be added automatically by Kudu engine. This extra key + // column helps produce a unique composite primary key (primary keys + + // auto-incrementing construct). + // This is also used for Iceberg table and set to false if "NOT ENFORCED" is provided + // for the primary key. private boolean isPrimaryKeyUnique_; // If true, the table's data will be preserved if dropped. @@ -478,28 +482,20 @@ class TableDef { } /** - * Kudu and Iceberg table has their own column option, this function will return false - * if we use column option incorrectly, such as use primary key for Parquet format. 
+ * Kudu and Iceberg tables have their own column options, this function will return + * false if we use column options incorrectly, e.g. primary key column option for an + * Iceberg table. */ private boolean analyzeColumnOption(ColumnDef columnDef) { - boolean flag = true; - // Kudu option and Iceberg option have overlap - if (columnDef.hasKuduOptions() && columnDef.hasIcebergOptions()) { - if (!isKuduTable() && !isIcebergTable()) { - flag = false; - } - } else if (columnDef.hasKuduOptions()) { - if (!isKuduTable()) { - flag = false; - } - } else if (columnDef.hasIcebergOptions()) { - // Currently Iceberg option only contains 'nullable', so we won't run into this - // situation. - if (!isIcebergTable()) { - flag = false; - } - } - return flag; + if (isKuduTable()) { + if (columnDef.hasIncompatibleKuduOptions()) return false; + } else if (isIcebergTable()) { + if (columnDef.hasIncompatibleIcebergOptions()) return false; + } else if (columnDef.hasKuduOptions() || columnDef.hasIcebergOptions()) { + // If the table is neither Kudu or Iceberg but has some incompatible column options. + return false; + } + return true; } /** @@ -574,9 +570,29 @@ class TableDef { "(col1, col2, ...) syntax at the end of the column definition.", primaryKeyString, primaryKeyString, primaryKeyString)); } else if (!primaryKeyColNames_.isEmpty() && !isPrimaryKeyUnique() - && !isKuduTable()) { - throw new AnalysisException(primaryKeyString + " is only supported for Kudu."); + && !isKuduTable() && !isIcebergTable()) { + throw new AnalysisException(primaryKeyString + + " is only supported for Kudu and Iceberg."); + } + + if (isIcebergTable() && isPrimaryKeyUnique_) { + throw new AnalysisException( + "Iceberg tables only support NOT ENFORCED primary keys."); + } + + Set<String> hashedPKColNames = Sets.newHashSet(primaryKeyColNames_); + List<IcebergPartitionSpec> icebergPartitionSpecs = getIcebergPartitionSpecs(); + if (!icebergPartitionSpecs.isEmpty()) { + Preconditions.checkState(icebergPartitionSpecs.size() == 1); + IcebergPartitionSpec partSpec = icebergPartitionSpecs.get(0); + for (IcebergPartitionField partField : partSpec.getIcebergPartitionFields()) { + if (!hashedPKColNames.contains(partField.getFieldName())) { + throw new AnalysisException("Partition columns have to be part of the " + + "primary key for Iceberg tables."); + } + } } + Map<String, ColumnDef> colDefsByColName = ColumnDef.mapByColumnNames(columnDefs_); int keySeq = 1; String constraintName = null; diff --git a/fe/src/main/java/org/apache/impala/catalog/FeIcebergTable.java b/fe/src/main/java/org/apache/impala/catalog/FeIcebergTable.java index 633fffc9d..61768cd5c 100644 --- a/fe/src/main/java/org/apache/impala/catalog/FeIcebergTable.java +++ b/fe/src/main/java/org/apache/impala/catalog/FeIcebergTable.java @@ -92,6 +92,7 @@ import org.apache.impala.util.IcebergSchemaConverter; import org.apache.impala.util.IcebergUtil; import org.apache.impala.util.ListMap; import org.apache.impala.util.TResultRowBuilder; +import org.apache.thrift.TException; import org.json.simple.JSONValue; import org.slf4j.Logger; @@ -199,6 +200,11 @@ public interface FeIcebergTable extends FeFsTable { return getIcebergApiTable().schema(); } + @Override + default List<String> getPrimaryKeyColumnNames() throws TException { + return Lists.newArrayList(getIcebergSchema().identifierFieldNames()); + } + @Override default boolean isCacheable() { return getFeFsTable().isCacheable(); diff --git a/fe/src/main/java/org/apache/impala/catalog/IcebergColumn.java 
b/fe/src/main/java/org/apache/impala/catalog/IcebergColumn.java index 1b6f953d3..a5483d1b8 100644 --- a/fe/src/main/java/org/apache/impala/catalog/IcebergColumn.java +++ b/fe/src/main/java/org/apache/impala/catalog/IcebergColumn.java @@ -48,6 +48,13 @@ public class IcebergColumn extends Column { isNullable_ = isNullable; } + public static IcebergColumn cloneWithNullability(IcebergColumn source, + boolean isNullable) { + return new IcebergColumn(source.name_, source.type_, source.comment_, + source.position_, source.fieldId_, source.fieldMapKeyId_, source.fieldMapKeyId_, + isNullable); + } + public int getFieldId() { return fieldId_; } diff --git a/fe/src/main/java/org/apache/impala/catalog/iceberg/IcebergCtasTarget.java b/fe/src/main/java/org/apache/impala/catalog/iceberg/IcebergCtasTarget.java index 5ceeefd07..5cb4d163f 100644 --- a/fe/src/main/java/org/apache/impala/catalog/iceberg/IcebergCtasTarget.java +++ b/fe/src/main/java/org/apache/impala/catalog/iceberg/IcebergCtasTarget.java @@ -89,11 +89,11 @@ public class IcebergCtasTarget extends CtasTargetTable implements FeIcebergTable private HdfsStorageDescriptor hdfsSd_; public IcebergCtasTarget(FeDb db, org.apache.hadoop.hive.metastore.api.Table msTbl, - List<ColumnDef> columnDefs, IcebergPartitionSpec partSpec) - throws CatalogException, ImpalaRuntimeException { + List<ColumnDef> columnDefs, List<String> primaryKeyNames, + IcebergPartitionSpec partSpec) throws CatalogException, ImpalaRuntimeException { super(msTbl, db, msTbl.getTableName(), msTbl.getOwner()); createFsTable(db, msTbl); - createIcebergSchema(columnDefs); + createIcebergSchema(columnDefs, primaryKeyNames); createPartitionSpec(partSpec); icebergCatalog_ = IcebergUtil.getTIcebergCatalog(msTbl); setLocations(); @@ -105,13 +105,14 @@ public class IcebergCtasTarget extends CtasTargetTable implements FeIcebergTable hdfsSd_ = HdfsStorageDescriptor.fromStorageDescriptor(name_, msTable_.getSd()); } - private void createIcebergSchema(List<ColumnDef> columnDefs) throws CatalogException { + private void createIcebergSchema(List<ColumnDef> columnDefs, + List<String> primaryKeyNames) throws CatalogException { List<TColumn> tcols = new ArrayList<>(); for (ColumnDef col : columnDefs) { tcols.add(col.toThrift()); } try { - iceSchema_ = IcebergSchemaConverter.genIcebergSchema(tcols); + iceSchema_ = IcebergSchemaConverter.genIcebergSchema(tcols, primaryKeyNames); // In genIcebergSchema() we did our best to assign correct field ids to columns, // but to be sure, let's use Iceberg's API function to assign field ids. 
iceSchema_ = TypeUtil.assignIncreasingFreshIds(iceSchema_); diff --git a/fe/src/main/java/org/apache/impala/catalog/iceberg/IcebergMetadataTable.java b/fe/src/main/java/org/apache/impala/catalog/iceberg/IcebergMetadataTable.java index 877b31a46..3d417eef9 100644 --- a/fe/src/main/java/org/apache/impala/catalog/iceberg/IcebergMetadataTable.java +++ b/fe/src/main/java/org/apache/impala/catalog/iceberg/IcebergMetadataTable.java @@ -31,6 +31,7 @@ import org.apache.impala.catalog.Column; import org.apache.impala.catalog.FeCatalogUtils; import org.apache.impala.catalog.FeIcebergTable; import org.apache.impala.catalog.FeTable; +import org.apache.impala.catalog.IcebergColumn; import org.apache.impala.catalog.VirtualTable; import org.apache.impala.common.ImpalaRuntimeException; import org.apache.impala.thrift.TColumnDescriptor; @@ -73,7 +74,8 @@ public class IcebergMetadataTable extends VirtualTable { metadataTableSchema)) { LOG.trace("Adding column: \"{}\" with type: \"{}\" to metadata table.", col.getName(), col.getType()); - addColumn(col); + addColumn(IcebergColumn.cloneWithNullability( + (IcebergColumn)col, true /*isNullable*/)); } } diff --git a/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java b/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java index e8be33988..8fcb54d1b 100644 --- a/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java +++ b/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java @@ -3560,7 +3560,8 @@ public class CatalogOpExecutor { } else if (IcebergTable.isIcebergTable(tbl)) { return createIcebergTable(tbl, wantMinimalResult, response, catalogTimeline, params.if_not_exists, params.getColumns(), params.getPartition_spec(), - params.getTable_properties(), params.getComment()); + params.getPrimary_key_column_names(), params.getTable_properties(), + params.getComment()); } Preconditions.checkState(params.getColumns().size() > 0, "Empty column list given as argument to Catalog.createTable"); @@ -3934,7 +3935,8 @@ public class CatalogOpExecutor { private boolean createIcebergTable(org.apache.hadoop.hive.metastore.api.Table newTable, boolean wantMinimalResult, TDdlExecResponse response, EventSequence catalogTimeline, boolean ifNotExists, List<TColumn> columns, TIcebergPartitionSpec partitionSpec, - Map<String, String> tableProperties, String tblComment) throws ImpalaException { + List<String> primaryKeyColumnNames, Map<String, String> tableProperties, + String tblComment) throws ImpalaException { Preconditions.checkState(IcebergTable.isIcebergTable(newTable)); acquireMetastoreDdlLock(catalogTimeline); @@ -3969,7 +3971,8 @@ public class CatalogOpExecutor { } String tableLoc = IcebergCatalogOpExecutor.createTable(catalog, IcebergUtil.getIcebergTableIdentifier(newTable), location, columns, - partitionSpec, newTable.getOwner(), tableProperties).location(); + partitionSpec, primaryKeyColumnNames, newTable.getOwner(), + tableProperties).location(); newTable.getSd().setLocation(tableLoc); catalogTimeline.markEvent(CREATED_ICEBERG_TABLE + catalog.name()); } else { @@ -4196,8 +4199,9 @@ public class CatalogOpExecutor { TIcebergPartitionSpec partitionSpec = srcIceTable.getDefaultPartitionSpec() .toThrift(); createIcebergTable(tbl, wantMinimalResult, response, catalogTimeline, - params.if_not_exists, columns, partitionSpec, tableProperties, - params.getComment()); + params.if_not_exists, columns, partitionSpec, + Lists.newArrayList(srcIceTable.getIcebergSchema().identifierFieldNames()), + tableProperties, params.getComment()); } else 
if (srcTable instanceof KuduTable && KuduTable.isKuduTable(tbl)) { TCreateTableParams createTableParams = extractKuduCreateTableParams(params, tblName, (KuduTable) srcTable, tbl); diff --git a/fe/src/main/java/org/apache/impala/service/IcebergCatalogOpExecutor.java b/fe/src/main/java/org/apache/impala/service/IcebergCatalogOpExecutor.java index f126bf16e..404050471 100644 --- a/fe/src/main/java/org/apache/impala/service/IcebergCatalogOpExecutor.java +++ b/fe/src/main/java/org/apache/impala/service/IcebergCatalogOpExecutor.java @@ -18,10 +18,10 @@ package org.apache.impala.service; import java.nio.ByteBuffer; +import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; -import java.util.Set; import java.util.Collections; import org.apache.iceberg.AppendFiles; @@ -71,6 +71,7 @@ import org.apache.impala.thrift.TIcebergPartitionSpec; import org.apache.impala.thrift.TRollbackType; import org.apache.impala.util.IcebergSchemaConverter; import org.apache.impala.util.IcebergUtil; +import org.apache.thrift.TException; import com.google.common.base.Preconditions; import org.slf4j.Logger; @@ -90,9 +91,10 @@ public class IcebergCatalogOpExecutor { */ public static Table createTable(TIcebergCatalog catalog, TableIdentifier identifier, String location, List<TColumn> columns, TIcebergPartitionSpec partitionSpec, - String owner, Map<String, String> tableProperties) throws ImpalaRuntimeException { + List<String> primaryKeyColumnNames, String owner, + Map<String, String> tableProperties) throws ImpalaRuntimeException { // Each table id increase from zero - Schema schema = createIcebergSchema(columns); + Schema schema = createIcebergSchema(columns, primaryKeyColumnNames); PartitionSpec spec = IcebergUtil.createIcebergPartition(schema, partitionSpec); IcebergCatalog icebergCatalog = IcebergUtil.getIcebergCatalog(catalog, location); if (icebergCatalog instanceof IcebergHiveCatalog) { @@ -185,6 +187,15 @@ public class IcebergCatalogOpExecutor { public static void alterTableSetPartitionSpec(FeIcebergTable feTable, TIcebergPartitionSpec partSpec, Transaction transaction) throws ImpalaRuntimeException { + try { + if (!feTable.getPrimaryKeyColumnNames().isEmpty()) { + throw new ImpalaRuntimeException("Not allowed to do partition evolution on " + + "Iceberg tables with primary keys."); + } + } catch (TException tEx) { + throw new ImpalaRuntimeException(tEx.getMessage()); + } + BaseTable iceTable = (BaseTable)feTable.getIcebergApiTable(); UpdatePartitionSpec updatePartitionSpec = transaction.updateSpec(); iceTable.spec().fields().forEach(partitionField -> updatePartitionSpec.removeField( @@ -297,9 +308,9 @@ public class IcebergCatalogOpExecutor { /** * Build iceberg schema by parameters. 
*/ - private static Schema createIcebergSchema(List<TColumn> columns) - throws ImpalaRuntimeException { - return IcebergSchemaConverter.genIcebergSchema(columns); + private static Schema createIcebergSchema(List<TColumn> columns, + List<String> primaryKeyColumnNames) throws ImpalaRuntimeException { + return IcebergSchemaConverter.genIcebergSchema(columns, primaryKeyColumnNames); } /** diff --git a/fe/src/main/java/org/apache/impala/util/IcebergSchemaConverter.java b/fe/src/main/java/org/apache/impala/util/IcebergSchemaConverter.java index 78d8b4458..ecee330ab 100644 --- a/fe/src/main/java/org/apache/impala/util/IcebergSchemaConverter.java +++ b/fe/src/main/java/org/apache/impala/util/IcebergSchemaConverter.java @@ -18,7 +18,11 @@ package org.apache.impala.util; import java.util.ArrayList; +import java.util.HashMap; +import java.util.HashSet; import java.util.List; +import java.util.Map; +import java.util.Set; import org.apache.commons.collections.CollectionUtils; import org.apache.hadoop.hive.metastore.api.FieldSchema; @@ -168,16 +172,32 @@ public class IcebergSchemaConverter { /** * Generates Iceberg schema from given columns. It also assigns a unique 'field id' for - * each schema element, although Iceberg will reassign the ids. + * each schema element, although Iceberg will reassign the ids. 'primaryKeyColumnNames' + * are used for populating 'identifier-field-ids' in the Schema. */ - public static Schema genIcebergSchema(List<TColumn> columns) - throws ImpalaRuntimeException { + public static Schema genIcebergSchema(List<TColumn> columns, + List<String> primaryKeyColumnNames) throws ImpalaRuntimeException { iThreadLocal.set(1); List<Types.NestedField> fields = new ArrayList<Types.NestedField>(); + Map<String, Integer> colNameToFieldId = new HashMap<>(); for (TColumn column : columns) { - fields.add(createIcebergField(column)); + Types.NestedField icebergField = createIcebergField(column); + fields.add(icebergField); + colNameToFieldId.put(icebergField.name(), icebergField.fieldId()); + } + + if (primaryKeyColumnNames == null || primaryKeyColumnNames.isEmpty()) { + return new Schema(fields); + } + + Set<Integer> identifierFieldIds = new HashSet<>(); + for (String pkColName : primaryKeyColumnNames) { + if (!colNameToFieldId.containsKey(pkColName)) { + throw new ImpalaRuntimeException("Invalid primary key column name: " + pkColName); + } + identifierFieldIds.add(colNameToFieldId.get(pkColName)); } - return new Schema(fields); + return new Schema(fields, identifierFieldIds); } /** diff --git a/fe/src/main/jflex/sql-scanner.flex b/fe/src/main/jflex/sql-scanner.flex index a679897b8..50ba0b52a 100644 --- a/fe/src/main/jflex/sql-scanner.flex +++ b/fe/src/main/jflex/sql-scanner.flex @@ -125,6 +125,7 @@ import org.apache.impala.thrift.TReservedWordsVersion; keywordMap.put("enable", SqlParserSymbols.KW_ENABLE); keywordMap.put("encoding", SqlParserSymbols.KW_ENCODING); keywordMap.put("end", SqlParserSymbols.KW_END); + keywordMap.put("enforced", SqlParserSymbols.KW_ENFORCED); keywordMap.put("escaped", SqlParserSymbols.KW_ESCAPED); keywordMap.put("except", SqlParserSymbols.KW_EXCEPT); keywordMap.put("exists", SqlParserSymbols.KW_EXISTS); diff --git a/fe/src/test/java/org/apache/impala/analysis/AnalyzeDDLTest.java b/fe/src/test/java/org/apache/impala/analysis/AnalyzeDDLTest.java index a1ea8cd9a..c36492456 100644 --- a/fe/src/test/java/org/apache/impala/analysis/AnalyzeDDLTest.java +++ b/fe/src/test/java/org/apache/impala/analysis/AnalyzeDDLTest.java @@ -2536,9 +2536,9 @@ public class AnalyzeDDLTest 
extends FrontendTestBase { " stored by kudu as select id, bool_col, tinyint_col, smallint_col, " + "int_col, bigint_col, float_col, double_col, date_string_col, string_col " + "from functional.alltypestiny"); - AnalyzesOk("create table t primary key (id) stored by iceberg as select id, " + - "bool_col, int_col, float_col, double_col, date_string_col, string_col " + - "from functional.alltypestiny"); + AnalyzesOk("create table t primary key (id) not enforced " + + "stored by iceberg as select id, bool_col, int_col, float_col, double_col, " + + "date_string_col, string_col from functional.alltypestiny"); // IMPALA-9822 Row Format Delimited is valid only for Text Files String[] fileFormats = {"TEXTFILE", "PARQUET", "ICEBERG"}; diff --git a/testdata/workloads/functional-query/queries/QueryTest/iceberg-create.test b/testdata/workloads/functional-query/queries/QueryTest/iceberg-create.test index c8187fd81..23b834407 100644 --- a/testdata/workloads/functional-query/queries/QueryTest/iceberg-create.test +++ b/testdata/workloads/functional-query/queries/QueryTest/iceberg-create.test @@ -615,4 +615,87 @@ describe formatted ice_hadoop_tbl; '','external.table.purge','TRUE ' ---- TYPES string, string, string -==== \ No newline at end of file +==== +---- QUERY +# Create an Iceberg table with PKs and check if 'identifier-field-ids' are populated. +create table ice_tbl_with_pk + (i int not null, s string not null, d date not null, primary key(i, d) not enforced) + stored as iceberg; +describe formatted ice_tbl_with_pk; +---- RESULTS: VERIFY_IS_SUBSET +row_regex:'','current-schema.*','.*identifier-field-ids\\\\":\[1,3\].*' +---- TYPES +string, string, string +==== +---- QUERY +# Similar as above, but the table is also partitioned. +create table ice_tbl_with_pk_partitioned + (i int not null, s string not null, d date, primary key(i, s) not enforced) + partitioned by spec (s) + stored as iceberg; +describe formatted ice_tbl_with_pk_partitioned; +---- RESULTS: VERIFY_IS_SUBSET +row_regex:'','current-schema.*','.*identifier-field-ids\\\\":\[1,2\].*' +---- TYPES +string, string, string +==== +---- QUERY +# Primary key on partition columns with partition transform. +create table ice_tbl_with_pk_partition_transform + (i int not null, s string not null, d date not null, primary key(s, d) not enforced) + partitioned by spec (day(d), truncate(3,s), bucket(10, s)) + stored as iceberg; +describe formatted ice_tbl_with_pk_partition_transform; +---- RESULTS: VERIFY_IS_SUBSET +row_regex:'','current-schema.*','.*identifier-field-ids\\\\":\[2,3\].*' +---- TYPES +string, string, string +==== +---- QUERY +# When source table has primary keys the CTAS by default won't use it for the target +# table. +create table ctas_pk_in_source_not_used + stored as iceberg + as select * from ice_tbl_with_pk; +describe formatted ctas_pk_in_source_not_used; +---- RESULTS: VERIFY_IS_SUBSET +row_regex:'.*','current-schema.*','.*' +---- RESULTS: VERIFY_IS_NOT_IN +row_regex:'.*','current-schema.*','.*identifier-field-ids.*' +---- TYPES +STRING,STRING,STRING +==== +---- QUERY +# Primary key provided to a CTAS statement. +create table ctas_pk_unpartitioned + primary key(d) not enforced + stored as iceberg + as select * from ice_tbl_with_pk; +describe formatted ctas_pk_unpartitioned; +---- RESULTS: VERIFY_IS_SUBSET +row_regex:'.*','current-schema.*','.*identifier-field-ids\\\\":\[3\].*' +---- TYPES +STRING,STRING,STRING +==== +---- QUERY +# Similar as above but the table is partitioned. 
+create table ctas_pk_partitioned + primary key(s, d) not enforced + partitioned by spec (d) + stored as iceberg + as select * from ice_tbl_with_pk; +describe formatted ctas_pk_partitioned; +---- RESULTS: VERIFY_IS_SUBSET +row_regex:'.*','current-schema.*','.*identifier-field-ids\\\\":\[2,3\].*' +---- TYPES +STRING,STRING,STRING +==== +---- QUERY +# Create table like inherits the 'identifier-field-ids' from the source table. +create table ice_like_pk like ice_tbl_with_pk_partitioned; +describe formatted ice_like_pk; +---- RESULTS: VERIFY_IS_SUBSET +row_regex:'.*','current-schema.*','.*identifier-field-ids\\\\":\[1,2\].*' +---- TYPES +STRING,STRING,STRING +==== diff --git a/testdata/workloads/functional-query/queries/QueryTest/iceberg-metadata-tables.test b/testdata/workloads/functional-query/queries/QueryTest/iceberg-metadata-tables.test index 9918adcd7..b1cb6bad0 100644 --- a/testdata/workloads/functional-query/queries/QueryTest/iceberg-metadata-tables.test +++ b/testdata/workloads/functional-query/queries/QueryTest/iceberg-metadata-tables.test @@ -718,8 +718,8 @@ STRING ---- QUERY describe functional_parquet.iceberg_query_metadata.snapshots; ---- RESULTS -'committed_at','timestamp','','false' -'snapshot_id','bigint','','false' +'committed_at','timestamp','','true' +'snapshot_id','bigint','','true' 'parent_id','bigint','','true' 'operation','string','','true' 'manifest_list','string','','true' @@ -731,11 +731,11 @@ STRING,STRING,STRING,STRING describe functional_parquet.iceberg_query_metadata.`files`; ---- RESULTS 'content','int','Contents of the file: 0=data, 1=position deletes, 2=equality deletes','true' -'file_path','string','Location URI with FS scheme','false' -'file_format','string','File format name: avro, orc, or parquet','false' +'file_path','string','Location URI with FS scheme','true' +'file_format','string','File format name: avro, orc, or parquet','true' 'spec_id','int','Partition spec ID','true' -'record_count','bigint','Number of records in the file','false' -'file_size_in_bytes','bigint','Total file size in bytes','false' +'record_count','bigint','Number of records in the file','true' +'file_size_in_bytes','bigint','Total file size in bytes','true' 'column_sizes','map<int,bigint>','Map of column id to total size on disk','true' 'value_counts','map<int,bigint>','Map of column id to total count, including null and NaN','true' 'null_value_counts','map<int,bigint>','Map of column id to null value count','true' @@ -754,11 +754,11 @@ STRING,STRING,STRING,STRING describe functional_parquet.iceberg_query_metadata.data_files; ---- RESULTS 'content','int','Contents of the file: 0=data, 1=position deletes, 2=equality deletes','true' -'file_path','string','Location URI with FS scheme','false' -'file_format','string','File format name: avro, orc, or parquet','false' +'file_path','string','Location URI with FS scheme','true' +'file_format','string','File format name: avro, orc, or parquet','true' 'spec_id','int','Partition spec ID','true' -'record_count','bigint','Number of records in the file','false' -'file_size_in_bytes','bigint','Total file size in bytes','false' +'record_count','bigint','Number of records in the file','true' +'file_size_in_bytes','bigint','Total file size in bytes','true' 'column_sizes','map<int,bigint>','Map of column id to total size on disk','true' 'value_counts','map<int,bigint>','Map of column id to total count, including null and NaN','true' 'null_value_counts','map<int,bigint>','Map of column id to null value count','true' @@ -777,11 +777,11 @@ 
STRING,STRING,STRING,STRING describe functional_parquet.iceberg_query_metadata.delete_files; ---- RESULTS 'content','int','Contents of the file: 0=data, 1=position deletes, 2=equality deletes','true' -'file_path','string','Location URI with FS scheme','false' -'file_format','string','File format name: avro, orc, or parquet','false' +'file_path','string','Location URI with FS scheme','true' +'file_format','string','File format name: avro, orc, or parquet','true' 'spec_id','int','Partition spec ID','true' -'record_count','bigint','Number of records in the file','false' -'file_size_in_bytes','bigint','Total file size in bytes','false' +'record_count','bigint','Number of records in the file','true' +'file_size_in_bytes','bigint','Total file size in bytes','true' 'column_sizes','map<int,bigint>','Map of column id to total size on disk','true' 'value_counts','map<int,bigint>','Map of column id to total count, including null and NaN','true' 'null_value_counts','map<int,bigint>','Map of column id to null value count','true' @@ -799,18 +799,18 @@ STRING,STRING,STRING,STRING ---- QUERY describe functional_parquet.iceberg_query_metadata.history; ---- RESULTS -'made_current_at','timestamp','','false' -'snapshot_id','bigint','','false' +'made_current_at','timestamp','','true' +'snapshot_id','bigint','','true' 'parent_id','bigint','','true' -'is_current_ancestor','boolean','','false' +'is_current_ancestor','boolean','','true' ---- TYPES STRING,STRING,STRING,STRING ==== ---- QUERY describe functional_parquet.iceberg_query_metadata.metadata_log_entries; ---- RESULTS -'timestamp','timestamp','','false' -'file','string','','false' +'timestamp','timestamp','','true' +'file','string','','true' 'latest_snapshot_id','bigint','','true' 'latest_schema_id','int','','true' 'latest_sequence_number','bigint','','true' @@ -820,8 +820,8 @@ STRING,STRING,STRING,STRING ---- QUERY describe functional_parquet.iceberg_query_metadata.snapshots; ---- RESULTS -'committed_at','timestamp','','false' -'snapshot_id','bigint','','false' +'committed_at','timestamp','','true' +'snapshot_id','bigint','','true' 'parent_id','bigint','','true' 'operation','string','','true' 'manifest_list','string','','true' @@ -832,9 +832,9 @@ STRING,STRING,STRING,STRING ---- QUERY describe functional_parquet.iceberg_query_metadata.refs; ---- RESULTS -'name','string','','false' -'type','string','','false' -'snapshot_id','bigint','','false' +'name','string','','true' +'type','string','','true' +'snapshot_id','bigint','','true' 'max_reference_age_in_ms','bigint','','true' 'min_snapshots_to_keep','int','','true' 'max_snapshot_age_in_ms','bigint','','true' @@ -844,30 +844,30 @@ STRING,STRING,STRING,STRING ---- QUERY describe functional_parquet.iceberg_query_metadata.manifests; ---- RESULTS -'content','int','','false' -'path','string','','false' -'length','bigint','','false' -'partition_spec_id','int','','false' -'added_snapshot_id','bigint','','false' -'added_data_files_count','int','','false' -'existing_data_files_count','int','','false' -'deleted_data_files_count','int','','false' -'added_delete_files_count','int','','false' -'existing_delete_files_count','int','','false' -'deleted_delete_files_count','int','','false' -'partition_summaries','array<struct<\n contains_null:boolean,\n contains_nan:boolean,\n lower_bound:string,\n upper_bound:string\n>>','','false' +'content','int','','true' +'path','string','','true' +'length','bigint','','true' +'partition_spec_id','int','','true' +'added_snapshot_id','bigint','','true' 
+'added_data_files_count','int','','true' +'existing_data_files_count','int','','true' +'deleted_data_files_count','int','','true' +'added_delete_files_count','int','','true' +'existing_delete_files_count','int','','true' +'deleted_delete_files_count','int','','true' +'partition_summaries','array<struct<\n contains_null:boolean,\n contains_nan:boolean,\n lower_bound:string,\n upper_bound:string\n>>','','true' ---- TYPES STRING,STRING,STRING,STRING ==== ---- QUERY describe functional_parquet.iceberg_query_metadata.`partitions`; ---- RESULTS -'record_count','bigint','Count of records in data files','false' -'file_count','int','Count of data files','false' -'position_delete_record_count','bigint','Count of records in position delete files','false' -'position_delete_file_count','int','Count of position delete files','false' -'equality_delete_record_count','bigint','Count of records in equality delete files','false' -'equality_delete_file_count','int','Count of equality delete files','false' +'record_count','bigint','Count of records in data files','true' +'file_count','int','Count of data files','true' +'position_delete_record_count','bigint','Count of records in position delete files','true' +'position_delete_file_count','int','Count of position delete files','true' +'equality_delete_record_count','bigint','Count of records in equality delete files','true' +'equality_delete_file_count','int','Count of equality delete files','true' ---- TYPES STRING,STRING,STRING,STRING ==== @@ -875,11 +875,11 @@ STRING,STRING,STRING,STRING describe functional_parquet.iceberg_query_metadata.all_data_files; ---- RESULTS 'content','int','Contents of the file: 0=data, 1=position deletes, 2=equality deletes','true' -'file_path','string','Location URI with FS scheme','false' -'file_format','string','File format name: avro, orc, or parquet','false' +'file_path','string','Location URI with FS scheme','true' +'file_format','string','File format name: avro, orc, or parquet','true' 'spec_id','int','Partition spec ID','true' -'record_count','bigint','Number of records in the file','false' -'file_size_in_bytes','bigint','Total file size in bytes','false' +'record_count','bigint','Number of records in the file','true' +'file_size_in_bytes','bigint','Total file size in bytes','true' 'column_sizes','map<int,bigint>','Map of column id to total size on disk','true' 'value_counts','map<int,bigint>','Map of column id to total count, including null and NaN','true' 'null_value_counts','map<int,bigint>','Map of column id to null value count','true' @@ -898,11 +898,11 @@ STRING,STRING,STRING,STRING describe functional_parquet.iceberg_query_metadata.all_delete_files; ---- RESULTS 'content','int','Contents of the file: 0=data, 1=position deletes, 2=equality deletes','true' -'file_path','string','Location URI with FS scheme','false' -'file_format','string','File format name: avro, orc, or parquet','false' +'file_path','string','Location URI with FS scheme','true' +'file_format','string','File format name: avro, orc, or parquet','true' 'spec_id','int','Partition spec ID','true' -'record_count','bigint','Number of records in the file','false' -'file_size_in_bytes','bigint','Total file size in bytes','false' +'record_count','bigint','Number of records in the file','true' +'file_size_in_bytes','bigint','Total file size in bytes','true' 'column_sizes','map<int,bigint>','Map of column id to total size on disk','true' 'value_counts','map<int,bigint>','Map of column id to total count, including null and NaN','true' 
'null_value_counts','map<int,bigint>','Map of column id to null value count','true' @@ -921,11 +921,11 @@ STRING,STRING,STRING,STRING describe functional_parquet.iceberg_query_metadata.all_files; ---- RESULTS 'content','int','Contents of the file: 0=data, 1=position deletes, 2=equality deletes','true' -'file_path','string','Location URI with FS scheme','false' -'file_format','string','File format name: avro, orc, or parquet','false' +'file_path','string','Location URI with FS scheme','true' +'file_format','string','File format name: avro, orc, or parquet','true' 'spec_id','int','Partition spec ID','true' -'record_count','bigint','Number of records in the file','false' -'file_size_in_bytes','bigint','Total file size in bytes','false' +'record_count','bigint','Number of records in the file','true' +'file_size_in_bytes','bigint','Total file size in bytes','true' 'column_sizes','map<int,bigint>','Map of column id to total size on disk','true' 'value_counts','map<int,bigint>','Map of column id to total count, including null and NaN','true' 'null_value_counts','map<int,bigint>','Map of column id to null value count','true' @@ -943,30 +943,30 @@ STRING,STRING,STRING,STRING ---- QUERY describe functional_parquet.iceberg_query_metadata.all_manifests; ---- RESULTS -'content','int','','false' -'path','string','','false' -'length','bigint','','false' +'content','int','','true' +'path','string','','true' +'length','bigint','','true' 'partition_spec_id','int','','true' 'added_snapshot_id','bigint','','true' 'added_data_files_count','int','','true' 'existing_data_files_count','int','','true' 'deleted_data_files_count','int','','true' -'added_delete_files_count','int','','false' -'existing_delete_files_count','int','','false' -'deleted_delete_files_count','int','','false' +'added_delete_files_count','int','','true' +'existing_delete_files_count','int','','true' +'deleted_delete_files_count','int','','true' 'partition_summaries','array<struct<\n contains_null:boolean,\n contains_nan:boolean,\n lower_bound:string,\n upper_bound:string\n>>','','true' -'reference_snapshot_id','bigint','','false' +'reference_snapshot_id','bigint','','true' ---- TYPES STRING,STRING,STRING,STRING ==== ---- QUERY describe functional_parquet.iceberg_query_metadata.all_entries; ---- RESULTS -'status','int','','false' +'status','int','','true' 'snapshot_id','bigint','','true' 'sequence_number','bigint','','true' 'file_sequence_number','bigint','','true' -'data_file','struct<\n content:int comment ''contents of the file: 0=data, 1=position deletes, 2=equality deletes'',\n file_path:string comment ''location uri with fs scheme'',\n file_format:string comment ''file format name: avro, orc, or parquet'',\n spec_id:int comment ''partition spec id'',\n record_count:bigint comment ''number of records in the file'',\n file_size_in_bytes:bigint comment ''total file size in bytes'',\n column_sizes:map<int,bigint> comment ''map of column id [...] +'data_file','struct<\n content:int comment ''contents of the file: 0=data, 1=position deletes, 2=equality deletes'',\n file_path:string comment ''location uri with fs scheme'',\n file_format:string comment ''file format name: avro, orc, or parquet'',\n spec_id:int comment ''partition spec id'',\n record_count:bigint comment ''number of records in the file'',\n file_size_in_bytes:bigint comment ''total file size in bytes'',\n column_sizes:map<int,bigint> comment ''map of column id [...] 
'readable_metrics','struct<\n i:struct<\n column_size:bigint comment ''total size on disk'',\n value_count:bigint comment ''total count, including null and nan'',\n null_value_count:bigint comment ''null value count'',\n nan_value_count:bigint comment ''nan value count'',\n lower_bound:int comment ''lower bound'',\n upper_bound:int comment ''upper bound''\n > comment ''metrics for column i''\n>','Column metrics in readable form','true' ---- TYPES STRING,STRING,STRING,STRING diff --git a/testdata/workloads/functional-query/queries/QueryTest/iceberg-negative.test b/testdata/workloads/functional-query/queries/QueryTest/iceberg-negative.test index 7fca94d3d..96d0b78ea 100644 --- a/testdata/workloads/functional-query/queries/QueryTest/iceberg-negative.test +++ b/testdata/workloads/functional-query/queries/QueryTest/iceberg-negative.test @@ -854,3 +854,90 @@ alter table functional_parquet.iceberg_v2_delete_equality_partitioned drop colum ---- CATCH Failed to ALTER table 'iceberg_v2_delete_equality_partitioned': Cannot delete identifier field 2: s: required string. ==== +---- QUERY +create table ice_pk_column_def (i int not null primary key) stored as iceberg; +---- CATCH +AnalysisException: Unsupported column options for file format 'ICEBERG': 'i INT PRIMARY KEY NOT NULL' +==== +---- QUERY +create table ice_pk_column_def (i int primary key) stored as iceberg; +---- CATCH +AnalysisException: Unsupported column options for file format 'ICEBERG': 'i INT PRIMARY KEY' +==== +---- QUERY +create table ice_tbl_with_pk (i int not null, primary key(i)) stored as iceberg; +---- CATCH +AnalysisException: Iceberg tables only support NOT ENFORCED primary keys. +==== +---- QUERY +create table ice_tbl_with_pk (i int, j int, primary key(i) not enforced) stored as iceberg; +---- CATCH +Cannot add field i as an identifier field: not a required field +==== +---- QUERY +create table ice_tbl_with_pk (i int not null, j int, primary key(i, j) not enforced) + partitioned by spec (j) + stored as iceberg; +---- CATCH +Cannot add field j as an identifier field: not a required field +==== +---- QUERY +create table ice_tbl_float_pk (f float not null, s string not null, d date not null, primary key(f) not enforced) stored as iceberg; +---- CATCH +IllegalArgumentException: Cannot add field f as an identifier field: must not be float or double field +==== +---- QUERY +create table ice_tbl_double_pk (db double not null, s string not null, d date not null, primary key(db) not enforced) stored as iceberg; +---- CATCH +IllegalArgumentException: Cannot add field db as an identifier field: must not be float or double field +==== +---- QUERY +create table ice_tbl_with_pk (i int, j int, primary key(i) not enforced) + partitioned by spec (j) + stored as iceberg; +---- CATCH +AnalysisException: Partition columns have to be part of the primary key for Iceberg tables. +==== +---- QUERY +create table ice_tbl_with_pk (i int not null, j int not null, k int, primary key(i, j) not enforced) + partitioned by spec (j, k) + stored as iceberg; +---- CATCH +AnalysisException: Partition columns have to be part of the primary key for Iceberg tables. +==== +---- QUERY +create table ice_tbl_with_pk_partition_transform + (i int not null, s string not null, d date not null, primary key(s) not enforced) + partitioned by spec (day(d), truncate(3,s), bucket(10, s)) + stored as iceberg; +---- CATCH +AnalysisException: Partition columns have to be part of the primary key for Iceberg tables. 
+==== +---- QUERY +create table ice_tbl_with_pk (i int not null, j int not null, primary key(i) not enforced) stored as iceberg; +alter table ice_tbl_with_pk drop column i; +---- CATCH +Cannot delete identifier field 1: i: required int +==== +---- QUERY +create table ctas_pk + primary key(int_col) + stored as iceberg + as select * from functional_parquet.alltypes; +---- CATCH +AnalysisException: Iceberg tables only support NOT ENFORCED primary keys. +==== +---- QUERY +create table ctas_pk + primary key(int_col) not enforced + partitioned by spec (string_col) + stored as iceberg + as select * from functional_parquet.alltypes; +---- CATCH +AnalysisException: Partition columns have to be part of the primary key for Iceberg tables. +==== +---- QUERY +alter table ice_tbl_with_pk set partition spec (j) +---- CATCH +Not allowed to do partition evolution on Iceberg tables with primary keys. +==== diff --git a/tests/custom_cluster/test_lineage.py b/tests/custom_cluster/test_lineage.py index a670b428f..42e4cbee8 100644 --- a/tests/custom_cluster/test_lineage.py +++ b/tests/custom_cluster/test_lineage.py @@ -98,9 +98,12 @@ class TestLineage(CustomClusterTestSuite): def run_test_create_table_timestamp(self, unique_database, table_format): """Test that 'createTableTime' in the lineage graph are populated with valid value from HMS.""" - query = "create table {0}.lineage_test_tbl_{1} primary key (int_col) stored as {1} " \ - "as select int_col, bigint_col from functional.alltypes".format( - unique_database, table_format) + not_enforced = "" + if table_format == "iceberg": + not_enforced = " NOT ENFORCED" + query = "create table {0}.lineage_test_tbl_{1} primary key (int_col) {2} " \ + "stored as {1} as select int_col, bigint_col from functional.alltypes".format( + unique_database, table_format, not_enforced) result = self.execute_query_expect_success(self.client, query) profile_query_id = re.search("Query \(id=(.*)\):", result.runtime_profile).group(1)