This is an automated email from the ASF dual-hosted git repository.

joemcdonnell pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git

commit c21e5212b73d4b51cd93e8080de69d3dee7bbfb7
Author: Peter Rozsa <[email protected]>
AuthorDate: Thu Jul 27 08:16:01 2023 +0200

    IMPALA-12243: Add support for DROP PARTITION for Iceberg tables
    
    This change adds support for DROP PARTITION operation for Iceberg
    tables. Users can now execute 'ALTER TABLE table DROP PARTITION
    (partition_filtering_expression)' statements where
    'partition_filtering_expression' consists of at least one binary
    predicate, IN predicate, IS NULL predicate or arbitrary logical
    combination of these, where the predicate's term is a partitioned column
    decorated with a partition transform, for example: 'day(column)' or
    simply column.
    
    To select partitions with mutating transforms (non-identity transforms),
    the user must provide the transform in the selection, for example: if
    column 'a' is partitioned as year(a), the user must filter it with
    explicitly providing the transform, like (year(a) = "2012"). This is a
    requirement for Iceberg's planFiles API. If the column is filtered
    without the transform, filtering the data files with strict projection
    produces ambiguous results.
    
    The binary predicate's operator can be one of the basic comparison
    operators: =, !=, <, >, <=, >=.
    The IN and IS NULL predicates can be inverted with the NOT keyword.
    
    Only constant literals are accepted currently for these predicates; for
    example: (identity_string = "string") is accepted, (identity_string =
    concat(identity_string, "another_string")) is not accepted.
    
    Tests:
     - unit tests for IcebergUtil.getDateTransformValue
     - analyzer tests for negative cases, simple cases
     - e2e tests for every transform type, schema evolution and rollback
    
    Change-Id: I2a768ba2966f570454687e02e4e6d67df46741f9
    Reviewed-on: http://gerrit.cloudera.org:8080/20515
    Reviewed-by: Impala Public Jenkins <[email protected]>
    Tested-by: Impala Public Jenkins <[email protected]>
---
 common/thrift/CatalogObjects.thrift                |  10 +
 common/thrift/JniCatalog.thrift                    |   3 +
 .../analysis/AlterTableDropPartitionStmt.java      | 118 +++++++-
 .../impala/analysis/IcebergPartitionExpr.java      | 186 ++++++++++++
 .../IcebergPartitionExpressionRewriter.java        | 198 +++++++++++++
 .../impala/analysis/IcebergPartitionField.java     |   2 +
 .../org/apache/impala/analysis/PartitionSet.java   |  19 +-
 .../apache/impala/analysis/PartitionSpecBase.java  |  11 +-
 .../org/apache/impala/catalog/FeIcebergTable.java  |  36 ++-
 .../org/apache/impala/catalog/IcebergTable.java    |   3 +-
 .../impala/catalog/iceberg/IcebergCtasTarget.java  |   6 +-
 .../impala/catalog/local/LocalIcebergTable.java    |   3 +-
 .../common/IcebergPartitionPredicateConverter.java |  56 ++++
 .../impala/common/IcebergPredicateConverter.java   | 316 +++++++++++++++++++++
 .../apache/impala/planner/IcebergScanPlanner.java  | 281 ++----------------
 .../apache/impala/service/CatalogOpExecutor.java   |  11 +-
 .../impala/service/DescribeResultFactory.java      |   6 +-
 .../impala/service/IcebergCatalogOpExecutor.java   |  24 +-
 .../java/org/apache/impala/util/IcebergUtil.java   |  73 ++++-
 .../org/apache/impala/analysis/AnalyzeDDLTest.java |  49 ++++
 .../org/apache/impala/util/IcebergUtilTest.java    |  69 ++++-
 .../queries/QueryTest/iceberg-drop-partition.test  | 287 +++++++++++++++++++
 .../queries/QueryTest/iceberg-negative.test        |   7 +-
 tests/query_test/test_iceberg.py                   |  49 ++++
 24 files changed, 1496 insertions(+), 327 deletions(-)

diff --git a/common/thrift/CatalogObjects.thrift 
b/common/thrift/CatalogObjects.thrift
index 891d86f28..116e484cf 100644
--- a/common/thrift/CatalogObjects.thrift
+++ b/common/thrift/CatalogObjects.thrift
@@ -626,6 +626,16 @@ struct TIcebergContentFileStore {
   5: optional bool has_orc
   6: optional bool has_parquet
 }
+// Represents a drop partition request for Iceberg tables
+struct TIcebergDropPartitionRequest {
+  // List of affected file paths (could be empty if the drop partition
+  // request can be exchanged with a truncate command)
+  1: required list<string> paths
+  // Indicates whether the request could be exchanged with a truncate command
+  2: required bool is_truncate
+  // Number of affected partitions that will be dropped
+  3: required i64 num_partitions
+}
 
 struct TIcebergTable {
   // Iceberg file system table location
diff --git a/common/thrift/JniCatalog.thrift b/common/thrift/JniCatalog.thrift
index d4b235770..5297cc4dd 100755
--- a/common/thrift/JniCatalog.thrift
+++ b/common/thrift/JniCatalog.thrift
@@ -292,6 +292,9 @@ struct TAlterTableDropPartitionParams {
 
   // If true, underlying data is purged using -skipTrash
   3: required bool purge
+
+  // Summary of partitions to delete for Iceberg tables
+  4: optional CatalogObjects.TIcebergDropPartitionRequest 
iceberg_drop_partition_request
 }
 
 // Parameters for ALTER TABLE ALTER/CHANGE COLUMN commands
diff --git 
a/fe/src/main/java/org/apache/impala/analysis/AlterTableDropPartitionStmt.java 
b/fe/src/main/java/org/apache/impala/analysis/AlterTableDropPartitionStmt.java
index cd52a29bf..8e75f077f 100644
--- 
a/fe/src/main/java/org/apache/impala/analysis/AlterTableDropPartitionStmt.java
+++ 
b/fe/src/main/java/org/apache/impala/analysis/AlterTableDropPartitionStmt.java
@@ -17,15 +17,34 @@
 
 package org.apache.impala.analysis;
 
+import com.google.common.base.Preconditions;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.stream.Collectors;
+import org.apache.iceberg.DeleteFile;
+import org.apache.iceberg.FileScanTask;
+import org.apache.iceberg.expressions.Expression;
+import org.apache.iceberg.expressions.Expressions;
+import org.apache.iceberg.io.CloseableIterable;
 import org.apache.impala.authorization.Privilege;
 import org.apache.impala.catalog.FeIcebergTable;
 import org.apache.impala.catalog.FeKuduTable;
 import org.apache.impala.catalog.FeTable;
+import org.apache.impala.catalog.TableLoadingException;
 import org.apache.impala.common.AnalysisException;
+import org.apache.impala.common.IcebergPartitionPredicateConverter;
+import org.apache.impala.common.IcebergPredicateConverter;
+import org.apache.impala.common.ImpalaException;
+import org.apache.impala.common.ImpalaRuntimeException;
 import org.apache.impala.thrift.TAlterTableDropPartitionParams;
 import org.apache.impala.thrift.TAlterTableParams;
 import org.apache.impala.thrift.TAlterTableType;
-import com.google.common.base.Preconditions;
+import org.apache.impala.thrift.TIcebergDropPartitionRequest;
+import org.apache.impala.util.IcebergUtil;
 
 /**
  * Represents an ALTER TABLE DROP PARTITION statement.
@@ -38,6 +57,16 @@ public class AlterTableDropPartitionStmt extends 
AlterTableStmt {
   // deleted. For example, for HDFS tables it skips the trash mechanism
   private final boolean purgePartition_;
 
+  // File paths selected by Iceberg's partition filtering
+  private List<String> icebergFilePaths_;
+
+  // Statistics for selected Iceberg partitions
+  private long numberOfIcebergPartitions_;
+
+  // If every partition is selected by Iceberg's partition filtering, this 
flag signals
+  // that a truncate should be executed instead of deleting every file from 
the metadata.
+  private boolean isIcebergTruncate_ = false;
+
   public AlterTableDropPartitionStmt(TableName tableName,
       PartitionSet partitionSet, boolean ifExists, boolean purgePartition) {
     super(tableName);
@@ -64,11 +93,24 @@ public class AlterTableDropPartitionStmt extends 
AlterTableStmt {
   public TAlterTableParams toThrift() {
     TAlterTableParams params = super.toThrift();
     params.setAlter_type(TAlterTableType.DROP_PARTITION);
-    TAlterTableDropPartitionParams addPartParams = new 
TAlterTableDropPartitionParams();
-    addPartParams.setPartition_set(partitionSet_.toThrift());
-    addPartParams.setIf_exists(!partitionSet_.getPartitionShouldExist());
-    addPartParams.setPurge(purgePartition_);
-    params.setDrop_partition_params(addPartParams);
+    TAlterTableDropPartitionParams dropPartParams = new 
TAlterTableDropPartitionParams();
+    dropPartParams.setIf_exists(!partitionSet_.getPartitionShouldExist());
+    dropPartParams.setPurge(purgePartition_);
+    params.setDrop_partition_params(dropPartParams);
+    if (table_ instanceof FeIcebergTable) {
+      TIcebergDropPartitionRequest request = new 
TIcebergDropPartitionRequest();
+      request.setIs_truncate(isIcebergTruncate_);
+      if (isIcebergTruncate_) {
+        request.setPaths(Collections.emptyList());
+      } else {
+        request.setPaths(icebergFilePaths_);
+      }
+      request.num_partitions = numberOfIcebergPartitions_;
+      dropPartParams.setIceberg_drop_partition_request(request);
+      dropPartParams.setPartition_set(Collections.emptyList());
+    } else {
+      dropPartParams.setPartition_set(partitionSet_.toThrift());
+    }
     return params;
   }
 
@@ -79,12 +121,70 @@ public class AlterTableDropPartitionStmt extends 
AlterTableStmt {
     if (table instanceof FeKuduTable) {
       throw new AnalysisException("ALTER TABLE DROP PARTITION is not supported 
for " +
           "Kudu tables: " + partitionSet_.toSql());
-    } else if (table instanceof FeIcebergTable) {
-      throw new AnalysisException("ALTER TABLE DROP PARTITION is not supported 
for " +
-          "Iceberg tables: " + table.getFullName());
     }
     if (!ifExists_) partitionSet_.setPartitionShouldExist();
     partitionSet_.setPrivilegeRequirement(Privilege.ALTER);
     partitionSet_.analyze(analyzer);
+
+    if (table instanceof FeIcebergTable) { analyzeIceberg(analyzer); }
+  }
+
+  public void analyzeIceberg(Analyzer analyzer) throws AnalysisException {
+    if (purgePartition_) {
+      throw new AnalysisException(
+          "Partition purge is not supported for Iceberg tables");
+    }
+
+    FeIcebergTable table = (FeIcebergTable) table_;
+    // To rewrite transforms and column references
+    IcebergPartitionExpressionRewriter rewriter =
+        new IcebergPartitionExpressionRewriter(analyzer,
+            table.getIcebergApiTable().spec());
+    // For Impala expression to Iceberg expression conversion
+    IcebergPredicateConverter converter =
+        new IcebergPartitionPredicateConverter(table.getIcebergSchema(), 
analyzer);
+
+    List<Expression> icebergPartitionExprs = new ArrayList<>();
+    for (Expr expr : partitionSet_.getPartitionExprs()) {
+      expr = rewriter.rewrite(expr);
+      expr.analyze(analyzer);
+      analyzer.getConstantFolder().rewrite(expr, analyzer);
+      try {
+        icebergPartitionExprs.add(converter.convert(expr));
+      } catch (ImpalaException e) {
+        throw new AnalysisException(
+            "Invalid partition filtering expression: " + expr.toSql());
+      }
+    }
+
+    try (CloseableIterable<FileScanTask> fileScanTasks = 
IcebergUtil.planFiles(table,
+        icebergPartitionExprs, null)) {
+      icebergFilePaths_ = new ArrayList<>();
+      Set<String> icebergPartitionSummary = new HashSet<>();
+      for (FileScanTask fileScanTask : fileScanTasks) {
+        if (fileScanTask.residual().isEquivalentTo(Expressions.alwaysTrue())) {
+          
icebergPartitionSummary.add(fileScanTask.file().partition().toString());
+          List<DeleteFile> deleteFiles = fileScanTask.deletes();
+          if (!deleteFiles.isEmpty()) {
+            icebergFilePaths_.addAll(deleteFiles.stream()
+                .map(deleteFile -> deleteFile.path().toString()).collect(
+                    Collectors.toSet()));
+          }
+          icebergFilePaths_.add(fileScanTask.file().path().toString());
+        }
+      }
+      numberOfIcebergPartitions_ = icebergPartitionSummary.size();
+      if (icebergFilePaths_.size() == 
FeIcebergTable.Utils.getTotalNumberOfFiles(table,
+          null)) {
+        isIcebergTruncate_ = true;
+        icebergFilePaths_ = Collections.emptyList();
+      }
+    } catch (IOException | TableLoadingException | ImpalaRuntimeException e) {
+      throw new AnalysisException("Error loading metadata for Iceberg table", 
e);
+    }
+    if (numberOfIcebergPartitions_ == 0 && !ifExists_) {
+      throw new AnalysisException(
+          "No matching partition(s) found");
+    }
   }
 }
diff --git 
a/fe/src/main/java/org/apache/impala/analysis/IcebergPartitionExpr.java 
b/fe/src/main/java/org/apache/impala/analysis/IcebergPartitionExpr.java
new file mode 100644
index 000000000..20859b25a
--- /dev/null
+++ b/fe/src/main/java/org/apache/impala/analysis/IcebergPartitionExpr.java
@@ -0,0 +1,186 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.impala.analysis;
+
+import java.util.List;
+import java.util.Optional;
+import org.apache.iceberg.PartitionField;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.impala.catalog.Column;
+import org.apache.impala.catalog.IcebergColumn;
+import org.apache.impala.catalog.Type;
+import org.apache.impala.common.AnalysisException;
+import org.apache.impala.common.ImpalaRuntimeException;
+import org.apache.impala.thrift.TExprNode;
+import org.apache.impala.thrift.TIcebergPartitionTransformType;
+import org.apache.impala.util.IcebergUtil;
+
+/**
+ * IcebergPartitionExpr represents a partitioned Iceberg table column with its 
transform
+ * and its value. This class is used as a temporary state for Iceberg table's 
partition
+ * scanning. Later on it gets transformed to an Iceberg Expression.
+ */
+public class IcebergPartitionExpr extends Expr {
+  private final IcebergPartitionTransform transform_;
+  private final SlotRef slotRef_;
+  private final PartitionSpec partitionSpec_;
+
+  public IcebergPartitionExpr(SlotRef slotRef, PartitionSpec partitionSpec) {
+    this(new 
IcebergPartitionTransform(TIcebergPartitionTransformType.IDENTITY), slotRef,
+        partitionSpec);
+  }
+
+  protected IcebergPartitionExpr(IcebergPartitionTransform transform, SlotRef 
slotRef,
+      PartitionSpec partitionSpec) {
+    transform_ = transform;
+    slotRef_ = slotRef;
+    partitionSpec_ = partitionSpec;
+  }
+
+  public IcebergPartitionExpr(FunctionCallExpr callExpr, PartitionSpec 
partitionSpec)
+      throws AnalysisException {
+    partitionSpec_ = partitionSpec;
+    FunctionParams params = callExpr.getParams();
+    if (params.size() > 2 && params.size() < 1) {
+      throw new AnalysisException("Invalid partition predicate: " + 
callExpr.toSql());
+    }
+    Expr slotRefExpr;
+    Expr transformParamExpr;
+    Optional<Integer> transformParam = Optional.empty();
+
+    if (params.size() == 1) { // transform(column)
+      slotRefExpr = params.exprs().get(0);
+    } else { // transform(transform_param, column)
+      transformParamExpr = params.exprs().get(0);
+      slotRefExpr = params.exprs().get(1);
+
+      if (!(transformParamExpr instanceof LiteralExpr)) {
+        throw new AnalysisException("Invalid transform parameter: " + 
transformParamExpr);
+      }
+      LiteralExpr literal = (LiteralExpr) transformParamExpr;
+      String stringValue = literal.getStringValue();
+      try {
+        int parsedInt = Integer.parseInt(stringValue);
+        transformParam = Optional.of(parsedInt);
+      } catch (NumberFormatException e) {
+        throw new AnalysisException("Invalid transform parameter value: " + 
stringValue);
+      }
+    }
+
+    if (!(slotRefExpr instanceof SlotRef)) {
+      throw new AnalysisException("Invalid partition predicate: " + 
callExpr.toSql());
+    }
+
+    String functionName = callExpr.getFnName().getFunction();
+    if (functionName == null) {
+      List<String> fnNamePath = callExpr.getFnName().getFnNamePath();
+      if (fnNamePath.size() > 1) {
+        throw new AnalysisException("Invalid partition transform: " + 
fnNamePath);
+      }
+      functionName = fnNamePath.get(0);
+    }
+    String transformString = functionName.toUpperCase();
+
+    slotRef_ = (SlotRef) slotRefExpr;
+
+    try{
+      if (transformParam.isPresent()) {
+        transform_ =
+            IcebergUtil.getPartitionTransform(transformString, 
transformParam.get());
+      } else {
+        transform_ = IcebergUtil.getPartitionTransform(transformString);
+      }
+    }
+    catch (ImpalaRuntimeException e) {
+      throw new AnalysisException(e.getMessage());
+    }
+
+    if 
(transform_.getTransformType().equals(TIcebergPartitionTransformType.VOID)) {
+      throw new AnalysisException(
+          "VOID transform is not supported for partition selection");
+    }
+  }
+
+  public SlotRef getSlotRef() { return slotRef_; }
+
+  public IcebergPartitionTransform getTransform() { return transform_; }
+
+  @Override
+  protected void analyzeImpl(Analyzer analyzer) throws AnalysisException {
+    slotRef_.analyze(analyzer);
+    transform_.analyze(analyzer);
+    SlotDescriptor desc = slotRef_.getDesc();
+    IcebergColumn icebergColumn = (IcebergColumn)desc.getColumn();
+
+    List<PartitionField> partitionFields =
+        partitionSpec_.getFieldsBySourceId(icebergColumn.getFieldId());
+    if (partitionFields.isEmpty()) {
+      throw new AnalysisException(
+          "Partition exprs cannot contain non-partition column(s): " + 
toSql());
+    }
+    TIcebergPartitionTransformType transformType = 
transform_.getTransformType();
+    if (!transformType.equals(TIcebergPartitionTransformType.IDENTITY)) {
+      for (PartitionField field : partitionFields) {
+        TIcebergPartitionTransformType transformTypeFromSpec;
+        try {
+          transformTypeFromSpec =
+              
IcebergUtil.getPartitionTransformType(field.transform().toString());
+        }
+        catch (ImpalaRuntimeException e) {
+          throw new AnalysisException(e.getCause());
+        }
+        if (!transformTypeFromSpec.equals(transformType)) {
+          throw new AnalysisException(
+              String.format("Can't filter column '%s' with transform type: 
'%s'",
+                  slotRef_.toSql(), transformType));
+        }
+      }
+    }
+    if (transformType.equals(TIcebergPartitionTransformType.TRUNCATE)
+        || transformType.equals(TIcebergPartitionTransformType.IDENTITY)) {
+      type_ = slotRef_.type_;
+    }
+    if (transformType.equals(TIcebergPartitionTransformType.YEAR)
+        || transformType.equals(TIcebergPartitionTransformType.BUCKET)
+        || transformType.equals(TIcebergPartitionTransformType.MONTH)
+        || transformType.equals(TIcebergPartitionTransformType.DAY)
+        || transformType.equals(TIcebergPartitionTransformType.HOUR)) {
+      type_ = Type.INT;
+    }
+  }
+
+  @Override
+  protected float computeEvalCost() {
+    return UNKNOWN_COST;
+  }
+
+  @Override
+  protected String toSqlImpl(ToSqlOptions options) {
+    return transform_.toSql(slotRef_.toSql());
+  }
+
+  @Override
+  protected void toThrift(TExprNode msg) {
+    /* Keeping it empty to avoid constant folding */
+  }
+
+  @Override
+  public Expr clone() {
+    return new IcebergPartitionExpr(transform_, slotRef_, partitionSpec_);
+  }
+}
diff --git 
a/fe/src/main/java/org/apache/impala/analysis/IcebergPartitionExpressionRewriter.java
 
b/fe/src/main/java/org/apache/impala/analysis/IcebergPartitionExpressionRewriter.java
new file mode 100644
index 000000000..c47da3ea1
--- /dev/null
+++ 
b/fe/src/main/java/org/apache/impala/analysis/IcebergPartitionExpressionRewriter.java
@@ -0,0 +1,198 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.impala.analysis;
+
+import com.google.common.base.Preconditions;
+import java.util.List;
+import java.util.function.Function;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.impala.catalog.FeIcebergTable;
+import org.apache.impala.catalog.IcebergTable;
+import org.apache.impala.common.AnalysisException;
+import org.apache.impala.common.ImpalaRuntimeException;
+import org.apache.impala.thrift.TIcebergPartitionTransformType;
+import org.apache.impala.util.IcebergUtil;
+
+class IcebergPartitionExpressionRewriter {
+  private final Analyzer analyzer_;
+  private final org.apache.iceberg.PartitionSpec partitionSpec_;
+
+  public IcebergPartitionExpressionRewriter(Analyzer analyzer,
+      org.apache.iceberg.PartitionSpec partitionSpec) {
+    this.analyzer_ = analyzer;
+    this.partitionSpec_ = partitionSpec;
+  }
+
+  /**
+   * Rewrites SlotRefs and FunctionCallExprs as IcebergPartitionExpr. SlotRefs 
targeting a
+   * columns are rewritten to an IcebergPartitionExpr where the transform type 
is
+   * IDENTITY. FunctionExprs are checked whether the function name matches any 
Iceberg
+   * transform name, if it matches, then it gets rewritten to an 
IcebergPartitionExpr
+   * where the transform is located from the function name, and the parameter 
(if there's
+   * any) for the transform is saved as well, and the targeted column's 
SlotRef is also
+   * saved in the IcebergPartitionExpr. The resulting IcebergPartitionExpr 
then replaces
+   * the original SlotRef/FunctionCallExpr. For Date transforms (year, month, 
day, hour),
+   * an implicit conversion happens during the rewriting: string literals 
formed as
+   * Iceberg partition values like "2023", "2023-12", ... are parsed and 
transformed
+   * automatically to their numeric counterparts.
+   *
+   * @param expr incoming expression tree
+   * @return Expr where SlotRefs and FunctionCallExprs are replaced with
+   * IcebergPartitionExpr
+   * @throws AnalysisException when expression rewrite fails
+   */
+  public Expr rewrite(Expr expr) throws AnalysisException {
+    if (expr instanceof BinaryPredicate) {
+      BinaryPredicate binaryPredicate = (BinaryPredicate) expr;
+      return rewrite(binaryPredicate);
+    }
+    if (expr instanceof CompoundPredicate) {
+      CompoundPredicate compoundPredicate = (CompoundPredicate) expr;
+      return rewrite(compoundPredicate);
+    }
+    if (expr instanceof SlotRef) {
+      SlotRef slotRef = (SlotRef) expr;
+      return rewrite(slotRef);
+    }
+    if (expr instanceof FunctionCallExpr) {
+      FunctionCallExpr functionCallExpr = (FunctionCallExpr) expr;
+      return rewrite(functionCallExpr);
+    }
+    if (expr instanceof IsNullPredicate) {
+      IsNullPredicate isNullPredicate = (IsNullPredicate) expr;
+      return rewrite(isNullPredicate);
+    }
+    if (expr instanceof InPredicate) {
+      InPredicate isNullPredicate = (InPredicate) expr;
+      return rewrite(isNullPredicate);
+    }
+    throw new AnalysisException(
+        "Invalid partition filtering expression: " + expr.toSql());
+  }
+
+  private BinaryPredicate rewrite(BinaryPredicate binaryPredicate)
+      throws AnalysisException {
+    Expr term = binaryPredicate.getChild(0);
+    Expr literal = binaryPredicate.getChild(1);
+    IcebergPartitionExpr partitionExpr;
+    if (term instanceof SlotRef) {
+      partitionExpr = rewrite((SlotRef) term);
+      binaryPredicate.getChildren().set(0, partitionExpr);
+    } else if (term instanceof FunctionCallExpr) {
+      partitionExpr = rewrite((FunctionCallExpr) term);
+      binaryPredicate.getChildren().set(0, partitionExpr);
+    } else {
+      return binaryPredicate;
+    }
+    if (!(literal instanceof LiteralExpr)) {
+      return binaryPredicate;
+    }
+    TIcebergPartitionTransformType transformType = partitionExpr.getTransform()
+        .getTransformType();
+    if (!IcebergUtil.isDateTimeTransformType(transformType)) { return 
binaryPredicate; }
+    rewriteDateTransformConstants((LiteralExpr) literal, transformType,
+        numericLiteral -> binaryPredicate.getChildren().set(1, 
numericLiteral));
+    return binaryPredicate;
+  }
+
+  private InPredicate rewrite(InPredicate inPredicate)
+      throws AnalysisException {
+    Expr term = inPredicate.getChild(0);
+    List<Expr> literals = inPredicate.getChildren()
+        .subList(1, inPredicate.getChildCount());
+    IcebergPartitionExpr partitionExpr;
+    if (term instanceof SlotRef) {
+      partitionExpr = rewrite((SlotRef) term);
+      inPredicate.getChildren().set(0, partitionExpr);
+    } else if (term instanceof FunctionCallExpr) {
+      partitionExpr = rewrite((FunctionCallExpr) term);
+      inPredicate.getChildren().set(0, partitionExpr);
+    } else {
+      return inPredicate;
+    }
+    TIcebergPartitionTransformType transformType = partitionExpr.getTransform()
+        .getTransformType();
+    for (int i = 0; i < literals.size(); ++i) {
+      if (!(literals.get(i) instanceof LiteralExpr)) {
+        return inPredicate;
+      }
+      LiteralExpr literal = (LiteralExpr) literals.get(i);
+      int affectedChildId = i + 1;
+      if (!IcebergUtil.isDateTimeTransformType(transformType)) { continue; }
+      rewriteDateTransformConstants(literal, transformType,
+          numericLiteral -> inPredicate.getChildren()
+              .set(affectedChildId, numericLiteral));
+    }
+    return inPredicate;
+  }
+
+  private void rewriteDateTransformConstants(LiteralExpr literal,
+      TIcebergPartitionTransformType transformType,
+      Function<NumericLiteral, ?> rewrite) {
+    
Preconditions.checkState(IcebergUtil.isDateTimeTransformType(transformType));
+    if (transformType.equals(TIcebergPartitionTransformType.YEAR)
+        && literal instanceof NumericLiteral) {
+      NumericLiteral numericLiteral = (NumericLiteral) literal;
+      long longValue = numericLiteral.getLongValue();
+      long target = longValue + IcebergUtil.ICEBERG_EPOCH_YEAR;
+      analyzer_.addWarning(String.format(
+          "The YEAR transform expects years normalized to %d: %d is targeting 
year %d",
+          IcebergUtil.ICEBERG_EPOCH_YEAR, longValue, target));
+      return;
+    }
+    if (!(literal instanceof StringLiteral)) { return; }
+    try {
+      Integer dateTimeTransformValue = IcebergUtil.getDateTimeTransformValue(
+          transformType,
+          literal.getStringValue());
+      rewrite.apply(NumericLiteral.create(dateTimeTransformValue));
+    } catch (ImpalaRuntimeException ignore) {}
+  }
+
+  private CompoundPredicate rewrite(CompoundPredicate compoundPredicate)
+      throws AnalysisException {
+    Expr left = compoundPredicate.getChild(0);
+    Expr right = compoundPredicate.getChild(1);
+    compoundPredicate.setChild(0, rewrite(left));
+    compoundPredicate.setChild(1, rewrite(right));
+    return compoundPredicate;
+  }
+
+  private IcebergPartitionExpr rewrite(SlotRef slotRef) {
+    return new IcebergPartitionExpr(slotRef, partitionSpec_);
+  }
+
+  private IcebergPartitionExpr rewrite(FunctionCallExpr functionCallExpr)
+      throws AnalysisException {
+    return new IcebergPartitionExpr(functionCallExpr, partitionSpec_);
+  }
+
+  private IsNullPredicate rewrite(IsNullPredicate isNullPredicate)
+      throws AnalysisException {
+    Expr child = isNullPredicate.getChild(0);
+
+    if (child instanceof SlotRef) {
+      isNullPredicate.getChildren().set(0, rewrite(child));
+    }
+    if (child instanceof FunctionCallExpr) {
+      isNullPredicate.getChildren()
+          .set(0, new IcebergPartitionExpr((FunctionCallExpr) child, 
partitionSpec_));
+    }
+    return isNullPredicate;
+  }
+}
diff --git 
a/fe/src/main/java/org/apache/impala/analysis/IcebergPartitionField.java 
b/fe/src/main/java/org/apache/impala/analysis/IcebergPartitionField.java
index bb77c3699..2e9915a9e 100644
--- a/fe/src/main/java/org/apache/impala/analysis/IcebergPartitionField.java
+++ b/fe/src/main/java/org/apache/impala/analysis/IcebergPartitionField.java
@@ -65,6 +65,8 @@ public class IcebergPartitionField extends StmtNode {
     return sourceId_;
   }
 
+  public int getFieldId_() { return fieldId_; }
+
   public TIcebergPartitionTransformType getTransformType() {
     return transform_.getTransformType();
   }
diff --git a/fe/src/main/java/org/apache/impala/analysis/PartitionSet.java 
b/fe/src/main/java/org/apache/impala/analysis/PartitionSet.java
index 375b8997b..d28af7341 100644
--- a/fe/src/main/java/org/apache/impala/analysis/PartitionSet.java
+++ b/fe/src/main/java/org/apache/impala/analysis/PartitionSet.java
@@ -17,14 +17,17 @@
 
 package org.apache.impala.analysis;
 
+import com.google.common.base.Joiner;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableList;
 import java.util.ArrayList;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
-
 import org.apache.impala.analysis.BinaryPredicate.Operator;
 import org.apache.impala.catalog.Column;
 import org.apache.impala.catalog.FeFsPartition;
+import org.apache.impala.catalog.FeIcebergTable;
 import org.apache.impala.catalog.FeTable;
 import org.apache.impala.common.AnalysisException;
 import org.apache.impala.common.ImpalaException;
@@ -32,10 +35,6 @@ import org.apache.impala.common.Reference;
 import org.apache.impala.planner.HdfsPartitionPruner;
 import org.apache.impala.thrift.TPartitionKeyValue;
 
-import com.google.common.base.Joiner;
-import com.google.common.base.Preconditions;
-import com.google.common.collect.ImmutableList;
-
 /**
  * Represents a set of partitions resulting from evaluating a list of 
partition conjuncts
  * against a table's partition list.
@@ -45,20 +44,20 @@ public class PartitionSet extends PartitionSpecBase {
 
   // Result of analysis, null until analysis is complete.
   private List<? extends FeFsPartition> partitions_;
-
   public PartitionSet(List<Expr> partitionExprs) {
     this.partitionExprs_ = ImmutableList.copyOf(partitionExprs);
   }
-
   public List<? extends FeFsPartition> getPartitions() { return partitions_; }
+  public List<Expr> getPartitionExprs() { return partitionExprs_; }
 
   @Override
   public void analyze(Analyzer analyzer) throws AnalysisException {
     super.analyze(analyzer);
+    if(table_ instanceof FeIcebergTable) return;
     List<Expr> conjuncts = new ArrayList<>();
     // Do not register column-authorization requests.
     analyzer.setEnablePrivChecks(false);
-    for (Expr e: partitionExprs_) {
+    for (Expr e : partitionExprs_) {
       e.analyze(analyzer);
       e.checkReturnsBool("Partition expr", false);
       conjuncts.addAll(e.getConjuncts());
@@ -66,7 +65,7 @@ public class PartitionSet extends PartitionSpecBase {
 
     TupleDescriptor desc = analyzer.getDescriptor(tableName_.toString());
     List<SlotId> partitionSlots = desc.getPartitionSlots();
-    for (Expr e: conjuncts) {
+    for (Expr e : conjuncts) {
       analyzer.getConstantFolder().rewrite(e, analyzer);
       // Make sure there are no constant predicates in the partition exprs.
       if (e.isConstant()) {
@@ -89,7 +88,7 @@ public class PartitionSet extends PartitionSpecBase {
       partitions_ = pruner.prunePartitions(analyzer, transformedConjuncts, 
true,
           null).first;
     } catch (ImpalaException e) {
-      if (e instanceof AnalysisException) throw (AnalysisException) e;
+      if (e instanceof AnalysisException) throw (AnalysisException)e;
       throw new AnalysisException("Partition expr evaluation failed in the 
backend.", e);
     }
 
diff --git a/fe/src/main/java/org/apache/impala/analysis/PartitionSpecBase.java 
b/fe/src/main/java/org/apache/impala/analysis/PartitionSpecBase.java
index 8f6cd3b1d..723b90db8 100644
--- a/fe/src/main/java/org/apache/impala/analysis/PartitionSpecBase.java
+++ b/fe/src/main/java/org/apache/impala/analysis/PartitionSpecBase.java
@@ -18,6 +18,7 @@
 package org.apache.impala.analysis;
 
 import org.apache.impala.authorization.Privilege;
+import org.apache.impala.catalog.FeIcebergTable;
 import org.apache.impala.catalog.TableLoadingException;
 import org.apache.impala.catalog.FeFsTable;
 import org.apache.impala.catalog.FeTable;
@@ -83,7 +84,7 @@ public abstract class PartitionSpecBase extends StmtNode {
     }
 
     // Make sure the target table is partitioned.
-    if (table.getMetaStoreTable().getPartitionKeysSize() == 0) {
+    if (!isPartitioned(table)) {
       throw new AnalysisException("Table is not partitioned: " + tableName_);
     }
 
@@ -93,6 +94,14 @@ public abstract class PartitionSpecBase extends StmtNode {
     nullPartitionKeyValue_ = table_.getNullPartitionKeyValue();
   }
 
+  private boolean isPartitioned(FeTable table) {
+    if (table instanceof FeIcebergTable) {
+      return ((FeIcebergTable) table).isPartitioned();
+    } else {
+      return table.getMetaStoreTable().getPartitionKeysSize() != 0;
+    }
+  }
+
   @Override
   public final String toSql() {
     return toSql(DEFAULT);
diff --git a/fe/src/main/java/org/apache/impala/catalog/FeIcebergTable.java 
b/fe/src/main/java/org/apache/impala/catalog/FeIcebergTable.java
index 93449fc90..cd2fd0af5 100644
--- a/fe/src/main/java/org/apache/impala/catalog/FeIcebergTable.java
+++ b/fe/src/main/java/org/apache/impala/catalog/FeIcebergTable.java
@@ -23,7 +23,6 @@ import com.google.common.collect.Lists;
 import com.google.common.primitives.Ints;
 
 import java.io.IOException;
-import java.net.URI;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
@@ -67,6 +66,7 @@ import org.apache.impala.catalog.HdfsPartition.FileDescriptor;
 import org.apache.impala.catalog.iceberg.GroupedContentFiles;
 import org.apache.impala.common.AnalysisException;
 import org.apache.impala.common.FileSystemUtil;
+import org.apache.impala.common.ImpalaRuntimeException;
 import org.apache.impala.common.Pair;
 import org.apache.impala.common.PrintUtils;
 import org.apache.impala.common.Reference;
@@ -545,7 +545,7 @@ public interface FeIcebergTable extends FeFsTable {
      * @return a list of {@link 
org.apache.hadoop.hive.metastore.api.FieldSchema}
      */
     public static List<FieldSchema> getPartitionTransformKeys(FeIcebergTable 
table)
-        throws TableLoadingException {
+        throws ImpalaRuntimeException {
       Table icebergTable = table.getIcebergApiTable();
 
       if (icebergTable.specs().isEmpty()) {
@@ -553,7 +553,7 @@ public interface FeIcebergTable extends FeFsTable {
       }
 
       PartitionSpec latestSpec = icebergTable.spec();
-      HashMap<String, Integer> transformParams = 
IcebergUtil.getPartitionTransformParams(
+      Map<String, Integer> transformParams = 
IcebergUtil.getPartitionTransformParams(
           latestSpec);
       List<FieldSchema> fieldSchemaList = Lists.newArrayList();
       for (PartitionField field : latestSpec.fields()) {
@@ -888,7 +888,7 @@ public interface FeIcebergTable extends FeFsTable {
      * Get iceberg partition spec by iceberg table metadata
      */
     public static List<IcebergPartitionSpec> loadPartitionSpecByIceberg(
-        FeIcebergTable table) throws TableLoadingException {
+        FeIcebergTable table) throws ImpalaRuntimeException {
       List<IcebergPartitionSpec> ret = new ArrayList<>();
       for (PartitionSpec spec : table.getIcebergApiTable().specs().values()) {
         ret.add(convertPartitionSpec(spec));
@@ -897,9 +897,9 @@ public interface FeIcebergTable extends FeFsTable {
     }
 
     public static IcebergPartitionSpec convertPartitionSpec(PartitionSpec spec)
-        throws TableLoadingException {
-      List<IcebergPartitionField> fields = new ArrayList<>();;
-      HashMap<String, Integer> transformParams =
+        throws ImpalaRuntimeException {
+      List<IcebergPartitionField> fields = new ArrayList<>();
+      Map<String, Integer> transformParams =
           IcebergUtil.getPartitionTransformParams(spec);
       for (PartitionField field : spec.fields()) {
         fields.add(new IcebergPartitionField(field.sourceId(), field.fieldId(),
@@ -1052,5 +1052,27 @@ public interface FeIcebergTable extends FeFsTable {
       return new FileStatus(contentFile.fileSizeInBytes(), false, 0, 0,
           DEFAULT_MODIFICATION_TIME, path);
     }
+
+    public static long getTotalNumberOfFiles(FeIcebergTable icebergTable,
+        TimeTravelSpec travelSpec)
+        throws ImpalaRuntimeException {
+      Map<String, String> snapshotSummary = getSnapshotSummary(
+          icebergTable.getIcebergApiTable(),
+          travelSpec);
+      if (snapshotSummary == null) {
+        throw new ImpalaRuntimeException("Invalid Iceberg snapshot summary");
+      }
+      try {
+        String totalDataFilesProp = snapshotSummary.get(
+            SnapshotSummary.TOTAL_DATA_FILES_PROP);
+        String totalDeleteFilesProp = snapshotSummary.getOrDefault(
+            SnapshotSummary.TOTAL_DELETE_FILES_PROP, "0");
+        long totalDataFiles = Long.parseLong(totalDataFilesProp);
+        long totalDeleteFiles = Long.parseLong(totalDeleteFilesProp);
+        return totalDataFiles + totalDeleteFiles;
+      } catch (NumberFormatException e) {
+        throw new ImpalaRuntimeException("Invalid Iceberg snapshot summary 
value");
+      }
+    }
   }
 }
diff --git a/fe/src/main/java/org/apache/impala/catalog/IcebergTable.java 
b/fe/src/main/java/org/apache/impala/catalog/IcebergTable.java
index 8ef7793a5..d789e12c5 100644
--- a/fe/src/main/java/org/apache/impala/catalog/IcebergTable.java
+++ b/fe/src/main/java/org/apache/impala/catalog/IcebergTable.java
@@ -465,7 +465,8 @@ public class IcebergTable extends Table implements 
FeIcebergTable {
   /**
    * Load schema and partitioning schemes directly from Iceberg.
    */
-  public void loadSchemaFromIceberg() throws TableLoadingException {
+  public void loadSchemaFromIceberg()
+      throws TableLoadingException, ImpalaRuntimeException {
     loadSchema();
     addVirtualColumns();
     partitionSpecs_ = Utils.loadPartitionSpecByIceberg(this);
diff --git 
a/fe/src/main/java/org/apache/impala/catalog/iceberg/IcebergCtasTarget.java 
b/fe/src/main/java/org/apache/impala/catalog/iceberg/IcebergCtasTarget.java
index e9806ac73..4edd9271a 100644
--- a/fe/src/main/java/org/apache/impala/catalog/iceberg/IcebergCtasTarget.java
+++ b/fe/src/main/java/org/apache/impala/catalog/iceberg/IcebergCtasTarget.java
@@ -90,7 +90,7 @@ public class IcebergCtasTarget extends CtasTargetTable 
implements FeIcebergTable
 
   public IcebergCtasTarget(FeDb db, org.apache.hadoop.hive.metastore.api.Table 
msTbl,
       List<ColumnDef> columnDefs, IcebergPartitionSpec partSpec)
-      throws CatalogException {
+      throws CatalogException, ImpalaRuntimeException {
     super(msTbl, db, msTbl.getTableName(), msTbl.getOwner());
     createFsTable(db, msTbl);
     createIcebergSchema(columnDefs);
@@ -125,11 +125,11 @@ public class IcebergCtasTarget extends CtasTargetTable 
implements FeIcebergTable
   }
 
   private void createPartitionSpec(IcebergPartitionSpec partSpec)
-      throws CatalogException {
+      throws CatalogException, ImpalaRuntimeException {
     Preconditions.checkState(iceSchema_ != null);
     PartitionSpec iceSpec = null;
     try {
-      // Let's create an Iceberg PartitionSpec with the help of Icebeg from 
'partSpec',
+      // Let's create an Iceberg PartitionSpec with the help of Iceberg from 
'partSpec',
       // then convert it back to an IcebergPartitionSpec.
       if (partSpec == null) {
         iceSpec = PartitionSpec.unpartitioned();
diff --git 
a/fe/src/main/java/org/apache/impala/catalog/local/LocalIcebergTable.java 
b/fe/src/main/java/org/apache/impala/catalog/local/LocalIcebergTable.java
index 755055a7e..5fbaa7ca4 100644
--- a/fe/src/main/java/org/apache/impala/catalog/local/LocalIcebergTable.java
+++ b/fe/src/main/java/org/apache/impala/catalog/local/LocalIcebergTable.java
@@ -36,6 +36,7 @@ import org.apache.impala.catalog.FeIcebergTable;
 import org.apache.impala.catalog.IcebergContentFileStore;
 import org.apache.impala.catalog.TableLoadingException;
 import org.apache.impala.catalog.local.MetaProvider.TableMetaRef;
+import org.apache.impala.common.ImpalaRuntimeException;
 import org.apache.impala.thrift.TCompressionCodec;
 import org.apache.impala.thrift.THdfsPartition;
 import org.apache.impala.thrift.THdfsTable;
@@ -138,7 +139,7 @@ public class LocalIcebergTable extends LocalTable 
implements FeIcebergTable {
   private LocalIcebergTable(LocalDb db, Table msTable, 
MetaProvider.TableMetaRef ref,
       LocalFsTable fsTable, ColumnMap cmap, TPartialTableInfo tableInfo,
       TableParams tableParams, org.apache.iceberg.Table icebergApiTable)
-      throws TableLoadingException {
+      throws ImpalaRuntimeException {
     super(db, msTable, ref, cmap);
 
     Preconditions.checkNotNull(tableInfo);
diff --git 
a/fe/src/main/java/org/apache/impala/common/IcebergPartitionPredicateConverter.java
 
b/fe/src/main/java/org/apache/impala/common/IcebergPartitionPredicateConverter.java
new file mode 100644
index 000000000..1a7a3cf93
--- /dev/null
+++ 
b/fe/src/main/java/org/apache/impala/common/IcebergPartitionPredicateConverter.java
@@ -0,0 +1,56 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.impala.common;
+
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.expressions.Expressions;
+import org.apache.iceberg.expressions.UnboundTerm;
+import org.apache.iceberg.transforms.Transforms;
+import org.apache.impala.analysis.Analyzer;
+import org.apache.impala.analysis.Expr;
+import org.apache.impala.analysis.IcebergPartitionExpr;
+import org.apache.impala.catalog.Column;
+import org.apache.impala.catalog.IcebergColumn;
+import org.apache.impala.thrift.TIcebergPartitionTransformType;
+
+public class IcebergPartitionPredicateConverter extends 
IcebergPredicateConverter {
+  public IcebergPartitionPredicateConverter(Schema schema, Analyzer analyzer) {
+    super(schema, analyzer);
+  }
+
+  @Override
+  protected Term getTerm(Expr expr) throws ImpalaRuntimeException {
+    if(!(expr instanceof IcebergPartitionExpr)) {
+      throw new ImpalaRuntimeException("Unsupported expression type: " + expr);
+    }
+    IcebergPartitionExpr partitionExpr = (IcebergPartitionExpr) expr;
+    Column column = getColumnFromSlotRef(partitionExpr.getSlotRef());
+    if(!(column instanceof IcebergColumn)){
+      throw new ImpalaRuntimeException(
+          String.format("Invalid column type %s for column: %s",
+              column.getType(), column));
+    }
+    IcebergColumn icebergColumn = (IcebergColumn) column;
+    if (partitionExpr.getTransform().getTransformType().equals(
+        TIcebergPartitionTransformType.IDENTITY)) {
+      return new Term(Expressions.ref(column.getName()),icebergColumn);
+    }
+    return new Term((UnboundTerm<Object>) 
Expressions.transform(column.getName(),
+        Transforms.fromString(partitionExpr.getTransform().toSql())), 
icebergColumn);
+  }
+}
diff --git 
a/fe/src/main/java/org/apache/impala/common/IcebergPredicateConverter.java 
b/fe/src/main/java/org/apache/impala/common/IcebergPredicateConverter.java
new file mode 100644
index 000000000..124d76a31
--- /dev/null
+++ b/fe/src/main/java/org/apache/impala/common/IcebergPredicateConverter.java
@@ -0,0 +1,316 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.impala.common;
+
+import com.google.common.base.Preconditions;
+import java.math.BigDecimal;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.expressions.Expression;
+import org.apache.iceberg.expressions.Expression.Operation;
+import org.apache.iceberg.expressions.Expressions;
+import org.apache.iceberg.expressions.UnboundPredicate;
+import org.apache.iceberg.expressions.UnboundTerm;
+import org.apache.iceberg.types.Types;
+import org.apache.impala.analysis.Analyzer;
+import org.apache.impala.analysis.BinaryPredicate;
+import org.apache.impala.analysis.BoolLiteral;
+import org.apache.impala.analysis.CompoundPredicate;
+import org.apache.impala.analysis.DateLiteral;
+import org.apache.impala.analysis.Expr;
+import org.apache.impala.analysis.InPredicate;
+import org.apache.impala.analysis.IsNullPredicate;
+import org.apache.impala.analysis.LiteralExpr;
+import org.apache.impala.analysis.NumericLiteral;
+import org.apache.impala.analysis.SlotDescriptor;
+import org.apache.impala.analysis.SlotRef;
+import org.apache.impala.analysis.StringLiteral;
+import org.apache.impala.catalog.Column;
+import org.apache.impala.catalog.IcebergColumn;
+import org.apache.impala.catalog.PrimitiveType;
+import org.apache.impala.catalog.Type;
+import org.apache.impala.util.ExprUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class IcebergPredicateConverter {
+  private static final Logger LOG =
+      LoggerFactory.getLogger(IcebergPredicateConverter.class);
+  private final Schema schema_;
+  private final Analyzer analyzer_;
+
+  public IcebergPredicateConverter(Schema schema, Analyzer analyzer) {
+    this.schema_ = schema;
+    this.analyzer_ = analyzer;
+  }
+
+  public Expression convert(Expr expr) throws ImpalaRuntimeException {
+    if (expr instanceof BinaryPredicate) {
+      return convert((BinaryPredicate) expr);
+    } else if (expr instanceof InPredicate) {
+      return convert((InPredicate) expr);
+    } else if (expr instanceof IsNullPredicate) {
+      return convert((IsNullPredicate) expr);
+    } else if (expr instanceof CompoundPredicate) {
+      return convert((CompoundPredicate) expr);
+    } else {
+      throw new ImpalaRuntimeException(String.format(
+          "Unsupported expression: %s", expr.toSql()));
+    }
+  }
+
+  protected Expression convert(BinaryPredicate predicate) throws 
ImpalaRuntimeException {
+    Term term = getTerm(predicate.getChild(0));
+    IcebergColumn column = term.referencedColumn_;
+
+    LiteralExpr literal = getSecondChildAsLiteralExpr(predicate);
+    checkNullLiteral(literal);
+    Operation op = getOperation(predicate);
+    Object value = getIcebergValue(column, literal);
+
+    List<Object> literals = Collections.singletonList(value);
+    return Expressions.predicate(op, term.term_, literals);
+  }
+
+  protected UnboundPredicate<Object> convert(InPredicate predicate)
+      throws ImpalaRuntimeException {
+    Term term = getTerm(predicate.getChild(0));
+    IcebergColumn column = term.referencedColumn_;
+    // Expressions takes a list of values as Objects
+    List<Object> values = new ArrayList<>();
+    for (int i = 1; i < predicate.getChildren().size(); ++i) {
+      if (!Expr.IS_LITERAL.apply(predicate.getChild(i))) {
+        throw new ImpalaRuntimeException(
+            String.format("Expression is not a literal: %s",
+                predicate.getChild(i)));
+      }
+      LiteralExpr literal = (LiteralExpr) predicate.getChild(i);
+      checkNullLiteral(literal);
+      Object value = getIcebergValue(column, literal);
+      values.add(value);
+    }
+
+    // According to the method:
+    // 
'org.apache.iceberg.expressions.InclusiveMetricsEvaluator.MetricsEvalVisitor#notIn'
+    // Expressions.notIn only works when the push-down column is the partition 
column
+    if (predicate.isNotIn()) {
+      return Expressions.notIn(term.term_, values);
+    } else {
+      return Expressions.in(term.term_, values);
+    }
+  }
+
+  protected UnboundPredicate<Object> convert(IsNullPredicate predicate)
+      throws ImpalaRuntimeException {
+    Term term = getTerm(predicate.getChild(0));
+    if (predicate.isNotNull()) {
+      return Expressions.notNull(term.term_);
+    } else {
+      return Expressions.isNull(term.term_);
+    }
+  }
+
+  protected Expression convert(CompoundPredicate predicate)
+      throws ImpalaRuntimeException {
+    Operation op = getOperation(predicate);
+
+    Expr leftExpr = predicate.getChild(0);
+    Expression left = convert(leftExpr);
+
+    if (op.equals(Operation.NOT)) {
+      return Expressions.not(left);
+    }
+
+    Expr rightExpr = predicate.getChild(1);
+    Expression right = convert(rightExpr);
+
+    return op.equals(Operation.AND) ? Expressions.and(left, right) :
+        Expressions.or(left, right);
+  }
+
+  protected void checkNullLiteral(LiteralExpr literal) throws 
ImpalaRuntimeException {
+    if (Expr.IS_NULL_LITERAL.apply(literal)) {
+      throw new ImpalaRuntimeException("Expression can't be NULL literal: " + 
literal);
+    }
+  }
+
+  protected Object getIcebergValue(IcebergColumn column, LiteralExpr literal)
+      throws ImpalaRuntimeException {
+    PrimitiveType primitiveType = literal.getType().getPrimitiveType();
+    switch (primitiveType) {
+      case BOOLEAN: return ((BoolLiteral) literal).getValue();
+      case TINYINT:
+      case SMALLINT:
+      case INT: return ((NumericLiteral) literal).getIntValue();
+      case BIGINT: return ((NumericLiteral) literal).getLongValue();
+      case FLOAT: return (float) ((NumericLiteral) literal).getDoubleValue();
+      case DOUBLE: return ((NumericLiteral) literal).getDoubleValue();
+      case STRING:
+      case DATETIME:
+      case CHAR: return ((StringLiteral) literal).getUnescapedValue();
+      case TIMESTAMP: return getIcebergTsValue(literal, column, schema_);
+      case DATE: return ((DateLiteral) literal).getValue();
+      case DECIMAL: return getIcebergDecimalValue(column, (NumericLiteral) 
literal);
+      default: {
+        throw new ImpalaRuntimeException(
+            String.format("Unable to parse Iceberg value '%s' for type %s",
+                literal.getStringValue(), primitiveType));
+      }
+    }
+  }
+
+  /**
+   * Returns Iceberg operator by BinaryPredicate operator, or null if the 
operation is not
+   * supported by Iceberg.
+   */
+  protected Operation getIcebergOperator(BinaryPredicate.Operator op)
+      throws ImpalaRuntimeException {
+    switch (op) {
+      case EQ: return Operation.EQ;
+      case NE: return Operation.NOT_EQ;
+      case LE: return Operation.LT_EQ;
+      case GE: return Operation.GT_EQ;
+      case LT: return Operation.LT;
+      case GT: return Operation.GT;
+      default:
+        throw new ImpalaRuntimeException(
+            String.format("Unsupported Impala operator: %s", op.getName()));
+    }
+  }
+
+  /**
+   * Returns Iceberg operator by CompoundPredicate operator, or null if the 
operation is
+   * not supported by Iceberg.
+   */
+  protected Operation getIcebergOperator(CompoundPredicate.Operator op)
+      throws ImpalaRuntimeException {
+    switch (op) {
+      case AND: return Operation.AND;
+      case OR: return Operation.OR;
+      case NOT: return Operation.NOT;
+      default:
+        throw new ImpalaRuntimeException(
+            String.format("Unsupported Impala operator: %s", op));
+    }
+  }
+
+  protected BigDecimal getIcebergDecimalValue(IcebergColumn column,
+      NumericLiteral literal) throws ImpalaRuntimeException {
+    Type colType = column.getType();
+    int scale = colType.getDecimalDigits();
+    BigDecimal literalValue = literal.getValue();
+
+    if (literalValue.scale() > scale) {
+      throw new ImpalaRuntimeException(
+          String.format("Invalid scale %d for type: %s", literalValue.scale(),
+              colType.toSql()));
+    }
+    // Iceberg DecimalLiteral needs to have the exact same scale.
+    if (literalValue.scale() < scale) {
+      return literalValue.setScale(scale);
+    }
+    return literalValue;
+  }
+
+  protected Long getIcebergTsValue(LiteralExpr literal, IcebergColumn column,
+      Schema iceSchema) throws ImpalaRuntimeException {
+    try {
+      org.apache.iceberg.types.Type iceType = 
iceSchema.findType(column.getFieldId());
+      Preconditions.checkState(iceType instanceof Types.TimestampType);
+      Types.TimestampType tsType = (Types.TimestampType) iceType;
+      if (tsType.shouldAdjustToUTC()) {
+        return ExprUtil.localTimestampToUnixTimeMicros(analyzer_, literal);
+      } else {
+        return ExprUtil.utcTimestampToUnixTimeMicros(analyzer_, literal);
+      }
+    } catch (InternalException ex) {
+      // We cannot interpret the timestamp literal. Maybe the timestamp is 
invalid,
+      // or the local timestamp ambiguously converts to UTC due to daylight 
saving
+      // time backward turn. E.g. '2021-10-31 02:15:00 Europe/Budapest' 
converts to
+      // either '2021-10-31 00:15:00 UTC' or '2021-10-31 01:15:00 UTC'.
+      LOG.warn("Exception occurred during timestamp conversion: %s"
+          + "\nThis means timestamp predicate is not pushed to Iceberg, let 
Impala "
+          + "backend handle it.", ex);
+    } catch (AnalysisException ignored) {}
+    throw new ImpalaRuntimeException(
+        String.format("Unable to parse timestamp value from: %s",
+            literal.getStringValue()));
+  }
+
+  protected Column getColumnFromSlotRef(SlotRef slotRef) throws 
ImpalaRuntimeException {
+    SlotDescriptor desc = slotRef.getDesc();
+    // If predicate contains map/struct, this column would be null
+    Column column = desc.getColumn();
+    if (column == null) {
+      throw new ImpalaRuntimeException(
+          "Expressions with complex types can't be converted to Iceberg 
expressions: "
+          + slotRef);
+    }
+    return column;
+  }
+
+
+  protected LiteralExpr getSecondChildAsLiteralExpr(Expr expr)
+      throws ImpalaRuntimeException {
+    if (!(expr.getChild(1) instanceof LiteralExpr)) {
+      throw new ImpalaRuntimeException(
+          String.format("Invalid child expression: %s", expr));
+    }
+    return (LiteralExpr) expr.getChild(1);
+  }
+
+  protected Operation getOperation(Expr expr) throws ImpalaRuntimeException {
+    Operation op;
+    if (expr instanceof BinaryPredicate) {
+      op = getIcebergOperator(((BinaryPredicate) expr).getOp());
+    } else if (expr instanceof CompoundPredicate) {
+      op = getIcebergOperator(((CompoundPredicate) expr).getOp());
+    } else {
+      throw new ImpalaRuntimeException(
+          String.format("Invalid expression type: %s", expr.getType()));
+    }
+    return op;
+  }
+
+  protected Term getTerm(Expr expr) throws ImpalaRuntimeException {
+    if(!(expr instanceof SlotRef)){
+      throw new ImpalaRuntimeException(
+          String.format("Unable to create term from expression: %s", 
expr.toSql()));
+    }
+    Column column = getColumnFromSlotRef((SlotRef) expr);
+    if(!(column instanceof IcebergColumn)){
+      throw new ImpalaRuntimeException(
+          String.format("Invalid column type %s for column: %s", 
column.getType(),
+              column));
+    }
+
+    return new Term(Expressions.ref(column.getName()), (IcebergColumn) column);
+  }
+
+  public static class Term {
+    public final UnboundTerm<Object> term_;
+    public final IcebergColumn referencedColumn_;
+
+    public Term(UnboundTerm<Object> term, IcebergColumn referencedColumn){
+      term_ = term;
+      referencedColumn_ = referencedColumn;
+    }
+  }
+}
diff --git a/fe/src/main/java/org/apache/impala/planner/IcebergScanPlanner.java 
b/fe/src/main/java/org/apache/impala/planner/IcebergScanPlanner.java
index 7baccb2ac..7863d8b67 100644
--- a/fe/src/main/java/org/apache/impala/planner/IcebergScanPlanner.java
+++ b/fe/src/main/java/org/apache/impala/planner/IcebergScanPlanner.java
@@ -17,8 +17,8 @@
 
 package org.apache.impala.planner;
 
+import com.google.common.base.Preconditions;
 import java.io.IOException;
-import java.math.BigDecimal;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
@@ -32,47 +32,38 @@ import java.util.Set;
 import java.util.TreeSet;
 import java.util.function.Predicate;
 import java.util.stream.Collectors;
-
 import org.apache.curator.shaded.com.google.common.collect.Lists;
 import org.apache.iceberg.ContentFile;
 import org.apache.iceberg.DeleteFile;
 import org.apache.iceberg.FileContent;
 import org.apache.iceberg.FileScanTask;
-import org.apache.iceberg.Schema;
 import org.apache.iceberg.expressions.Expression;
 import org.apache.iceberg.expressions.ExpressionUtil;
 import org.apache.iceberg.expressions.ExpressionVisitors;
 import org.apache.iceberg.expressions.Expressions;
 import org.apache.iceberg.expressions.Expression.Operation;
 import org.apache.iceberg.expressions.True;
-import org.apache.iceberg.expressions.UnboundPredicate;
 import org.apache.iceberg.io.CloseableIterable;
-import org.apache.iceberg.types.Types;
 import org.apache.impala.analysis.Analyzer;
 import org.apache.impala.analysis.BinaryPredicate;
-import org.apache.impala.analysis.CompoundPredicate;
+import org.apache.impala.analysis.BinaryPredicate.Operator;
 import org.apache.impala.analysis.Expr;
 import org.apache.impala.analysis.IcebergExpressionCollector;
 import org.apache.impala.analysis.InPredicate;
 import org.apache.impala.analysis.IsNullPredicate;
 import org.apache.impala.analysis.JoinOperator;
-import org.apache.impala.analysis.LiteralExpr;
-import org.apache.impala.catalog.Column;
-import org.apache.impala.catalog.ColumnStats;
-import org.apache.impala.analysis.BoolLiteral;
-import org.apache.impala.analysis.DateLiteral;
 import org.apache.impala.analysis.MultiAggregateInfo;
-import org.apache.impala.analysis.NumericLiteral;
 import org.apache.impala.analysis.SlotDescriptor;
 import org.apache.impala.analysis.SlotId;
 import org.apache.impala.analysis.SlotRef;
-import org.apache.impala.analysis.StringLiteral;
 import org.apache.impala.analysis.TableRef;
 import org.apache.impala.analysis.TimeTravelSpec;
 import org.apache.impala.analysis.TupleDescriptor;
 import org.apache.impala.analysis.TupleId;
-import org.apache.impala.analysis.BinaryPredicate.Operator;
+import org.apache.impala.catalog.Column;
+import org.apache.impala.catalog.ColumnStats;
 import org.apache.impala.catalog.FeIcebergTable;
+import org.apache.impala.catalog.HdfsPartition.FileDescriptor;
 import org.apache.impala.catalog.IcebergColumn;
 import org.apache.impala.catalog.IcebergContentFileStore;
 import org.apache.impala.catalog.IcebergPositionDeleteTable;
@@ -80,23 +71,18 @@ import org.apache.impala.catalog.IcebergTable;
 import org.apache.impala.catalog.TableLoadingException;
 import org.apache.impala.catalog.Type;
 import org.apache.impala.catalog.VirtualColumn;
-import org.apache.impala.catalog.HdfsPartition.FileDescriptor;
 import org.apache.impala.catalog.iceberg.IcebergMetadataTable;
 import org.apache.impala.common.AnalysisException;
+import org.apache.impala.common.IcebergPredicateConverter;
 import org.apache.impala.common.ImpalaException;
 import org.apache.impala.common.ImpalaRuntimeException;
-import org.apache.impala.common.InternalException;
 import org.apache.impala.common.Pair;
 import org.apache.impala.planner.JoinNode.DistributionMode;
 import org.apache.impala.thrift.TColumnStats;
 import org.apache.impala.thrift.TIcebergPartitionTransformType;
 import org.apache.impala.thrift.TQueryOptions;
 import org.apache.impala.thrift.TVirtualColumnType;
-import org.apache.impala.util.ExprUtil;
 import org.apache.impala.util.IcebergUtil;
-
-import com.google.common.base.Preconditions;
-
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -220,7 +206,7 @@ public class IcebergScanPlanner {
         nonIdentityConjuncts_, getSkippedConjuncts());
     dataScanNode.init(analyzer_);
     List<Expr> outputExprs = tblRef_.getDesc().getSlots().stream().map(
-        entry -> new SlotRef(entry)).collect(Collectors.toList());
+        SlotRef::new).collect(Collectors.toList());
     UnionNode unionNode = new UnionNode(ctx_.getNextNodeId(), tblRef_.getId(),
         outputExprs, false);
     unionNode.addChild(dataScanNode, outputExprs);
@@ -404,8 +390,8 @@ public class IcebergScanPlanner {
       }
       if (dataFilesCacheMisses > 0) {
         Preconditions.checkState(timeTravelSpec != null);
-        LOG.info("File descriptors had to be loaded on demand during time 
travel: " +
-            String.valueOf(dataFilesCacheMisses));
+        LOG.info("File descriptors had to be loaded on demand during time 
travel: {}",
+            dataFilesCacheMisses);
       }
     } catch (IOException | TableLoadingException e) {
       throw new ImpalaRuntimeException(String.format(
@@ -506,7 +492,7 @@ public class IcebergScanPlanner {
       if (tblRef_.getTimeTravelSpec() == null) {
         // We should always find the data files in the cache when not doing 
time travel.
         throw new ImpalaRuntimeException("Cannot find file in cache: " + 
cf.path()
-            + " with snapshot id: " + 
String.valueOf(getIceTable().snapshotId()));
+            + " with snapshot id: " + getIceTable().snapshotId());
       }
       // We can still find the file descriptor among the old file descriptors.
       fileDesc = fileStore.getOldFileDescriptor(pathHash);
@@ -626,248 +612,21 @@ public class IcebergScanPlanner {
   }
 
   /**
-   * Returns Iceberg operator by BinaryPredicate operator, or null if the 
operation
-   * is not supported by Iceberg.
-   */
-  private Operation getIcebergOperator(BinaryPredicate.Operator op) {
-    switch (op) {
-      case EQ: return Operation.EQ;
-      case NE: return Operation.NOT_EQ;
-      case LE: return Operation.LT_EQ;
-      case GE: return Operation.GT_EQ;
-      case LT: return Operation.LT;
-      case GT: return Operation.GT;
-      default: return null;
-    }
-  }
-
-  /**
-   * Returns Iceberg operator by CompoundPredicate operator, or null if the 
operation
-   * is not supported by Iceberg.
-   */
-  private Operation getIcebergOperator(CompoundPredicate.Operator op) {
-    switch (op) {
-      case AND: return Operation.AND;
-      case OR: return Operation.OR;
-      case NOT: return Operation.NOT;
-      default: return null;
-    }
-  }
-
-  /**
-   * Transform impala predicate to iceberg predicate
+   * Transform Impala predicate to Iceberg predicate
    */
-  private boolean tryConvertIcebergPredicate(Expr expr)
-      throws ImpalaException {
-    Expression predicate = convertIcebergPredicate(expr);
-    if (predicate != null) {
+  private boolean tryConvertIcebergPredicate(Expr expr) {
+    IcebergPredicateConverter converter =
+        new IcebergPredicateConverter(getIceTable().getIcebergSchema(), 
analyzer_);
+    try {
+      Expression predicate = converter.convert(expr);
       impalaIcebergPredicateMapping_.put(predicate, expr);
-      LOG.debug("Push down the predicate: " + predicate + " to iceberg");
+      LOG.debug("Push down the predicate: {} to iceberg", predicate);
       return true;
     }
-    untranslatedExpressions_.add(expr);
-    return false;
-  }
-
-  private Expression convertIcebergPredicate(Expr expr)
-      throws ImpalaException {
-    if (expr instanceof BinaryPredicate) {
-      return convertIcebergPredicate((BinaryPredicate) expr);
-    } else if (expr instanceof InPredicate) {
-      return convertIcebergPredicate((InPredicate) expr);
-    } else if (expr instanceof IsNullPredicate) {
-      return convertIcebergPredicate((IsNullPredicate) expr);
-    } else if (expr instanceof CompoundPredicate) {
-      return convertIcebergPredicate((CompoundPredicate) expr);
-    } else {
-      return null;
-    }
-  }
-
-  private UnboundPredicate<Object> convertIcebergPredicate(BinaryPredicate 
predicate)
-      throws ImpalaException {
-    Operation op = getIcebergOperator(predicate.getOp());
-    if (op == null) {
-      return null;
-    }
-
-    // Do not convert if there is an implicit cast
-    if (!(predicate.getChild(0) instanceof SlotRef)) {
-      return null;
-    }
-    SlotRef ref = (SlotRef) predicate.getChild(0);
-
-    if (!(predicate.getChild(1) instanceof LiteralExpr)) {
-      return null;
-    }
-    LiteralExpr literal = (LiteralExpr) predicate.getChild(1);
-
-    // If predicate contains map/struct, this column would be null
-    Column col = ref.getDesc().getColumn();
-    if (col == null) {
-      return null;
-    }
-
-    // Cannot push BinaryPredicate with null literal values
-    if (Expr.IS_NULL_LITERAL.apply(literal)) {
-      return null;
-    }
-
-    Object value = getIcebergValue(ref, literal);
-    if (value == null) {
-      return null;
-    }
-
-    return Expressions.predicate(op, col.getName(), value);
-  }
-
-  private UnboundPredicate<Object> convertIcebergPredicate(InPredicate 
predicate)
-      throws ImpalaException {
-    // Do not convert if there is an implicit cast
-    if (!(predicate.getChild(0) instanceof SlotRef)) {
-      return null;
-    }
-    SlotRef ref = (SlotRef) predicate.getChild(0);
-
-    // If predicate contains map/struct, this column would be null
-    Column col = ref.getDesc().getColumn();
-    if (col == null) {
-      return null;
-    }
-
-    // Expressions takes a list of values as Objects
-    List<Object> values = new ArrayList<>();
-    for (int i = 1; i < predicate.getChildren().size(); ++i) {
-      if (!Expr.IS_LITERAL.apply(predicate.getChild(i))) {
-        return null;
-      }
-      LiteralExpr literal = (LiteralExpr) predicate.getChild(i);
-
-      // Cannot push IN or NOT_IN predicate with null literal values
-      if (Expr.IS_NULL_LITERAL.apply(literal)) {
-        return null;
-      }
-
-      Object value = getIcebergValue(ref, literal);
-      if (value == null) {
-        return null;
-      }
-
-      values.add(value);
-    }
-
-    // According to the method:
-    // 
'org.apache.iceberg.expressions.InclusiveMetricsEvaluator.MetricsEvalVisitor#notIn'
-    // Expressions.notIn only works when the push-down column is the partition 
column
-    if (predicate.isNotIn())
-      return Expressions.notIn(col.getName(), values);
-    else {
-      return Expressions.in(col.getName(), values);
-    }
-  }
-
-  private UnboundPredicate<Object> convertIcebergPredicate(IsNullPredicate 
predicate) {
-    // Do not convert if there is an implicit cast
-    if (!(predicate.getChild(0) instanceof SlotRef)) {
-      return null;
-    }
-    SlotRef ref = (SlotRef) predicate.getChild(0);
-
-    // If predicate contains map/struct, this column would be null
-    Column col = ref.getDesc().getColumn();
-    if (col == null) {
-      return null;
-    }
-
-    if (predicate.isNotNull()) {
-      return Expressions.notNull(col.getName());
-    } else{
-      return Expressions.isNull(col.getName());
-    }
-  }
-
-  private Expression convertIcebergPredicate(CompoundPredicate predicate)
-      throws ImpalaException {
-    Operation op = getIcebergOperator(predicate.getOp());
-    if (op == null) {
-      return null;
-    }
-
-    Expression left = convertIcebergPredicate(predicate.getChild(0));
-    if (left == null) {
-      return null;
-    }
-    if (op.equals(Operation.NOT)) {
-      return Expressions.not(left);
-    }
-
-    Expression right = convertIcebergPredicate(predicate.getChild(1));
-    if (right == null) {
-      return null;
-    }
-    return op.equals(Operation.AND) ? Expressions.and(left, right)
-        : Expressions.or(left, right);
-  }
-
-  private Object getIcebergValue(SlotRef ref, LiteralExpr literal)
-      throws ImpalaException {
-    IcebergColumn iceCol = (IcebergColumn) ref.getDesc().getColumn();
-    Schema iceSchema = getIceTable().getIcebergSchema();
-    switch (literal.getType().getPrimitiveType()) {
-      case BOOLEAN: return ((BoolLiteral) literal).getValue();
-      case TINYINT:
-      case SMALLINT:
-      case INT: return ((NumericLiteral) literal).getIntValue();
-      case BIGINT: return ((NumericLiteral) literal).getLongValue();
-      case FLOAT: return (float) ((NumericLiteral) literal).getDoubleValue();
-      case DOUBLE: return ((NumericLiteral) literal).getDoubleValue();
-      case STRING:
-      case DATETIME:
-      case CHAR: return ((StringLiteral) literal).getUnescapedValue();
-      case TIMESTAMP: return getIcebergTsValue(literal, iceCol, iceSchema);
-      case DATE: return ((DateLiteral) literal).getValue();
-      case DECIMAL: return getIcebergDecimalValue(ref, (NumericLiteral) 
literal);
-      default: {
-        Preconditions.checkState(false,
-            "Unsupported iceberg type considered for predicate: %s",
-            literal.getType().toSql());
-      }
-    }
-    return null;
-  }
-
-  private BigDecimal getIcebergDecimalValue(SlotRef ref, NumericLiteral 
literal) {
-    Type colType = ref.getDesc().getColumn().getType();
-    int scale = colType.getDecimalDigits();
-    BigDecimal literalValue = literal.getValue();
-
-    if (literalValue.scale() > scale) return null;
-    // Iceberg DecimalLiteral needs to have the exact same scale.
-    if (literalValue.scale() < scale) return literalValue.setScale(scale);
-    return literalValue;
-  }
-
-  private Object getIcebergTsValue(LiteralExpr literal,
-      IcebergColumn iceCol, Schema iceSchema) throws AnalysisException {
-    try {
-      org.apache.iceberg.types.Type iceType = 
iceSchema.findType(iceCol.getFieldId());
-      Preconditions.checkState(iceType instanceof Types.TimestampType);
-      Types.TimestampType tsType = (Types.TimestampType) iceType;
-      if (tsType.shouldAdjustToUTC()) {
-        return ExprUtil.localTimestampToUnixTimeMicros(analyzer_, literal);
-      } else {
-        return ExprUtil.utcTimestampToUnixTimeMicros(analyzer_, literal);
-      }
-    } catch (InternalException ex) {
-      // We cannot interpret the timestamp literal. Maybe the timestamp is 
invalid,
-      // or the local timestamp ambigously converts to UTC due to daylight 
saving
-      // time backward turn. E.g. '2021-10-31 02:15:00 Europe/Budapest' 
converts to
-      // either '2021-10-31 00:15:00 UTC' or '2021-10-31 01:15:00 UTC'.
-      LOG.warn("Exception occurred during timestamp conversion: " + 
ex.toString() +
-          "\nThis means timestamp predicate is not pushed to Iceberg, let 
Impala " +
-          "backend handle it.");
+    catch (ImpalaException e) {
+      untranslatedExpressions_.add(expr);
+      return false;
     }
-    return null;
   }
 
 }
diff --git a/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java 
b/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java
index 170e1d94f..f89bfd40f 100644
--- a/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java
+++ b/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java
@@ -1395,7 +1395,8 @@ public class CatalogOpExecutor {
         || type == TAlterTableType.ALTER_COLUMN
         || type == TAlterTableType.SET_PARTITION_SPEC
         || type == TAlterTableType.SET_TBL_PROPERTIES
-        || type == TAlterTableType.UNSET_TBL_PROPERTIES;
+        || type == TAlterTableType.UNSET_TBL_PROPERTIES
+        || type == TAlterTableType.DROP_PARTITION;
   }
 
   /**
@@ -1472,6 +1473,14 @@ public class CatalogOpExecutor {
           needsToUpdateHms |= !unsetIcebergTblProperties(tbl, params, iceTxn);
           addSummary(response, "Updated table.");
           break;
+        case DROP_PARTITION:
+          // Metadata change only
+          needsToUpdateHms = false;
+          long droppedPartitions = 
IcebergCatalogOpExecutor.alterTableDropPartition(
+              iceTxn, params.getDrop_partition_params());
+          addSummary(
+              response, String.format("Dropped %d partition(s)", 
droppedPartitions));
+          break;
         case REPLACE_COLUMNS:
           // It doesn't make sense to replace all the columns of an Iceberg 
table as it
           // would basically make all existing data inaccessible.
diff --git 
a/fe/src/main/java/org/apache/impala/service/DescribeResultFactory.java 
b/fe/src/main/java/org/apache/impala/service/DescribeResultFactory.java
index 9f42d5547..888df7b65 100644
--- a/fe/src/main/java/org/apache/impala/service/DescribeResultFactory.java
+++ b/fe/src/main/java/org/apache/impala/service/DescribeResultFactory.java
@@ -33,7 +33,7 @@ import org.apache.impala.catalog.IcebergColumn;
 import org.apache.impala.catalog.KuduColumn;
 import org.apache.impala.catalog.StructField;
 import org.apache.impala.catalog.StructType;
-import org.apache.impala.catalog.TableLoadingException;
+import org.apache.impala.common.ImpalaRuntimeException;
 import org.apache.impala.compat.MetastoreShim;
 import org.apache.impala.thrift.TColumnValue;
 import org.apache.impala.thrift.TDescribeOutputStyle;
@@ -198,8 +198,8 @@ public class DescribeResultFactory {
    * Hive's MetadataFormatUtils class is used to build the results.  
filteredColumns is a
    * list of columns the user is authorized to view.
    */
-  public static TDescribeResult buildDescribeFormattedResult(FeTable table,
-      List<Column> filteredColumns) throws TableLoadingException {
+  public static TDescribeResult buildDescribeFormattedResult(
+      FeTable table, List<Column> filteredColumns) throws 
ImpalaRuntimeException {
     TDescribeResult result = new TDescribeResult();
     result.results = Lists.newArrayList();
 
diff --git 
a/fe/src/main/java/org/apache/impala/service/IcebergCatalogOpExecutor.java 
b/fe/src/main/java/org/apache/impala/service/IcebergCatalogOpExecutor.java
index 786e5ff2a..f98289311 100644
--- a/fe/src/main/java/org/apache/impala/service/IcebergCatalogOpExecutor.java
+++ b/fe/src/main/java/org/apache/impala/service/IcebergCatalogOpExecutor.java
@@ -35,7 +35,6 @@ import org.apache.iceberg.PartitionSpec;
 import org.apache.iceberg.ReplacePartitions;
 import org.apache.iceberg.RowDelta;
 import org.apache.iceberg.Schema;
-import org.apache.iceberg.SnapshotManager;
 import org.apache.iceberg.Table;
 import org.apache.iceberg.TableMetadata;
 import org.apache.iceberg.TableOperations;
@@ -57,6 +56,7 @@ import org.apache.impala.catalog.iceberg.IcebergHiveCatalog;
 import org.apache.impala.common.ImpalaRuntimeException;
 import org.apache.impala.fb.FbIcebergColumnStats;
 import org.apache.impala.fb.FbIcebergDataFile;
+import org.apache.impala.thrift.TAlterTableDropPartitionParams;
 import org.apache.impala.thrift.TAlterTableExecuteExpireSnapshotsParams;
 import org.apache.impala.thrift.TAlterTableExecuteRollbackParams;
 import org.apache.impala.thrift.TColumn;
@@ -239,10 +239,26 @@ public class IcebergCatalogOpExecutor {
   }
 
   /**
-   * Drops a column from a Iceberg table.
+   * Deletes the data files belonging to a specific set of partitions.
    */
-  public static void dropColumn(Transaction txn, String colName)
-      throws TableLoadingException, ImpalaRuntimeException {
+  public static long alterTableDropPartition(
+      Transaction iceTxn, TAlterTableDropPartitionParams params) {
+    DeleteFiles deleteFiles = iceTxn.newDelete();
+    if (params.iceberg_drop_partition_request.is_truncate) {
+      deleteFiles.deleteFromRowFilter(Expressions.alwaysTrue());
+    } else {
+      for (String path : params.iceberg_drop_partition_request.paths) {
+        deleteFiles.deleteFile(path);
+      }
+    }
+    deleteFiles.commit();
+    return params.iceberg_drop_partition_request.num_partitions;
+  }
+
+  /**
+   * Drops a column from an Iceberg table.
+   */
+  public static void dropColumn(Transaction txn, String colName) {
     UpdateSchema schema = txn.updateSchema();
     schema.deleteColumn(colName);
     schema.commit();
diff --git a/fe/src/main/java/org/apache/impala/util/IcebergUtil.java 
b/fe/src/main/java/org/apache/impala/util/IcebergUtil.java
index 71c200dab..f7092d2fb 100644
--- a/fe/src/main/java/org/apache/impala/util/IcebergUtil.java
+++ b/fe/src/main/java/org/apache/impala/util/IcebergUtil.java
@@ -27,6 +27,7 @@ import com.google.common.primitives.Longs;
 import com.google.flatbuffers.FlatBufferBuilder;
 import java.io.IOException;
 import java.nio.ByteBuffer;
+import java.time.DateTimeException;
 import java.time.Instant;
 import java.time.LocalDateTime;
 import java.time.OffsetDateTime;
@@ -56,6 +57,7 @@ import org.apache.iceberg.TableScan;
 import org.apache.iceberg.Transaction;
 import org.apache.iceberg.catalog.TableIdentifier;
 import org.apache.iceberg.expressions.Expression;
+import org.apache.iceberg.expressions.Literal;
 import org.apache.iceberg.hadoop.HadoopFileIO;
 import org.apache.iceberg.io.CloseableIterable;
 import org.apache.iceberg.mr.Catalogs;
@@ -101,8 +103,7 @@ import 
org.apache.impala.thrift.TIcebergPartitionTransformType;
 
 @SuppressWarnings("UnstableApiUsage")
 public class IcebergUtil {
-
-  private static final int ICEBERG_EPOCH_YEAR = 1970;
+  public static final int ICEBERG_EPOCH_YEAR = 1970;
   private static final int ICEBERG_EPOCH_MONTH = 1;
   @SuppressWarnings("unused")
   private static final int ICEBERG_EPOCH_DAY = 1;
@@ -390,28 +391,27 @@ public class IcebergUtil {
     return pSize;
   }
 
-  public static IcebergPartitionTransform getPartitionTransform(
-      PartitionField field, HashMap<String, Integer> transformParams)
-      throws TableLoadingException {
+  public static IcebergPartitionTransform getPartitionTransform(PartitionField 
field,
+      Map<String, Integer> transformParams) throws ImpalaRuntimeException {
     String type = field.transform().toString();
     String transformMappingKey = 
getPartitionTransformMappingKey(field.sourceId(),
         getPartitionTransformType(type));
     return getPartitionTransform(type, 
transformParams.get(transformMappingKey));
   }
 
-  public static IcebergPartitionTransform getPartitionTransform(String 
transformType,
-      Integer transformParam) throws TableLoadingException {
+  public static IcebergPartitionTransform getPartitionTransform(
+      String transformType, Integer transformParam) throws 
ImpalaRuntimeException {
     return new 
IcebergPartitionTransform(getPartitionTransformType(transformType),
         transformParam);
   }
 
   public static IcebergPartitionTransform getPartitionTransform(String 
transformType)
-      throws TableLoadingException {
+      throws ImpalaRuntimeException {
     return getPartitionTransform(transformType, null);
   }
 
-  private static TIcebergPartitionTransformType getPartitionTransformType(
-      String transformType) throws TableLoadingException {
+  public static TIcebergPartitionTransformType getPartitionTransformType(
+      String transformType) throws ImpalaRuntimeException {
     Preconditions.checkNotNull(transformType);
     transformType = transformType.toUpperCase();
     if ("IDENTITY".equals(transformType)) {
@@ -428,8 +428,8 @@ public class IcebergUtil {
       case "YEAR":  case "YEARS":  return TIcebergPartitionTransformType.YEAR;
       case "VOID": return TIcebergPartitionTransformType.VOID;
       default:
-        throw new TableLoadingException("Unsupported iceberg partition type: " 
+
-            transformType);
+        throw new ImpalaRuntimeException(
+            "Unsupported Iceberg partition type: " + transformType);
     }
   }
 
@@ -449,7 +449,7 @@ public class IcebergUtil {
    * expose the interface of the transform types outside their package and the 
only
    * way to get the transform's parameter is implementing this visitor class.
    */
-  public static HashMap<String, Integer> 
getPartitionTransformParams(PartitionSpec spec) {
+  public static Map<String, Integer> getPartitionTransformParams(PartitionSpec 
spec) {
     List<Pair<String, Integer>> transformParams = PartitionSpecVisitor.visit(
         spec, new PartitionSpecVisitor<Pair<String, Integer>>() {
           @Override
@@ -745,6 +745,39 @@ public class IcebergUtil {
     throw new ImpalaRuntimeException("Unexpected partition transform: " + 
transformType);
   }
 
+  /**
+   * Returns the integer representation of date/time transform values.
+   * @param transformType type of the transform (YEAR, MONTH, DAY or HOUR)
+   * @param stringValue date value as a string
+   * @return Integer representation of the transform value.
+   * @throws ImpalaRuntimeException if the value cannot be parsed, or the supplied
+   * transform is not a date/time transform.
+   */
+  public static Integer getDateTimeTransformValue(
+      TIcebergPartitionTransformType transformType, String stringValue)
+      throws ImpalaRuntimeException {
+    try {
+      switch (transformType) {
+        case YEAR: return parseYearToTransformYear(stringValue);
+        case MONTH: return parseMonthToTransformMonth(stringValue);
+        case DAY: return parseDayToTransformMonth(stringValue);
+        case HOUR: return parseHourToTransformHour(stringValue);
+      }
+    } catch (NumberFormatException | DateTimeException | IllegalStateException 
e) {
+      throw new ImpalaRuntimeException(
+          String.format("Unable to parse value '%s' as '%s' transform value", 
stringValue,
+              transformType));
+    }
+    throw new ImpalaRuntimeException("Unexpected partition transform: " + 
transformType);
+  }
+
+  public static boolean isDateTimeTransformType(
+      TIcebergPartitionTransformType transformType) {
+    return transformType.equals(TIcebergPartitionTransformType.YEAR)
+        || transformType.equals(TIcebergPartitionTransformType.MONTH)
+        || transformType.equals(TIcebergPartitionTransformType.DAY)
+        || transformType.equals(TIcebergPartitionTransformType.HOUR);
+  }
+
   /**
    * In the partition path years are represented naturally, e.g. 1984. 
However, we need
    * to convert it to an integer which represents the years from 1970. So, for 
1984 the
@@ -769,13 +802,23 @@ public class IcebergUtil {
     return years * 12 + months;
   }
 
+  /**
+   * In the partition path days are represented as <year>-<month>-<day>, e.g.
+   * 2023-12-12. This function converts this string to an integer which represents
+   * the days from '1970-01-01' with the help of Iceberg's type converter.
+   */
+  private static Integer parseDayToTransformMonth(String monthStr) {
+    Literal<Integer> days = Literal.of(monthStr).to(Types.DateType.get());
+    return days.value();
+  }
+
   /**
    * In the partition path hours are represented as 
<year>-<month>-<day>-<hour>, e.g.
    * 1970-01-01-01. We need to convert it to a single integer which represents 
the hours
    * from '1970-01-01 00:00:00'.
    */
   private static Integer parseHourToTransformHour(String hourStr) {
-    final OffsetDateTime EPOCH = 
Instant.ofEpochSecond(0).atOffset(ZoneOffset.UTC);
+    final OffsetDateTime epoch = 
Instant.ofEpochSecond(0).atOffset(ZoneOffset.UTC);
     String[] parts = hourStr.split("-", -1);
     Preconditions.checkState(parts.length == 4);
     int year = Integer.parseInt(parts[0]);
@@ -785,7 +828,7 @@ public class IcebergUtil {
     OffsetDateTime datetime = OffsetDateTime.of(
         LocalDateTime.of(year, month, day, hour, /*minute=*/0),
         ZoneOffset.UTC);
-    return (int)ChronoUnit.HOURS.between(EPOCH, datetime);
+    return (int) ChronoUnit.HOURS.between(epoch, datetime);
   }
 
   public static TCompressionCodec parseParquetCompressionCodec(
diff --git a/fe/src/test/java/org/apache/impala/analysis/AnalyzeDDLTest.java 
b/fe/src/test/java/org/apache/impala/analysis/AnalyzeDDLTest.java
old mode 100755
new mode 100644
index 28e88a1a1..88f9dbf09
--- a/fe/src/test/java/org/apache/impala/analysis/AnalyzeDDLTest.java
+++ b/fe/src/test/java/org/apache/impala/analysis/AnalyzeDDLTest.java
@@ -325,6 +325,55 @@ public class AnalyzeDDLTest extends FrontendTestBase {
     AnalysisError("alter table functional.alltypes add " +
         "partition(year=2050, month=10) location '  '",
         "URI path cannot be empty.");
+
+    // Iceberg DROP PARTITION
+    String partitioned =
+        "alter table functional_parquet.iceberg_partitioned drop partition";
+    String evolution =
+        "alter table functional_parquet.iceberg_partition_evolution drop 
partition";
+    String nonPartitioned =
+        "alter table functional_parquet.iceberg_non_partitioned drop 
partition";
+
+    AnalysisError(nonPartitioned + "(user = 'Alan')",
+        "Table is not partitioned: 
functional_parquet.iceberg_non_partitioned");
+    AnalysisError(partitioned + "(action = 'Foo')",
+        "No matching partition(s) found");
+    AnalysisError(partitioned + "(user = 'Alan')",
+        "Partition exprs cannot contain non-partition column(s): `user`");
+    AnalysisError(partitioned + "(user = 'Alan' or user = 'Lisa' and id > 10)",
+        "Partition exprs cannot contain non-partition column(s): `user`");
+    AnalysisError(partitioned + "(void(action) = 'click')",
+        "VOID transform is not supported for partition selection");
+    AnalysisError(partitioned + "(day(action) = 'Alan')",
+        "Can't filter column 'action' with transform type: 'DAY'");
+    AnalysisError(partitioned + "(action = action)",
+        "Invalid partition filtering expression: action = action");
+    AnalysisError(partitioned + "(action = action and action = 'click' "
+            + "or hour(event_time) > '2020-01-01-01')",
+        "Invalid partition filtering expression: "
+            + "action = action AND action = 'click' OR HOUR(event_time) > 
438289");
+    AnalysisError(
+        partitioned + "(action)", "Invalid partition filtering expression: 
action");
+    AnalysisError(partitioned + "(2)", "Invalid partition filtering 
expression: 2");
+    AnalysisError(partitioned + "(truncate(action))",
+        "BUCKET and TRUNCATE partition transforms should have a parameter");
+    AnalysisError(partitioned + "(truncate('string', action))",
+        "Invalid transform parameter value: string");
+    AnalysisError(partitioned + "(truncate(1, 2, action))",
+        "Invalid partition predicate: truncate(1, 2, action)");
+    AnalysisError(partitioned + " (action = 'click') purge",
+        "Partition purge is not supported for Iceberg tables");
+
+    AnalyzesOk(partitioned + "(hour(event_time) > '2020-01-01-01')");
+    AnalyzesOk(partitioned + "(hour(event_time) < '2020-02-01-01')");
+    AnalyzesOk(partitioned + "(hour(event_time) = '2020-01-01-9')");
+    AnalyzesOk(partitioned + "(hour(event_time) = '2020-01-01-9', action = 
'click')");
+    AnalyzesOk(partitioned + "(action = 'click')");
+    AnalyzesOk(partitioned + "(action = 'click' or action = 'download')");
+    AnalyzesOk(partitioned + "(action in ('click', 'download'))");
+    AnalyzesOk(partitioned + "(hour(event_time) in ('2020-01-01-9', 
'2020-01-01-1'))");
+    AnalyzesOk(evolution + "(truncate(4,date_string_col,4) = '1231')");
+    AnalyzesOk(evolution + "(month = 12)");
   }
 
   @Test
diff --git a/fe/src/test/java/org/apache/impala/util/IcebergUtilTest.java 
b/fe/src/test/java/org/apache/impala/util/IcebergUtilTest.java
index 3a3d0031c..3482c7d41 100644
--- a/fe/src/test/java/org/apache/impala/util/IcebergUtilTest.java
+++ b/fe/src/test/java/org/apache/impala/util/IcebergUtilTest.java
@@ -23,6 +23,7 @@ import static 
org.apache.impala.thrift.TIcebergCatalog.CATALOGS;
 import static org.apache.impala.thrift.TIcebergCatalog.HADOOP_CATALOG;
 import static org.apache.impala.thrift.TIcebergCatalog.HADOOP_TABLES;
 import static org.apache.impala.thrift.TIcebergCatalog.HIVE_CATALOG;
+import static org.apache.impala.util.IcebergUtil.getDateTimeTransformValue;
 import static org.apache.impala.util.IcebergUtil.getFilePathHash;
 import static org.apache.impala.util.IcebergUtil.getIcebergFileFormat;
 import static org.apache.impala.util.IcebergUtil.getPartitionTransform;
@@ -53,7 +54,6 @@ import org.apache.impala.analysis.IcebergPartitionTransform;
 import org.apache.impala.catalog.HdfsFileFormat;
 import org.apache.impala.catalog.IcebergColumn;
 import org.apache.impala.catalog.IcebergTable;
-import org.apache.impala.catalog.TableLoadingException;
 import org.apache.impala.catalog.Type;
 import org.apache.impala.catalog.iceberg.IcebergCatalog;
 import org.apache.impala.catalog.iceberg.IcebergCatalogs;
@@ -70,6 +70,7 @@ import org.junit.Test;
 
 import java.util.ArrayList;
 import java.util.HashMap;
+import java.util.Map;
 import java.util.List;
 
 /**
@@ -202,7 +203,7 @@ public class IcebergUtilTest {
       try {
         transform = getPartitionTransform(
             partitionTransform.transformName, partitionTransform.parameter);
-      } catch (TableLoadingException t) {
+      } catch (ImpalaRuntimeException t) {
         fail("Transform " + partitionTransform + " caught unexpected  " + t);
       }
       assertNotNull(transform);
@@ -223,7 +224,7 @@ public class IcebergUtilTest {
         /* IcebergPartitionTransform transform = */ getPartitionTransform(
             partitionTransform.transformName, partitionTransform.parameter);
         fail("Transform " + partitionTransform + " should have got exception");
-      } catch (TableLoadingException t) {
+      } catch (ImpalaRuntimeException t) {
         // OK, fall through
       }
     }
@@ -239,7 +240,7 @@ public class IcebergUtilTest {
       try {
         transform = getPartitionTransform(
             partitionTransform.transformName, partitionTransform.parameter);
-      } catch (TableLoadingException t) {
+      } catch (ImpalaRuntimeException t) {
         fail("Transform " + partitionTransform + " caught unexpected  " + t);
       }
       assertNotNull(transform);
@@ -271,7 +272,7 @@ public class IcebergUtilTest {
     int numBuckets = 128;
     PartitionSpec partitionSpec =
         PartitionSpec.builderFor(SCHEMA).bucket("i", numBuckets).build();
-    HashMap<String, Integer> partitionTransformParams =
+    Map<String, Integer> partitionTransformParams =
         getPartitionTransformParams(partitionSpec);
     assertNotNull(partitionTransformParams);
     String expectedKey = "1_BUCKET";
@@ -325,6 +326,64 @@ public class IcebergUtilTest {
     }
   }
 
+  /**
+   * Unit test for getDateTimeTransformValue
+   */
+  @Test
+  public void testGetDateTransformValue() {
+
+    assertThrows(() -> 
getDateTimeTransformValue(TIcebergPartitionTransformType.IDENTITY,
+        "string"));
+    assertThrows(
+        () -> getDateTimeTransformValue(TIcebergPartitionTransformType.BUCKET, 
"string"));
+    assertThrows(() -> 
getDateTimeTransformValue(TIcebergPartitionTransformType.TRUNCATE,
+        "string"));
+    assertThrows(
+        () -> getDateTimeTransformValue(TIcebergPartitionTransformType.VOID, 
"string"));
+
+    assertThrows(
+        () -> getDateTimeTransformValue(TIcebergPartitionTransformType.YEAR, 
"2023-12"));
+    assertThrows(
+        () -> getDateTimeTransformValue(TIcebergPartitionTransformType.MONTH, 
"2023"));
+    assertThrows(() -> 
getDateTimeTransformValue(TIcebergPartitionTransformType.DAY,
+        "2023-12-12-1"));
+    assertThrows(
+        () -> getDateTimeTransformValue(TIcebergPartitionTransformType.HOUR, 
"2023-12"));
+
+    try {
+      int yearTransformValidString =
+          getDateTimeTransformValue(TIcebergPartitionTransformType.YEAR, 
"2023");
+      int monthTransformValidString =
+          getDateTimeTransformValue(TIcebergPartitionTransformType.MONTH, 
"2023-12");
+      int dayTransformValidString =
+          getDateTimeTransformValue(TIcebergPartitionTransformType.DAY, 
"2023-12-12");
+      int hourTransformValidString =
+          getDateTimeTransformValue(TIcebergPartitionTransformType.HOUR, 
"2023-12-12-1");
+
+      assertEquals(53, yearTransformValidString);
+      assertEquals(647, monthTransformValidString);
+      assertEquals(19703, dayTransformValidString);
+      assertEquals(472873, hourTransformValidString);
+
+    } catch (ImpalaRuntimeException e) {
+      fail(String.format("Unexpected parse error: %s", e));
+    }
+  }
+
+  interface DateTimeTransformCallable {
+    Integer call() throws ImpalaRuntimeException;
+  }
+
+  private void assertThrows(DateTimeTransformCallable function) {
+    try {
+      function.call();
+    } catch (ImpalaRuntimeException e) {
+      assertEquals(ImpalaRuntimeException.class, e.getClass());
+      return;
+    }
+    fail();
+  }
+
   /**
    * Holder class for testing Partition transforms.
    */
diff --git 
a/testdata/workloads/functional-query/queries/QueryTest/iceberg-drop-partition.test
 
b/testdata/workloads/functional-query/queries/QueryTest/iceberg-drop-partition.test
new file mode 100644
index 000000000..15d7c089f
--- /dev/null
+++ 
b/testdata/workloads/functional-query/queries/QueryTest/iceberg-drop-partition.test
@@ -0,0 +1,287 @@
+====
+---- QUERY
+# Failing implicit string to hour cast
+alter table iceberg_all_partitions drop partition (hour(hour_timestamp) = 
"2012-12-12");
+---- CATCH
+AnalysisException: operands of type INT and STRING are not comparable: 
HOUR(hour_timestamp) = '2012-12-12'
+====
+---- QUERY
+# Failing implicit string to day cast
+alter table iceberg_all_partitions drop partition (day(day_date) = "2012-12");
+---- CATCH
+AnalysisException: operands of type INT and STRING are not comparable: 
DAY(day_date) = '2012-12'
+====
+---- QUERY
+# Failing implicit string to month cast
+alter table iceberg_all_partitions drop partition (month(month_date) = "2012");
+---- CATCH
+AnalysisException: operands of type INT and STRING are not comparable: 
MONTH(month_date) = '2012'
+====
+---- QUERY
+# Failing implicit string to year cast
+alter table iceberg_all_partitions drop partition (year(year_date) = 
"2012-12-12-20");
+---- CATCH
+AnalysisException: operands of type INT and STRING are not comparable: 
YEAR(year_date) = '2012-12-12-20'
+====
+---- QUERY
+INSERT INTO iceberg_all_partitions(identity_boolean) VALUES (true);
+INSERT INTO iceberg_all_partitions(identity_int) VALUES (1);
+INSERT INTO iceberg_all_partitions(identity_bigint) VALUES (1);
+INSERT INTO iceberg_all_partitions(identity_float) VALUES (1.0);
+INSERT INTO iceberg_all_partitions(identity_double) VALUES (1.0);
+INSERT INTO iceberg_all_partitions(identity_decimal) VALUES (1);
+INSERT INTO iceberg_all_partitions(identity_date) VALUES ('2000-12-12');
+INSERT INTO iceberg_all_partitions(identity_string) VALUES 
("string-transform-omitted");
+INSERT INTO iceberg_all_partitions(identity_string) VALUES 
("string-transform-set");
+INSERT INTO iceberg_all_partitions(identity_string) VALUES ("string"), 
("another-string");
+INSERT INTO iceberg_all_partitions(identity_string) VALUES ("string"), 
("another-string");
+INSERT INTO iceberg_all_partitions(bucket_int) VALUES (100), (200);
+INSERT INTO iceberg_all_partitions(bucket_bigint) VALUES (100);
+INSERT INTO iceberg_all_partitions(bucket_decimal) VALUES (10);
+INSERT INTO iceberg_all_partitions(bucket_date) VALUES ("1526-01-12");
+INSERT INTO iceberg_all_partitions(bucket_string) VALUES ("string");
+INSERT INTO iceberg_all_partitions(bucket_timestamp) VALUES ("1583-04-02 
03:00:00");
+INSERT INTO iceberg_all_partitions(truncate_int) VALUES (131072);
+INSERT INTO iceberg_all_partitions(truncate_bigint) VALUES (68719476736);
+INSERT INTO iceberg_all_partitions(truncate_decimal) VALUES 
(100000.1234567891);
+INSERT INTO iceberg_all_partitions(truncate_string) VALUES 
('thisisalongstring');
+INSERT INTO iceberg_all_partitions(year_date) VALUES ('2077-05-06');
+INSERT INTO iceberg_all_partitions(month_date) VALUES ('2023-12-01');
+INSERT INTO iceberg_all_partitions(day_date) VALUES ('2023-12-01');
+INSERT INTO iceberg_all_partitions(year_timestamp) VALUES ('2023-12-02 
00:00:00');
+INSERT INTO iceberg_all_partitions(month_timestamp) VALUES ('2023-12-02 
00:00:00');
+INSERT INTO iceberg_all_partitions(day_timestamp) VALUES ('2023-03-02 
00:00:00');
+INSERT INTO iceberg_all_partitions(hour_timestamp) VALUES ('2023-06-02 
00:00:00');
+INSERT INTO iceberg_all_partitions(identity_string, hour_timestamp) VALUES 
('string-hour','2023-03-02 00:00:00');
+INSERT INTO iceberg_all_partitions(identity_string, hour_timestamp) VALUES 
('another-string-hour', '2023-03-02 00:00:00');
+INSERT INTO iceberg_all_partitions(identity_string, hour_timestamp) VALUES 
('another-string-hour', '2023-03-02 10:00:00');
+INSERT INTO iceberg_all_partitions(identity_string, hour_timestamp) VALUES 
('string-hour', '2023-03-02 10:00:00');
+INSERT INTO iceberg_all_partitions(identity_string, identity_int) VALUES 
('string-comma', 567);
+INSERT INTO iceberg_all_partitions(identity_string, identity_int) VALUES 
('string-comma', 568);
+INSERT INTO iceberg_all_partitions(identity_int) VALUES (NULL);
+====
+---- QUERY
+# Number of partitions before DROP PARTITION queries
+SELECT COUNT(1) FROM $DATABASE.iceberg_all_partitions.`partitions`;
+---- RESULTS
+36
+====
+---- QUERY
+ALTER TABLE iceberg_all_partitions DROP IF EXISTS PARTITION (identity_boolean 
= false)
+---- RESULTS
+'Dropped 0 partition(s)'
+====
+---- QUERY
+ALTER TABLE iceberg_all_partitions DROP PARTITION (identity_boolean = false)
+---- CATCH
+No matching partition(s) found
+====
+---- QUERY
+ALTER TABLE iceberg_all_partitions DROP PARTITION (identity_boolean = true)
+---- RESULTS
+'Dropped 1 partition(s)'
+====
+---- QUERY
+ALTER TABLE iceberg_all_partitions DROP PARTITION (identity_int = 1)
+---- RESULTS
+'Dropped 1 partition(s)'
+====
+---- QUERY
+ALTER TABLE iceberg_all_partitions DROP PARTITION (identity_bigint = 1)
+---- RESULTS
+'Dropped 1 partition(s)'
+====
+---- QUERY
+ALTER TABLE iceberg_all_partitions DROP PARTITION (identity_float < 3.0)
+---- RESULTS
+'Dropped 1 partition(s)'
+====
+---- QUERY
+ALTER TABLE iceberg_all_partitions DROP PARTITION (identity_double > 0.0)
+---- RESULTS
+'Dropped 1 partition(s)'
+====
+---- QUERY
+ALTER TABLE iceberg_all_partitions DROP PARTITION (identity_decimal < 3);
+---- RESULTS
+'Dropped 1 partition(s)'
+====
+---- QUERY
+ALTER TABLE iceberg_all_partitions DROP PARTITION (identity_date = 
'2000-12-12');
+---- RESULTS
+'Dropped 1 partition(s)'
+====
+---- QUERY
+ALTER TABLE iceberg_all_partitions DROP PARTITION (identity_string = 
"string-transform-omitted");
+---- RESULTS
+'Dropped 1 partition(s)'
+====
+---- QUERY
+ALTER TABLE iceberg_all_partitions DROP PARTITION (identity(identity_string) = 
"string-transform-set");
+---- RESULTS
+'Dropped 1 partition(s)'
+====
+---- QUERY
+ALTER TABLE iceberg_all_partitions DROP PARTITION (identity(identity_string) = 
"another-string");
+---- RESULTS
+'Dropped 1 partition(s)'
+====
+---- QUERY
+ALTER TABLE iceberg_all_partitions DROP PARTITION (identity(identity_string) = 
"string");
+---- RESULTS
+'Dropped 1 partition(s)'
+====
+---- QUERY
+ALTER TABLE iceberg_all_partitions DROP PARTITION (bucket(5, bucket_int) in 
(1,2));
+---- RESULTS
+'Dropped 2 partition(s)'
+====
+---- QUERY
+ALTER TABLE iceberg_all_partitions DROP PARTITION (bucket(5, bucket_bigint) = 
1);
+---- RESULTS
+'Dropped 1 partition(s)'
+====
+---- QUERY
+ALTER TABLE iceberg_all_partitions DROP PARTITION (bucket(5, bucket_decimal) = 
3);
+---- RESULTS
+'Dropped 1 partition(s)'
+====
+---- QUERY
+ALTER TABLE iceberg_all_partitions DROP PARTITION (bucket(5, bucket_date) = 0);
+---- RESULTS
+'Dropped 1 partition(s)'
+====
+---- QUERY
+ALTER TABLE iceberg_all_partitions DROP PARTITION (bucket(5, bucket_timestamp) 
= 1);
+---- RESULTS
+'Dropped 1 partition(s)'
+====
+---- QUERY
+ALTER TABLE iceberg_all_partitions DROP PARTITION (bucket(5, bucket_string) = 
1);
+---- RESULTS
+'Dropped 1 partition(s)'
+====
+---- QUERY
+ALTER TABLE iceberg_all_partitions DROP PARTITION (truncate(5, truncate_int) = 
131070);
+---- RESULTS
+'Dropped 1 partition(s)'
+====
+---- QUERY
+ALTER TABLE iceberg_all_partitions DROP PARTITION (truncate(5, 
truncate_bigint) = 68719476735);
+---- RESULTS
+'Dropped 1 partition(s)'
+====
+---- QUERY
+ALTER TABLE iceberg_all_partitions DROP PARTITION (truncate(5, 
truncate_decimal) = 100000.1234567890);
+---- RESULTS
+'Dropped 1 partition(s)'
+====
+---- QUERY
+ALTER TABLE iceberg_all_partitions DROP PARTITION (truncate(5, 
truncate_string) = 'thisi');
+---- RESULTS
+'Dropped 1 partition(s)'
+====
+---- QUERY
+ALTER TABLE iceberg_all_partitions DROP PARTITION (year(year_date) = '2077');
+---- RESULTS
+'Dropped 1 partition(s)'
+====
+---- QUERY
+# Checkpoint for remaining partitions
+SELECT COUNT(1) FROM $DATABASE.iceberg_all_partitions.`partitions`;
+---- RESULTS
+13
+====
+---- QUERY
+ALTER TABLE iceberg_all_partitions DROP PARTITION (month(month_date) = 
'2023-12');
+---- RESULTS
+'Dropped 1 partition(s)'
+====
+---- QUERY
+ALTER TABLE iceberg_all_partitions DROP PARTITION (day(day_date) = 
'2023-12-01');
+---- RESULTS
+'Dropped 1 partition(s)'
+====
+---- QUERY
+ALTER TABLE iceberg_all_partitions DROP PARTITION (year(year_timestamp) = 
'2023');
+---- RESULTS
+'Dropped 1 partition(s)'
+====
+---- QUERY
+ALTER TABLE iceberg_all_partitions DROP PARTITION (month(month_timestamp) = 
'2023-12');
+---- RESULTS
+'Dropped 1 partition(s)'
+====
+---- QUERY
+ALTER TABLE iceberg_all_partitions DROP PARTITION (day(day_timestamp) = 
'2023-03-02');
+---- RESULTS
+'Dropped 1 partition(s)'
+====
+---- QUERY
+ALTER TABLE iceberg_all_partitions DROP PARTITION (hour(hour_timestamp) = 
'2023-06-02-0');
+---- RESULTS
+'Dropped 1 partition(s)'
+====
+---- QUERY
+ALTER TABLE iceberg_all_partitions DROP PARTITION (identity_string in 
('string-hour', 'another-string-hour') and hour(hour_timestamp) = 
'2023-03-02-10');
+---- RESULTS
+'Dropped 2 partition(s)'
+====
+---- QUERY
+ALTER TABLE iceberg_all_partitions DROP PARTITION (hour(hour_timestamp) < 
'2030-03-02-10');
+---- RESULTS
+'Dropped 2 partition(s)'
+====
+---- QUERY
+ALTER TABLE iceberg_all_partitions DROP PARTITION (identity_string = 
"string-comma", identity_int in (567, 568));
+---- RESULTS
+'Dropped 2 partition(s)'
+====
+---- QUERY
+ALTER TABLE iceberg_all_partitions DROP PARTITION (identity_int IS NULL);
+---- RESULTS
+'Dropped 1 partition(s)'
+====
+---- QUERY
+# Number of partitions after DROP PARTITION queries
+SELECT count(1) FROM $DATABASE.iceberg_all_partitions.`partitions`;
+---- RESULTS
+0
+====
+---- QUERY
+# Partition evolution
+CREATE TABLE iceberg_drop_partition_evolution(identity_int int, 
unpartitioned_int_to_identity_int int, year_date_col_to_month_date_col date)
+PARTITIONED BY SPEC(identity(identity_int), 
year(year_date_col_to_month_date_col)) STORED AS ICEBERG;
+INSERT INTO iceberg_drop_partition_evolution VALUES (1, 2, "2023-10-11");
+ALTER TABLE iceberg_drop_partition_evolution SET PARTITION 
SPEC(identity(identity_int), identity(unpartitioned_int_to_identity_int), 
year(year_date_col_to_month_date_col));
+INSERT INTO iceberg_drop_partition_evolution VALUES (1, 2, "2023-01-11");
+ALTER TABLE iceberg_drop_partition_evolution DROP PARTITION 
(unpartitioned_int_to_identity_int = 2);
+---- RESULTS
+'Dropped 1 partition(s)'
+====
+---- QUERY
+ALTER TABLE iceberg_drop_partition_evolution SET PARTITION 
SPEC(identity(identity_int), month(year_date_col_to_month_date_col));
+ALTER TABLE iceberg_drop_partition_evolution DROP PARTITION 
(unpartitioned_int_to_identity_int = 2);
+---- CATCH
+AnalysisException: Partition exprs cannot contain non-partition column(s): 
unpartitioned_int_to_identity_int
+====
+---- QUERY
+INSERT INTO iceberg_drop_partition_evolution VALUES (1, 2, "2023-11-11");
+ALTER TABLE iceberg_drop_partition_evolution DROP PARTITION 
(month(year_date_col_to_month_date_col) = "2023-11");
+---- RESULTS
+'Dropped 1 partition(s)'
+====
+---- QUERY
+# Dropping delete files
+CREATE TABLE iceberg_drop_partition_delete(identity_int int, unpartitioned_int 
int)
+PARTITIONED BY SPEC (identity_int) STORED AS ICEBERG 
TBLPROPERTIES('format-version'='2');
+INSERT INTO iceberg_drop_partition_delete VALUES (1,2);
+INSERT INTO iceberg_drop_partition_delete VALUES (2,1);
+DELETE FROM iceberg_drop_partition_delete WHERE identity_int = 1;
+ALTER TABLE iceberg_drop_partition_delete DROP PARTITION (identity_int = 1);
+SHOW FILES IN iceberg_drop_partition_delete;
+---- RESULTS
+row_regex:'$NAMENODE/test-warehouse/$DATABASE.db/iceberg_drop_partition_delete/data/identity_int=2/.*_data.*.parq','.*','','$ERASURECODE_POLICY'
+---- TYPES
+STRING, STRING, STRING, STRING
+====
diff --git 
a/testdata/workloads/functional-query/queries/QueryTest/iceberg-negative.test 
b/testdata/workloads/functional-query/queries/QueryTest/iceberg-negative.test
index ef8f7fe30..6f70d510c 100644
--- 
a/testdata/workloads/functional-query/queries/QueryTest/iceberg-negative.test
+++ 
b/testdata/workloads/functional-query/queries/QueryTest/iceberg-negative.test
@@ -347,11 +347,6 @@ ALTER TABLE iceberg_table_hadoop_catalog ADD 
PARTITION(fake_col='fake_value');
 AnalysisException: ALTER TABLE ADD PARTITION is not supported for Iceberg 
tables: $DATABASE.iceberg_table_hadoop_catalog
 ====
 ---- QUERY
-ALTER TABLE iceberg_table_hadoop_catalog DROP PARTITION(fake_col='fake_value');
----- CATCH
-AnalysisException: ALTER TABLE DROP PARTITION is not supported for Iceberg 
tables: $DATABASE.iceberg_table_hadoop_catalog
-====
----- QUERY
 ALTER TABLE iceberg_table_hadoop_catalog RECOVER PARTITIONS;
 ---- CATCH
 AnalysisException: ALTER TABLE RECOVER PARTITIONS is not supported on Iceberg 
tables: $DATABASE.iceberg_table_hadoop_catalog
@@ -446,7 +441,7 @@ CREATE TABLE iceberg_wrong_partition (i int)
 PARTITIONED BY SPEC (wrong(i))
 STORED AS ICEBERG;
 ---- CATCH
-Unsupported iceberg partition type: WRONG
+Unsupported Iceberg partition type: WRONG
 ====
 ---- QUERY
 CREATE TABLE iceberg_wrong_parquet_row_group_size1 ( i int)
diff --git a/tests/query_test/test_iceberg.py b/tests/query_test/test_iceberg.py
index 58b3ad14c..2ae967d24 100644
--- a/tests/query_test/test_iceberg.py
+++ b/tests/query_test/test_iceberg.py
@@ -1173,6 +1173,55 @@ class TestIcebergTable(IcebergTestSuite):
     assert len(data.data) == 1
     assert data.data[0] == '1'
 
  def test_drop_partition(self, vector, unique_database):
    """E2E test for ALTER TABLE ... DROP PARTITION on Iceberg tables.

    Creates a table partitioned by every supported Iceberg partition
    transform (identity, bucket, truncate, year, month, day, hour) across
    the eligible column types, then runs the DROP PARTITION query file
    against it."""
    create_table_stmt = """CREATE TABLE {}.iceberg_all_partitions
        (identity_boolean boolean, identity_int int, identity_bigint bigint,
        identity_float float, identity_double double, identity_decimal decimal(20,10),
        identity_date date, identity_timestamp timestamp, identity_string string,
        bucket_int int, bucket_bigint bigint, bucket_decimal decimal(20,10),
        bucket_date date, bucket_timestamp timestamp, bucket_string string,
        truncate_int int, truncate_bigint bigint, truncate_decimal decimal(20,10),
        truncate_string string, year_date date, year_timestamp timestamp,
        month_date date, month_timestamp timestamp, day_date date,
        day_timestamp timestamp, hour_timestamp timestamp)
      PARTITIONED BY SPEC
        (identity(identity_boolean), identity(identity_int), identity(identity_bigint),
        identity(identity_float), identity(identity_double), identity(identity_decimal),
        identity(identity_date), identity(identity_string), bucket(5,bucket_int),
        bucket(5,bucket_bigint), bucket(5,bucket_decimal), bucket(5,bucket_date),
        bucket(5,bucket_timestamp), bucket(5,bucket_string), truncate(5,truncate_int),
        truncate(5,truncate_bigint), truncate(5,truncate_decimal),
        truncate(5,truncate_string), year(year_date), year(year_timestamp),
        month(month_date), month(month_timestamp), day(day_date), day(day_timestamp),
        hour(hour_timestamp))
      STORED AS ICEBERG""".format(unique_database)
    self.execute_query(create_table_stmt)
    # The .test file inserts rows and drops each partition kind in turn.
    self.run_test_case('QueryTest/iceberg-drop-partition', vector,
      use_db=unique_database)
+
  def test_rollback_after_drop_partition(self, vector, unique_database):
    """Verifies that EXECUTE ROLLBACK can undo a DROP PARTITION.

    Creates a partitioned Iceberg table, inserts a row, drops its partition,
    then rolls back to the pre-drop snapshot and checks the snapshot log."""
    table_name = "iceberg_drop_partition_rollback"
    qualified_table_name = "{}.{}".format(unique_database, table_name)
    create_table_stmt = """CREATE TABLE {}(identity_int int, unpartitioned_int int)
      PARTITIONED BY SPEC (identity_int) STORED AS ICEBERG""".format(qualified_table_name)
    insert_into_stmt = """INSERT INTO {} values(1, 2)""".format(qualified_table_name)
    drop_partition_stmt = """ALTER TABLE {} DROP PARTITION (identity_int = 1)""".format(
      qualified_table_name)

    self.execute_query(create_table_stmt)
    self.execute_query(insert_into_stmt)
    self.execute_query(drop_partition_stmt)

    # Two snapshots so far: [0] = INSERT, [1] = DROP PARTITION.
    snapshots = get_snapshots(self.client, qualified_table_name, expected_result_size=2)
    rollback = """ALTER TABLE {} EXECUTE ROLLBACK ({})""".format(
      qualified_table_name, snapshots[0].get_snapshot_id())
    # Roll back to the snapshot taken before DROP PARTITION.
    self.execute_query(rollback)
    # The rollback appends a third snapshot that mirrors the pre-drop one:
    # same snapshot id and parent as [0], but created later.
    snapshots = get_snapshots(self.client, qualified_table_name, expected_result_size=3)
    assert snapshots[0].get_snapshot_id() == snapshots[2].get_snapshot_id()
    assert snapshots[0].get_parent_id() == snapshots[2].get_parent_id()
    assert snapshots[0].get_creation_time() < snapshots[2].get_creation_time()
+
 
 class TestIcebergV2Table(IcebergTestSuite):
   """Tests related to Iceberg V2 tables."""

Reply via email to