This is an automated email from the ASF dual-hosted git repository.

boroknagyz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git


The following commit(s) were added to refs/heads/master by this push:
     new c57921225 IMPALA-11877: (part 1) Add support for DELETE statements for UNPARTITIONED Iceberg tables
c57921225 is described below

commit c57921225113ef06d2d4358652412f6f5f7f15b6
Author: Zoltan Borok-Nagy <borokna...@cloudera.com>
AuthorDate: Mon Apr 3 14:36:18 2023 +0200

    IMPALA-11877: (part 1) Add support for DELETE statements for UNPARTITIONED Iceberg tables
    
    This patch adds support for DELETE statements on unpartitioned Iceberg
    tables. Impala uses the 'merge-on-read' mode with position delete files.
    
    The patch reuses the existing IcebergPositionDeleteTable as the target
    table of the DELETE statements, because this table already has the same
    schema as the position delete files, including the correct Iceberg
    field IDs.
    
    The patch basically rewrites DELETE statements to INSERT statements,
    e.g.:
    
    from:
     DELETE FROM ice_t WHERE id = 42;
    
    to:
     INSERT INTO ice_t-POSITION-DELETE
     SELECT INPUT__FILE__NAME, FILE__POSITION
     FROM ice_t
     WHERE id = 42;
    
    Position delete files need to be ordered by (file_path, pos), so
    we add an extra SORT node before the table sink operator.
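
    As a side note (not part of the patch), the required ordering is plain
    lexicographic ordering by file path and then by position. Below is a
    minimal, hypothetical Java sketch of that ordering, assuming a
    standalone PositionDelete record; in the patch itself the ordering is
    produced by the SORT node, not by an in-memory sort:

     import java.util.Comparator;
     import java.util.List;

     // Hypothetical record type, for illustration only.
     record PositionDelete(String filePath, long pos) {}

     class PositionDeleteOrdering {
       // Orders delete records the way a position delete file expects them:
       // by file_path first, then by pos within each file.
       static void sortForDeleteFile(List<PositionDelete> deletes) {
         deletes.sort(Comparator.comparing(PositionDelete::filePath)
             .thenComparingLong(PositionDelete::pos));
       }
     }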
    
    In the backend the patch adds a new table sink operator, the
    IcebergDeleteSink. It writes the incoming (file_path, position) rows to
    delete files. It reuses a lot of code from HdfsTableSink, so this patch
    moves the shared code into a new common base class, TableSinkBase.
    
    The coordinator then collects the written delete files and invokes
    UpdateCatalog to finalize the DELETE statement.
    
    The Catalog then uses Iceberg APIs to create a new snapshot with the
    created delete files. It also validates that no conflicting data files
    were written since the operation started.
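
    For reference only, here is a rough sketch of how such a commit can
    look with the Iceberg Java API. The helper class, method name and
    parameters below are made up for illustration and the exact calls in
    IcebergCatalogOpExecutor may differ; it assumes 'table' is the loaded
    org.apache.iceberg.Table and that the written position delete file is
    described by its path, size and record count:

     import org.apache.iceberg.DeleteFile;
     import org.apache.iceberg.FileFormat;
     import org.apache.iceberg.FileMetadata;
     import org.apache.iceberg.Table;

     // Hypothetical helper, for illustration only.
     class PositionDeleteCommitSketch {
       static void commitPositionDeletes(Table table, String deleteFilePath,
           long fileSizeInBytes, long recordCount, long startingSnapshotId) {
         DeleteFile deleteFile = FileMetadata.deleteFileBuilder(table.spec())
             .ofPositionDeletes()
             .withPath(deleteFilePath)
             .withFormat(FileFormat.PARQUET)
             .withFileSizeInBytes(fileSizeInBytes)
             .withRecordCount(recordCount)
             .build();
         table.newRowDelta()
             .addDeletes(deleteFile)
             // Abort if conflicting data files were added since the snapshot
             // the DELETE operation read from.
             .validateFromSnapshot(startingSnapshotId)
             .validateNoConflictingDataFiles()
             .commit();
       }
     }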
    
    Testing:
     * added planner test
     * e2e tests
     * interop test between Impala and Hive
    
    Change-Id: Ic933b2295abe54b46d2a736961219988ff42915b
    Reviewed-on: http://gerrit.cloudera.org:8080/19776
    Tested-by: Impala Public Jenkins <impala-public-jenk...@cloudera.com>
    Reviewed-by: Gabor Kaszab <gaborkas...@cloudera.com>
---
 be/src/exec/CMakeLists.txt                         |   2 +
 be/src/exec/data-sink.cc                           |   8 +-
 be/src/exec/file-metadata-utils.cc                 |   1 -
 be/src/exec/hdfs-table-sink.cc                     | 269 +------------------
 be/src/exec/hdfs-table-sink.h                      | 118 ++-------
 be/src/exec/hdfs-table-writer.cc                   |   2 +-
 be/src/exec/hdfs-table-writer.h                    |   6 +-
 be/src/exec/iceberg-delete-sink.cc                 | 165 ++++++++++++
 be/src/exec/iceberg-delete-sink.h                  |  89 +++++++
 be/src/exec/parquet/hdfs-parquet-table-writer.cc   |   8 +-
 be/src/exec/parquet/hdfs-parquet-table-writer.h    |   4 +-
 be/src/exec/table-sink-base.cc                     | 293 +++++++++++++++++++++
 be/src/exec/table-sink-base.h                      | 144 ++++++++++
 be/src/runtime/coordinator.cc                      |   2 +-
 be/src/runtime/descriptors.cc                      |   1 +
 be/src/service/client-request-state.cc             |  23 +-
 common/thrift/CatalogObjects.thrift                |   2 +-
 common/thrift/CatalogService.thrift                |   8 +-
 common/thrift/DataSinks.thrift                     |   4 +
 common/thrift/Query.thrift                         |  20 +-
 common/thrift/Types.thrift                         |   5 +
 .../org/apache/impala/analysis/DeleteStmt.java     |   3 +-
 .../impala/analysis/IcebergPartitionSpec.java      |   8 +-
 .../org/apache/impala/analysis/ModifyStmt.java     | 181 +++++++++++--
 .../java/org/apache/impala/analysis/SlotRef.java   |  11 +-
 .../org/apache/impala/analysis/UpdateStmt.java     |   3 +-
 .../org/apache/impala/catalog/FeIcebergTable.java  |  39 +++
 .../impala/catalog/IcebergPositionDeleteTable.java |  10 +-
 .../org/apache/impala/catalog/IcebergTable.java    |   4 +-
 .../org/apache/impala/catalog/VirtualColumn.java   |   9 +-
 .../org/apache/impala/planner/HdfsScanNode.java    |   9 +-
 .../apache/impala/planner/IcebergDeleteSink.java   | 160 +++++++++++
 .../java/org/apache/impala/planner/Planner.java    |  34 ++-
 .../java/org/apache/impala/planner/TableSink.java  |  12 +
 .../apache/impala/service/CatalogOpExecutor.java   |   2 +-
 .../java/org/apache/impala/service/Frontend.java   |  60 ++++-
 .../impala/service/IcebergCatalogOpExecutor.java   |  40 +++
 .../impala/analysis/AnalyzeModifyStmtsTest.java    |   9 +-
 .../org/apache/impala/planner/PlannerTest.java     |  10 +
 .../queries/PlannerTest/iceberg-v2-delete.test     | 195 ++++++++++++++
 .../queries/QueryTest/iceberg-delete-complex.test  | 106 ++++++++
 .../queries/QueryTest/iceberg-delete.test          | 117 ++++++++
 .../queries/QueryTest/iceberg-negative.test        |  52 ++++
 tests/query_test/test_iceberg.py                   |  59 ++++-
 44 files changed, 1850 insertions(+), 457 deletions(-)

diff --git a/be/src/exec/CMakeLists.txt b/be/src/exec/CMakeLists.txt
index 52fd3b95b..776f50fee 100644
--- a/be/src/exec/CMakeLists.txt
+++ b/be/src/exec/CMakeLists.txt
@@ -76,6 +76,7 @@ add_library(Exec
   hdfs-table-sink.cc
   hdfs-table-writer.cc
   hdfs-text-table-writer.cc
+  iceberg-delete-sink.cc
   incr-stats-util.cc
   join-builder.cc
   nested-loop-join-builder.cc
@@ -93,6 +94,7 @@ add_library(Exec
   sort-node.cc
   streaming-aggregation-node.cc
   subplan-node.cc
+  table-sink-base.cc
   text-converter.cc
   topn-node.cc
   union-node.cc
diff --git a/be/src/exec/data-sink.cc b/be/src/exec/data-sink.cc
index 790b7b214..d41bbac8e 100644
--- a/be/src/exec/data-sink.cc
+++ b/be/src/exec/data-sink.cc
@@ -24,6 +24,7 @@
 #include "exec/exec-node.h"
 #include "exec/hbase/hbase-table-sink.h"
 #include "exec/hdfs-table-sink.h"
+#include "exec/iceberg-delete-sink.h"
 #include "exec/kudu/kudu-table-sink.h"
 #include "exec/kudu/kudu-util.h"
 #include "exec/blocking-plan-root-sink.h"
@@ -83,7 +84,12 @@ Status DataSinkConfig::CreateConfig(const TDataSink& 
thrift_sink,
       if (!thrift_sink.__isset.table_sink) return Status("Missing table 
sink.");
       switch (thrift_sink.table_sink.type) {
         case TTableSinkType::HDFS:
-          *data_sink = pool->Add(new HdfsTableSinkConfig());
+          if (thrift_sink.table_sink.action == TSinkAction::INSERT) {
+            *data_sink = pool->Add(new HdfsTableSinkConfig());
+          } else if (thrift_sink.table_sink.action == TSinkAction::DELETE) {
+            // Currently only Iceberg tables support DELETE operations for FS 
tables.
+            *data_sink = pool->Add(new IcebergDeleteSinkConfig());
+          }
           break;
         case TTableSinkType::KUDU:
           RETURN_IF_ERROR(CheckKuduAvailability());
diff --git a/be/src/exec/file-metadata-utils.cc 
b/be/src/exec/file-metadata-utils.cc
index af575c8f0..f886b50c1 100644
--- a/be/src/exec/file-metadata-utils.cc
+++ b/be/src/exec/file-metadata-utils.cc
@@ -89,7 +89,6 @@ void FileMetadataUtils::AddIcebergColumns(MemPool* mem_pool, 
Tuple** template_tu
     *template_tuple = Tuple::Create(tuple_desc->byte_size(), mem_pool);
   }
   for (const SlotDescriptor* slot_desc : scan_node_->tuple_desc()->slots()) {
-    // The partition column of Iceberg tables must not be virtual column
     if (slot_desc->IsVirtual()) continue;
     const SchemaPath& path = slot_desc->col_path();
     if (path.size() != 1) continue;
diff --git a/be/src/exec/hdfs-table-sink.cc b/be/src/exec/hdfs-table-sink.cc
index 355cb7479..2534f711b 100644
--- a/be/src/exec/hdfs-table-sink.cc
+++ b/be/src/exec/hdfs-table-sink.cc
@@ -78,10 +78,8 @@ void HdfsTableSinkConfig::Close() {
 
 HdfsTableSink::HdfsTableSink(TDataSinkId sink_id, const HdfsTableSinkConfig& 
sink_config,
     const THdfsTableSink& hdfs_sink, RuntimeState* state)
-  : DataSink(sink_id, sink_config, "HdfsTableSink", state),
-    table_desc_(nullptr),
+  : TableSinkBase(sink_id, sink_config, "HdfsTableSink", state),
     prototype_partition_(nullptr),
-    table_id_(sink_config.tsink_->table_sink.target_table_id),
     skip_header_line_count_(
         hdfs_sink.__isset.skip_header_line_count ? 
hdfs_sink.skip_header_line_count : 0),
     overwrite_(hdfs_sink.overwrite),
@@ -111,9 +109,9 @@ HdfsTableSink::HdfsTableSink(TDataSinkId sink_id, const 
HdfsTableSinkConfig& sin
 }
 
 Status HdfsTableSink::Prepare(RuntimeState* state, MemTracker* 
parent_mem_tracker) {
-  RETURN_IF_ERROR(DataSink::Prepare(state, parent_mem_tracker));
-  unique_id_str_ = PrintId(state->fragment_instance_id(), "-");
   SCOPED_TIMER(profile()->total_time_counter());
+  RETURN_IF_ERROR(TableSinkBase::Prepare(state, parent_mem_tracker));
+  unique_id_str_ = PrintId(state->fragment_instance_id(), "-");
   RETURN_IF_ERROR(ScalarExprEvaluator::Create(partition_key_exprs_, state,
       state->obj_pool(), expr_perm_pool_.get(), expr_results_pool_.get(),
       &partition_key_expr_evals_));
@@ -148,14 +146,6 @@ Status HdfsTableSink::Prepare(RuntimeState* state, 
MemTracker* parent_mem_tracke
   DCHECK_GE(output_expr_evals_.size(),
       table_desc_->num_cols() - table_desc_->num_clustering_cols()) << 
DebugString();
 
-  partitions_created_counter_ = ADD_COUNTER(profile(), "PartitionsCreated", 
TUnit::UNIT);
-  files_created_counter_ = ADD_COUNTER(profile(), "FilesCreated", TUnit::UNIT);
-  rows_inserted_counter_ = ADD_COUNTER(profile(), "RowsInserted", TUnit::UNIT);
-  bytes_written_counter_ = ADD_COUNTER(profile(), "BytesWritten", 
TUnit::BYTES);
-  encode_timer_ = ADD_TIMER(profile(), "EncodeTimer");
-  hdfs_write_timer_ = ADD_TIMER(profile(), "HdfsWriteTimer");
-  compress_timer_ = ADD_TIMER(profile(), "CompressTimer");
-
   return Status::OK();
 }
 
@@ -220,7 +210,8 @@ void HdfsTableSink::BuildPartitionDescMap() {
 }
 
 Status HdfsTableSink::Open(RuntimeState* state) {
-  RETURN_IF_ERROR(DataSink::Open(state));
+  SCOPED_TIMER(profile()->total_time_counter());
+  RETURN_IF_ERROR(TableSinkBase::Open(state));
   DCHECK_EQ(partition_key_exprs_.size(), partition_key_expr_evals_.size());
   RETURN_IF_ERROR(ScalarExprEvaluator::Open(partition_key_expr_evals_, state));
   if (!IsIceberg()) BuildPartitionDescMap();
@@ -228,115 +219,6 @@ Status HdfsTableSink::Open(RuntimeState* state) {
   return Status::OK();
 }
 
-void HdfsTableSink::BuildHdfsFileNames(
-    const HdfsPartitionDescriptor& partition_descriptor,
-    OutputPartition* output_partition, const string &external_partition_name) {
-
-  // Create final_hdfs_file_name_prefix and tmp_hdfs_file_name_prefix.
-  // Path: <hdfs_base_dir>/<partition_values>/<unique_id_str>
-  // Or, for transactional tables:
-  // Path: 
<hdfs_base_dir>/<partition_values>/<transaction_directory>/<unique_id_str>
-  // Where <transaction_directory> is either a 'base' or a 'delta' directory 
in Hive ACID
-  // terminology.
-
-  // Temporary files are written under the following path which is unique to 
this sink:
-  // 
<table_dir>/_impala_insert_staging/<query_id>/<per_fragment_unique_id>_dir/
-  // Both the temporary directory and the file name, when moved to the real 
partition
-  // directory must be unique.
-  // Prefix the directory name with "." to make it hidden and append "_dir" at 
the end
-  // of the directory to avoid name clashes for unpartitioned tables.
-  // The files are located in <partition_values>/<random_value>_data under
-  // tmp_hdfs_file_name_prefix.
-
-  // Use the query id as filename.
-  const string& query_suffix = Substitute("$0_$1_data", unique_id_str_, 
rand());
-
-  output_partition->tmp_hdfs_dir_name =
-      Substitute("$0/.$1_$2_dir/", staging_dir_, unique_id_str_, rand());
-  output_partition->tmp_hdfs_file_name_prefix = Substitute("$0$1/$2",
-      output_partition->tmp_hdfs_dir_name, output_partition->partition_name,
-      query_suffix);
-
-  if (HasExternalOutputDir()) {
-    // When an external FE has provided a staging directory we use that 
directly.
-    // We are trusting that the external frontend implementation has done 
appropriate
-    // authorization checks on the external output directory.
-    output_partition->final_hdfs_file_name_prefix = Substitute("$0/$1/",
-        external_output_dir_, external_partition_name);
-  } else if (partition_descriptor.location().empty()) {
-    output_partition->final_hdfs_file_name_prefix = Substitute("$0/$1/",
-        table_desc_->hdfs_base_dir(), output_partition->partition_name);
-  } else {
-    // If the partition descriptor has a location (as set by alter table add 
partition
-    // with a location clause), that provides the complete directory path for 
this
-    // partition. No partition key suffix ("p=1/j=foo/") should be added.
-    output_partition->final_hdfs_file_name_prefix =
-        Substitute("$0/", partition_descriptor.location());
-  }
-  if (IsHiveAcid()) {
-    if (HasExternalOutputDir()) {
-      // The 0 padding on base and delta is to match the behavior of Hive 
since various
-      // systems will expect a certain format for dynamic partition creation. 
Additionally
-      // include an 0 statement id for delta directory so various Hive 
AcidUtils detect
-      // the directory (such as AcidUtils.baseOrDeltaSubdir()). Multiple 
statements in a
-      // single transaction is not supported.
-      if (overwrite_) {
-        output_partition->final_hdfs_file_name_prefix += 
StringPrintf("/base_%07ld/",
-            write_id_);
-      } else {
-        output_partition->final_hdfs_file_name_prefix += StringPrintf(
-            "/delta_%07ld_%07ld_0000/", write_id_, write_id_);
-      }
-    } else {
-      string acid_dir = Substitute(overwrite_ ? "/base_$0/" : "/delta_$0_$0/", 
write_id_);
-      output_partition->final_hdfs_file_name_prefix += acid_dir;
-    }
-  }
-  if (IsIceberg()) {
-    //TODO: implement LocationProviders.
-    if (output_partition->partition_name.empty()) {
-      output_partition->final_hdfs_file_name_prefix =
-          Substitute("$0/data/", table_desc_->IcebergTableLocation());
-    } else {
-      output_partition->final_hdfs_file_name_prefix =
-          Substitute("$0/data/$1/", table_desc_->IcebergTableLocation(),
-              output_partition->partition_name);
-    }
-  }
-  output_partition->final_hdfs_file_name_prefix += query_suffix;
-
-  output_partition->num_files = 0;
-}
-
-Status HdfsTableSink::WriteRowsToPartition(
-    RuntimeState* state, RowBatch* batch, PartitionPair* partition_pair) {
-  // The rows of this batch may span multiple files. We repeatedly pass the 
row batch to
-  // the writer until it sets new_file to false, indicating that all rows have 
been
-  // written. The writer tracks where it is in the batch when it returns with 
new_file
-  // set.
-  bool new_file;
-  while (true) {
-    OutputPartition* output_partition = partition_pair->first.get();
-    Status status =
-        output_partition->writer->AppendRows(batch, partition_pair->second, 
&new_file);
-    if (!status.ok()) {
-      // IMPALA-10607: Deletes partition file if staging is skipped when 
appending rows
-      // fails. Otherwise, it leaves the file in un-finalized state.
-      if (ShouldSkipStaging(state, output_partition)) {
-        status.MergeStatus(ClosePartitionFile(state, output_partition));
-        hdfsDelete(output_partition->hdfs_connection,
-            output_partition->current_file_name.c_str(), 0);
-      }
-      return status;
-    }
-    if (!new_file) break;
-    RETURN_IF_ERROR(FinalizePartitionFile(state, output_partition));
-    RETURN_IF_ERROR(CreateNewTmpFile(state, output_partition));
-  }
-  partition_pair->second.clear();
-  return Status::OK();
-}
-
 Status HdfsTableSink::WriteClusteredRowBatch(RuntimeState* state, RowBatch* 
batch) {
   DCHECK_GT(batch->num_rows(), 0);
   DCHECK(!dynamic_partition_key_expr_evals_.empty());
@@ -400,104 +282,6 @@ Status 
HdfsTableSink::WriteClusteredRowBatch(RuntimeState* state, RowBatch* batc
   return Status::OK();
 }
 
-Status HdfsTableSink::CreateNewTmpFile(RuntimeState* state,
-    OutputPartition* output_partition) {
-  SCOPED_TIMER(ADD_TIMER(profile(), "TmpFileCreateTimer"));
-  string file_name_pattern =
-      output_partition->writer->file_extension().empty() ? "$0.$1" : 
"$0.$1.$2";
-  string final_location = Substitute(file_name_pattern,
-      output_partition->final_hdfs_file_name_prefix, 
output_partition->num_files,
-      output_partition->writer->file_extension());
-
-  // If ShouldSkipStaging() is true, then the table sink will write the 
file(s) for this
-  // partition to the final location directly. If it is false, the file(s) 
will be written
-  // to a temporary staging location which will be moved by the coordinator to 
the final
-  // location.
-  if (ShouldSkipStaging(state, output_partition)) {
-    output_partition->current_file_name = final_location;
-    output_partition->current_file_final_name = "";
-  } else {
-    output_partition->current_file_name = Substitute(file_name_pattern,
-        output_partition->tmp_hdfs_file_name_prefix, 
output_partition->num_files,
-        output_partition->writer->file_extension());
-    // Save the ultimate destination for this file (it will be moved by the 
coordinator).
-    output_partition->current_file_final_name = final_location;
-  }
-  // Check if tmp_hdfs_file_name exists.
-  const char* tmp_hdfs_file_name_cstr =
-      output_partition->current_file_name.c_str();
-
-  if (hdfsExists(output_partition->hdfs_connection, tmp_hdfs_file_name_cstr) 
== 0) {
-    return Status(GetHdfsErrorMsg("Temporary HDFS file already exists: ",
-        output_partition->current_file_name));
-  }
-
-  // This is the block size from the HDFS partition metadata.
-  uint64_t block_size = output_partition->partition_descriptor->block_size();
-  // hdfsOpenFile takes a 4 byte integer as the block size.
-  if (block_size > numeric_limits<int32_t>::max()) {
-    return Status(Substitute("HDFS block size must be smaller than 2GB but is 
configured "
-        "in the HDFS partition to $0.", block_size));
-  }
-
-  if (block_size == 0) block_size = 
output_partition->writer->default_block_size();
-  if (block_size > numeric_limits<int32_t>::max()) {
-    return Status(Substitute("HDFS block size must be smaller than 2GB but the 
target "
-        "table requires $0.", block_size));
-  }
-
-  DCHECK_LE(block_size, numeric_limits<int32_t>::max());
-  output_partition->tmp_hdfs_file = 
hdfsOpenFile(output_partition->hdfs_connection,
-      tmp_hdfs_file_name_cstr, O_WRONLY, 0, 0, block_size);
-
-  VLOG_FILE << "hdfsOpenFile() file=" << tmp_hdfs_file_name_cstr;
-  if (output_partition->tmp_hdfs_file == nullptr) {
-    return Status(GetHdfsErrorMsg("Failed to open HDFS file for writing: ",
-        output_partition->current_file_name));
-  }
-
-  if (IsS3APath(tmp_hdfs_file_name_cstr) ||
-      IsABFSPath(tmp_hdfs_file_name_cstr) ||
-      IsADLSPath(tmp_hdfs_file_name_cstr) ||
-      IsOSSPath(tmp_hdfs_file_name_cstr) ||
-      IsGcsPath(tmp_hdfs_file_name_cstr) ||
-      IsCosPath(tmp_hdfs_file_name_cstr) ||
-      IsSFSPath(tmp_hdfs_file_name_cstr) ||
-      IsOzonePath(tmp_hdfs_file_name_cstr)) {
-    // On S3A, the file cannot be stat'ed until after it's closed, and even 
so, the block
-    // size reported will be just the filesystem default. Similarly, the block 
size
-    // reported for ADLS will be the filesystem default. So, remember the 
requested block
-    // size.
-    // TODO: IMPALA-9437: Ozone does not support stat'ing a file until after 
it's closed,
-    // so for now skip the call to hdfsGetPathInfo.
-    output_partition->block_size = block_size;
-  } else {
-    // HDFS may choose to override the block size that we've recommended, so 
for non-S3
-    // files, we get the block size by stat-ing the file.
-    hdfsFileInfo* info = hdfsGetPathInfo(output_partition->hdfs_connection,
-        output_partition->current_file_name.c_str());
-    if (info == nullptr) {
-      return Status(GetHdfsErrorMsg("Failed to get info on temporary HDFS 
file: ",
-          output_partition->current_file_name));
-    }
-    output_partition->block_size = info->mBlockSize;
-    hdfsFreeFileInfo(info, 1);
-  }
-
-  ImpaladMetrics::NUM_FILES_OPEN_FOR_INSERT->Increment(1);
-  COUNTER_ADD(files_created_counter_, 1);
-
-  ++output_partition->num_files;
-  output_partition->current_file_rows = 0;
-  Status status = output_partition->writer->InitNewFile();
-  if (!status.ok()) {
-    status.MergeStatus(ClosePartitionFile(state, output_partition));
-    hdfsDelete(output_partition->hdfs_connection,
-        output_partition->current_file_name.c_str(), 0);
-  }
-  return status;
-}
-
 string HdfsTableSink::GetPartitionName(int i) {
   if (IsIceberg()) {
     DCHECK_LT(i, partition_key_expr_evals_.size());
@@ -741,40 +525,6 @@ Status HdfsTableSink::Send(RuntimeState* state, RowBatch* 
batch) {
   return Status::OK();
 }
 
-Status HdfsTableSink::FinalizePartitionFile(
-    RuntimeState* state, OutputPartition* partition) {
-  if (partition->tmp_hdfs_file == nullptr && !overwrite_) return Status::OK();
-  SCOPED_TIMER(ADD_TIMER(profile(), "FinalizePartitionFileTimer"));
-
-  // OutputPartition writer could be nullptr if there is no row to output.
-  if (partition->writer.get() != nullptr) {
-    RETURN_IF_ERROR(partition->writer->Finalize());
-    state->dml_exec_state()->UpdatePartition(
-        partition->partition_name, partition->current_file_rows,
-        &partition->writer->stats());
-    state->dml_exec_state()->AddCreatedFile(*partition, IsIceberg(),
-        partition->writer->iceberg_file_stats());
-  }
-
-  RETURN_IF_ERROR(ClosePartitionFile(state, partition));
-  return Status::OK();
-}
-
-Status HdfsTableSink::ClosePartitionFile(
-    RuntimeState* state, OutputPartition* partition) {
-  if (partition->tmp_hdfs_file == nullptr) return Status::OK();
-  int hdfs_ret = hdfsCloseFile(partition->hdfs_connection, 
partition->tmp_hdfs_file);
-  VLOG_FILE << "hdfsCloseFile() file=" << partition->current_file_name;
-  partition->tmp_hdfs_file = nullptr;
-  ImpaladMetrics::NUM_FILES_OPEN_FOR_INSERT->Increment(-1);
-  if (hdfs_ret != 0) {
-    return Status(ErrorMsg(TErrorCode::GENERAL,
-        GetHdfsErrorMsg("Failed to close HDFS file: ",
-        partition->current_file_name)));
-  }
-  return Status::OK();
-}
-
 Status HdfsTableSink::FlushFinal(RuntimeState* state) {
   DCHECK(!closed_);
   SCOPED_TIMER(profile()->total_time_counter());
@@ -812,17 +562,10 @@ void HdfsTableSink::Close(RuntimeState* state) {
   }
   partition_keys_to_output_partitions_.clear();
   ScalarExprEvaluator::Close(partition_key_expr_evals_, state);
-  DataSink::Close(state);
+  TableSinkBase::Close(state);
   closed_ = true;
 }
 
-bool HdfsTableSink::ShouldSkipStaging(RuntimeState* state, OutputPartition* 
partition) {
-  if (IsTransactional() || HasExternalOutputDir() || is_result_sink_) return 
true;
-  // We skip staging if we are writing query results
-  return (IsS3APath(partition->final_hdfs_file_name_prefix.c_str()) && 
!overwrite_ &&
-      state->query_options().s3_skip_insert_staging);
-}
-
 string HdfsTableSink::DebugString() const {
   stringstream out;
   out << "HdfsTableSink(overwrite=" << (overwrite_ ? "true" : "false")
diff --git a/be/src/exec/hdfs-table-sink.h b/be/src/exec/hdfs-table-sink.h
index 5aa637949..95cdc0010 100644
--- a/be/src/exec/hdfs-table-sink.h
+++ b/be/src/exec/hdfs-table-sink.h
@@ -22,8 +22,8 @@
 #include <hdfs.h>
 #include <boost/unordered_map.hpp>
 
-#include "exec/data-sink.h"
 #include "exec/output-partition.h"
+#include "exec/table-sink-base.h"
 #include "runtime/descriptors.h"
 
 namespace impala {
@@ -87,45 +87,44 @@ class HdfsTableSinkConfig : public DataSinkConfig {
 /// a commit for the ACID transaction.
 /// The name of the output directory will be
 /// <table base dir>/<partition dirs>/<ACID base or delta directory>
-class HdfsTableSink : public DataSink {
+class HdfsTableSink : public TableSinkBase {
  public:
   HdfsTableSink(TDataSinkId sink_id, const HdfsTableSinkConfig& sink_config,
     const THdfsTableSink& hdfs_sink, RuntimeState* state);
 
   /// Prepares output_exprs and partition_key_exprs, and connects to HDFS.
-  virtual Status Prepare(RuntimeState* state, MemTracker* parent_mem_tracker);
+  Status Prepare(RuntimeState* state, MemTracker* parent_mem_tracker) override;
 
   /// Opens output_exprs and partition_key_exprs, prepares the single output 
partition for
   /// static inserts, and populates partition_descriptor_map_.
-  virtual Status Open(RuntimeState* state);
+  Status Open(RuntimeState* state) override;
 
   /// Append all rows in batch to the temporary Hdfs files corresponding to 
partitions.
-  virtual Status Send(RuntimeState* state, RowBatch* batch);
+  Status Send(RuntimeState* state, RowBatch* batch) override;
 
   /// Finalize any open files.
   /// TODO: IMPALA-2988: Move calls to functions that can fail in Close() to 
FlushFinal()
-  virtual Status FlushFinal(RuntimeState* state);
+  Status FlushFinal(RuntimeState* state) override;
 
   /// Closes writers, output_exprs and partition_key_exprs and releases 
resources.
   /// The temporary files will be moved to their final destination by the 
Coordinator.
-  virtual void Close(RuntimeState* state);
+  void Close(RuntimeState* state) override;
 
-  int skip_header_line_count() const { return skip_header_line_count_; }
-  const vector<int32_t>& sort_columns() const { return sort_columns_; }
-  TSortingOrder::type sorting_order() const { return sorting_order_; }
+  const vector<int32_t>& sort_columns() const override { return sort_columns_; 
}
+  TSortingOrder::type sorting_order() const override { return sorting_order_; }
   const HdfsTableDescriptor& TableDesc() { return *table_desc_; }
 
-  const std::map<string, int64_t>& GetParquetBloomFilterColumns() const {
+  const std::map<string, int64_t>& GetParquetBloomFilterColumns() const 
override {
     return parquet_bloom_filter_columns_;
   }
 
-  RuntimeProfile::Counter* rows_inserted_counter() { return 
rows_inserted_counter_; }
-  RuntimeProfile::Counter* bytes_written_counter() { return 
bytes_written_counter_; }
-  RuntimeProfile::Counter* encode_timer() { return encode_timer_; }
-  RuntimeProfile::Counter* hdfs_write_timer() { return hdfs_write_timer_; }
-  RuntimeProfile::Counter* compress_timer() { return compress_timer_; }
+  bool is_overwrite() const override { return overwrite_; }
+  bool is_result_sink() const override { return is_result_sink_; }
+  std::string staging_dir() const override { return staging_dir_; }
+  int skip_header_line_count() const override { return 
skip_header_line_count_; }
+  int64_t write_id() const override { return write_id_; }
 
-  std::string DebugString() const;
+  std::string DebugString() const override;
 
  private:
   /// Build a map from partition key values to partition descriptor for 
multiple output
@@ -154,25 +153,6 @@ class HdfsTableSink : public DataSink {
       std::vector<std::string>* raw_partition_names,
       string* external_partition_name);
 
-  /// Add a temporary file to an output partition.  Files are created in a
-  /// temporary directory and then moved to the real partition directory by the
-  /// coordinator in a finalization step. The temporary file's current location
-  /// and final destination are recorded in the state parameter.
-  /// If this function fails, the tmp file is cleaned up.
-  Status CreateNewTmpFile(RuntimeState* state, OutputPartition* 
output_partition)
-      WARN_UNUSED_RESULT;
-
-  /// Key is the concatenation of the evaluated dynamic_partition_key_exprs_ 
generated by
-  /// GetHashTblKey(). Maps to an OutputPartition and a vector of indices of 
the rows in
-  /// the current batch to insert into the partition. The PartitionPair owns 
the
-  /// OutputPartition via a unique_ptr so that the memory is freed as soon as 
the
-  /// PartitionPair is removed from the map. This is important, because the
-  /// PartitionPairs can have different lifetimes. For example, a clustered 
insert into a
-  /// partitioned table iterates over the partitions, so only one 
PartitionPairs is
-  /// in the map at any given time.
-  typedef std::pair<std::unique_ptr<OutputPartition>, std::vector<int32_t>> 
PartitionPair;
-  typedef boost::unordered_map<std::string, PartitionPair> PartitionMap;
-
   /// Generates string key for hash_tbl_ as a concatenation of all evaluated 
exprs,
   /// evaluated against 'row'. The generated string is much shorter than the 
full Hdfs
   /// file name.
@@ -189,65 +169,20 @@ class HdfsTableSink : public DataSink {
       const std::string& key, PartitionPair** partition_pair, bool 
no_more_rows)
       WARN_UNUSED_RESULT;
 
-  /// Sets hdfs_file_name and tmp_hdfs_file_name of given output partition.
-  /// The Hdfs directory is created from the target table's base Hdfs dir,
-  /// the partition_key_names_ and the evaluated partition_key_exprs_.
-  /// The Hdfs file name is the unique_id_str_.
-  void BuildHdfsFileNames(const HdfsPartitionDescriptor& partition_descriptor,
-      OutputPartition* output, const std::string &external_partition_path);
-
-  /// Writes all rows referenced by the row index vector in 'partition_pair' 
to the
-  /// partition's writer and clears the row index vector afterwards.
-  Status WriteRowsToPartition(
-      RuntimeState* state, RowBatch* batch, PartitionPair* partition_pair)
-      WARN_UNUSED_RESULT;
-
   /// Maps all rows in 'batch' to partitions and appends them to their 
temporary Hdfs
   /// files. The input must be ordered by the partition key expressions.
   Status WriteClusteredRowBatch(RuntimeState* state, RowBatch* batch) 
WARN_UNUSED_RESULT;
 
-  /// Updates runtime stats of HDFS with rows written, then closes the file 
associated
-  /// with the partition by calling ClosePartitionFile()
-  Status FinalizePartitionFile(RuntimeState* state, OutputPartition* partition)
-      WARN_UNUSED_RESULT;
-
-  /// Closes the hdfs file for this partition as well as the writer.
-  Status ClosePartitionFile(RuntimeState* state, OutputPartition* partition)
-      WARN_UNUSED_RESULT;
-
   /// Returns the ith partition name of the table.
   std::string GetPartitionName(int i);
 
-  // Returns TRUE if the staging step should be skipped for this partition. 
This allows
-  // for faster INSERT query completion time for the S3A filesystem as the 
coordinator
-  // does not have to copy the file(s) from the staging locaiton to the final 
location. We
-  // do not skip for INSERT OVERWRITEs because the coordinator will delete all 
files in
-  // the final location before moving the staged files there, so we cannot 
write directly
-  // to the final location and need to write to the temporary staging location.
-  bool ShouldSkipStaging(RuntimeState* state, OutputPartition* partition);
-
-  /// Returns TRUE if the target table is transactional.
-  bool IsTransactional() const { return IsHiveAcid() || IsIceberg(); }
-
   /// Returns TRUE for Hive ACID tables.
-  bool IsHiveAcid() const { return write_id_ != -1; }
-
-  /// Returns TRUE for Iceberg tables.
-  bool IsIceberg() const { return table_desc_->IsIcebergTable(); }
-
-  /// Returns TRUE if an external output directory was provided.
-  bool HasExternalOutputDir() { return !external_output_dir_.empty(); }
-
-  /// Descriptor of target table. Set in Prepare().
-  const HdfsTableDescriptor* table_desc_;
+  bool IsHiveAcid() const override { return write_id_ != -1; }
 
   /// The partition descriptor used when creating new partitions from this 
sink.
   /// Currently we don't support multi-format sinks.
   const HdfsPartitionDescriptor* prototype_partition_;
 
-  /// Table id resolved in Prepare() to set tuple_desc_;
-  TableId table_id_;
-
   /// The 'skip.header.line.count' property of the target Hdfs table. We will 
insert this
   /// many empty lines at the beginning of new text files, which will be 
skipped by the
   /// scanners while reading from the files.
@@ -284,9 +219,6 @@ class HdfsTableSink : public DataSink {
   /// <hdfs_table_base_dir>/_impala_insert_staging/ during Prepare()
   std::string staging_dir_;
 
-  /// The directory in which an external FE expects results to be written to.
-  std::string external_output_dir_;
-
   /// How deep into the partition specification in which to start creating 
partition
   // directories. Used in conjunction with external_output_dir_ to inform the 
table
   // sink which directories are pre-created.
@@ -296,10 +228,6 @@ class HdfsTableSink : public DataSink {
   /// Parquet Bloom filtering is not enabled are not listed.
   std::map<std::string, int64_t> parquet_bloom_filter_columns_;
 
-  /// string representation of the unique fragment instance id. Used for 
per-partition
-  /// Hdfs file names, and for tmp Hdfs directories. Set in Prepare();
-  std::string unique_id_str_;
-
   /// Hash table of generated output partitions.
   /// Maps from a string representation of the dynamic_partition_key_exprs_
   /// generated by GetHashTblKey() to its corresponding OutputPartition.
@@ -322,18 +250,6 @@ class HdfsTableSink : public DataSink {
   typedef boost::unordered_map<std::string, HdfsPartitionDescriptor*>
       PartitionDescriptorMap;
   PartitionDescriptorMap partition_descriptor_map_;
-
-  RuntimeProfile::Counter* partitions_created_counter_;
-  RuntimeProfile::Counter* files_created_counter_;
-  RuntimeProfile::Counter* rows_inserted_counter_;
-  RuntimeProfile::Counter* bytes_written_counter_;
-
-  /// Time spent converting tuple to on disk format.
-  RuntimeProfile::Counter* encode_timer_;
-  /// Time spent writing to hdfs
-  RuntimeProfile::Counter* hdfs_write_timer_;
-  /// Time spent compressing data
-  RuntimeProfile::Counter* compress_timer_;
   /// Will the output of this sink be used for query results
   const bool is_result_sink_;
 };
diff --git a/be/src/exec/hdfs-table-writer.cc b/be/src/exec/hdfs-table-writer.cc
index af4968420..c710ac6f0 100644
--- a/be/src/exec/hdfs-table-writer.cc
+++ b/be/src/exec/hdfs-table-writer.cc
@@ -26,7 +26,7 @@
 
 namespace impala {
 
-HdfsTableWriter::HdfsTableWriter(HdfsTableSink* parent,
+HdfsTableWriter::HdfsTableWriter(TableSinkBase* parent,
     RuntimeState* state, OutputPartition* output,
     const HdfsPartitionDescriptor* partition_desc, const HdfsTableDescriptor* 
table_desc)
   : parent_(parent),
diff --git a/be/src/exec/hdfs-table-writer.h b/be/src/exec/hdfs-table-writer.h
index 764f30e72..ae4d56589 100644
--- a/be/src/exec/hdfs-table-writer.h
+++ b/be/src/exec/hdfs-table-writer.h
@@ -29,7 +29,7 @@ namespace impala {
 
 class HdfsPartitionDescriptor;
 class HdfsTableDescriptor;
-class HdfsTableSink;
+class TableSinkBase;
 struct OutputPartition;
 class RowBatch;
 class RuntimeState;
@@ -61,7 +61,7 @@ class HdfsTableWriter {
   /// output_partition -- Information on the output partition file.
   /// partition -- the descriptor for the partition being written
   /// table_desc -- the descriptor for the table being written.
-  HdfsTableWriter(HdfsTableSink* parent,
+  HdfsTableWriter(TableSinkBase* parent,
                   RuntimeState* state, OutputPartition* output_partition,
                   const HdfsPartitionDescriptor* partition_desc,
                   const HdfsTableDescriptor* table_desc);
@@ -132,7 +132,7 @@ class HdfsTableWriter {
   }
 
   /// Parent table sink object
-  HdfsTableSink* parent_;
+  TableSinkBase* parent_;
 
   /// Runtime state.
   RuntimeState* state_;
diff --git a/be/src/exec/iceberg-delete-sink.cc 
b/be/src/exec/iceberg-delete-sink.cc
new file mode 100644
index 000000000..ccbef01f3
--- /dev/null
+++ b/be/src/exec/iceberg-delete-sink.cc
@@ -0,0 +1,165 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "exec/iceberg-delete-sink.h"
+
+#include "common/object-pool.h"
+#include "exec/parquet/hdfs-parquet-table-writer.h"
+#include "exprs/scalar-expr.h"
+#include "runtime/descriptors.h"
+#include "runtime/hdfs-fs-cache.h"
+#include "runtime/mem-tracker.h"
+#include "runtime/row-batch.h"
+#include "runtime/runtime-state.h"
+#include "util/debug-util.h"
+#include "util/hdfs-util.h"
+#include "util/impalad-metrics.h"
+#include "util/metrics.h"
+#include "util/runtime-profile-counters.h"
+
+#include "common/names.h"
+
+namespace impala {
+
+IcebergDeleteSink::IcebergDeleteSink(TDataSinkId sink_id,
+    const IcebergDeleteSinkConfig& sink_config, const TIcebergDeleteSink& 
ice_del_sink,
+    RuntimeState* state) : TableSinkBase(sink_id, sink_config,
+    "IcebergDeleteSink", state) {
+}
+
+DataSink* IcebergDeleteSinkConfig::CreateSink(RuntimeState* state) const {
+  TDataSinkId sink_id = state->fragment().idx;
+  return state->obj_pool()->Add(
+      new IcebergDeleteSink(sink_id, *this,
+          this->tsink_->table_sink.iceberg_delete_sink, state));
+}
+
+void IcebergDeleteSinkConfig::Close() {
+  DataSinkConfig::Close();
+}
+
+Status IcebergDeleteSinkConfig::Init(
+    const TDataSink& tsink, const RowDescriptor* input_row_desc, 
FragmentState* state) {
+  RETURN_IF_ERROR(DataSinkConfig::Init(tsink, input_row_desc, state));
+  DCHECK(tsink_->__isset.table_sink);
+  return Status::OK();
+}
+
+Status IcebergDeleteSink::Prepare(RuntimeState* state, MemTracker* 
parent_mem_tracker) {
+  SCOPED_TIMER(profile()->total_time_counter());
+  RETURN_IF_ERROR(TableSinkBase::Prepare(state, parent_mem_tracker));
+  unique_id_str_ = "delete-" + PrintId(state->fragment_instance_id(), "-");
+
+  // Resolve table id and set input tuple descriptor.
+  table_desc_ = static_cast<const HdfsTableDescriptor*>(
+      state->desc_tbl().GetTableDescriptor(table_id_));
+  if (table_desc_ == nullptr) {
+    stringstream error_msg("Failed to get table descriptor for table id: ");
+    error_msg << table_id_;
+    return Status(error_msg.str());
+  }
+
+  DCHECK_GE(output_expr_evals_.size(),
+      table_desc_->num_cols() - table_desc_->num_clustering_cols()) << 
DebugString();
+
+  return Status::OK();
+}
+
+Status IcebergDeleteSink::Open(RuntimeState* state) {
+  SCOPED_TIMER(profile()->total_time_counter());
+  RETURN_IF_ERROR(TableSinkBase::Open(state));
+  prototype_partition_ = 
CHECK_NOTNULL(table_desc_->prototype_partition_descriptor());
+  output_partition_ = make_pair(make_unique<OutputPartition>(), 
std::vector<int32_t>());
+  state->dml_exec_state()->AddPartition(
+      output_partition_.first->partition_name, prototype_partition_->id(),
+      &table_desc_->hdfs_base_dir(), nullptr);
+  return Status::OK();
+}
+
+Status IcebergDeleteSink::Send(RuntimeState* state, RowBatch* batch) {
+  SCOPED_TIMER(profile()->total_time_counter());
+  RETURN_IF_ERROR(state->CheckQueryState());
+  // We don't do any work for an empty batch.
+  if (batch->num_rows() == 0) return Status::OK();
+
+  if (output_partition_.first->writer == nullptr) {
+    RETURN_IF_ERROR(InitOutputPartition(state));
+  }
+
+  RETURN_IF_ERROR(WriteRowsToPartition(state, batch, &output_partition_));
+
+  return Status();
+}
+
+Status IcebergDeleteSink::InitOutputPartition(RuntimeState* state) {
+  // Build the unique name for this partition from the partition keys, e.g. 
"j=1/f=foo/"
+  // etc.
+  stringstream partition_name_ss;
+
+  // partition_name_ss now holds the unique descriptor for this partition,
+  output_partition_.first->partition_name = partition_name_ss.str();
+  BuildHdfsFileNames(*prototype_partition_, output_partition_.first.get(), "");
+
+  // We will be writing to the final file if we're skipping staging, so get a 
connection
+  // to its filesystem.
+  RETURN_IF_ERROR(HdfsFsCache::instance()->GetConnection(
+      output_partition_.first->final_hdfs_file_name_prefix,
+      &output_partition_.first->hdfs_connection));
+
+  output_partition_.first->partition_descriptor = prototype_partition_;
+
+  output_partition_.first->writer.reset(
+      new HdfsParquetTableWriter(
+          this, state, output_partition_.first.get(), prototype_partition_, 
table_desc_));
+  RETURN_IF_ERROR(output_partition_.first->writer->Init());
+  COUNTER_ADD(partitions_created_counter_, 1);
+  return CreateNewTmpFile(state, output_partition_.first.get());
+}
+
+Status IcebergDeleteSink::FlushFinal(RuntimeState* state) {
+  DCHECK(!closed_);
+  SCOPED_TIMER(profile()->total_time_counter());
+
+  RETURN_IF_ERROR(FinalizePartitionFile(state, output_partition_.first.get()));
+  return Status::OK();
+}
+
+void IcebergDeleteSink::Close(RuntimeState* state) {
+  if (closed_) return;
+  SCOPED_TIMER(profile()->total_time_counter());
+
+  if (output_partition_.first->writer != nullptr) {
+    output_partition_.first->writer->Close();
+  }
+  Status close_status = ClosePartitionFile(state, 
output_partition_.first.get());
+  if (!close_status.ok()) state->LogError(close_status.msg());
+
+  output_partition_.first.reset();
+  TableSinkBase::Close(state);
+  closed_ = true;
+}
+
+string IcebergDeleteSink::DebugString() const {
+  stringstream out;
+  out << "IcebergDeleteSink("
+      << " table_desc=" << table_desc_->DebugString()
+      << " output_exprs=" << ScalarExpr::DebugString(output_exprs_)
+      << ")";
+  return out.str();
+}
+
+} // namespace impala
diff --git a/be/src/exec/iceberg-delete-sink.h 
b/be/src/exec/iceberg-delete-sink.h
new file mode 100644
index 000000000..7e6b232aa
--- /dev/null
+++ b/be/src/exec/iceberg-delete-sink.h
@@ -0,0 +1,89 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include "exec/output-partition.h"
+#include "exec/table-sink-base.h"
+
+#include <unordered_map>
+
+namespace impala {
+
+class Expr;
+class TupleDescriptor;
+class TupleRow;
+class RuntimeState;
+class MemTracker;
+
+class IcebergDeleteSinkConfig : public DataSinkConfig {
+ public:
+  DataSink* CreateSink(RuntimeState* state) const override;
+  void Close() override;
+
+  /// Expressions for computing the target partitions to which a row is 
written.
+  std::vector<ScalarExpr*> partition_key_exprs_;
+
+  ~IcebergDeleteSinkConfig() override {}
+
+ protected:
+  Status Init(const TDataSink& tsink, const RowDescriptor* input_row_desc,
+      FragmentState* state) override;
+};
+
+class IcebergDeleteSink : public TableSinkBase {
+ public:
+  IcebergDeleteSink(TDataSinkId sink_id, const IcebergDeleteSinkConfig& 
sink_config,
+    const TIcebergDeleteSink& hdfs_sink, RuntimeState* state);
+
+  /// Prepares output_exprs and partition_key_exprs, and connects to HDFS.
+  Status Prepare(RuntimeState* state, MemTracker* parent_mem_tracker) override;
+
+  /// Opens output_exprs and partition_key_exprs, prepares the single output 
partition for
+  /// static inserts, and populates partition_descriptor_map_.
+  Status Open(RuntimeState* state) override;
+
+  /// Append all rows in batch to the temporary Hdfs files corresponding to 
partitions.
+  Status Send(RuntimeState* state, RowBatch* batch) override;
+
+  /// Finalize any open files.
+  /// TODO: IMPALA-2988: Move calls to functions that can fail in Close() to 
FlushFinal()
+  Status FlushFinal(RuntimeState* state) override;
+
+  /// Closes writers, output_exprs and partition_key_exprs and releases 
resources.
+  /// The temporary files will be moved to their final destination by the 
Coordinator.
+  void Close(RuntimeState* state) override;
+
+  TSortingOrder::type sorting_order() const override { return 
TSortingOrder::LEXICAL; }
+
+  std::string DebugString() const override;
+
+ private:
+  /// Initialises the filenames of a given output partition, and opens the 
temporary file.
+  Status InitOutputPartition(RuntimeState* state) WARN_UNUSED_RESULT;
+
+  /// For now we only allow non-partitioned Iceberg tables.
+  PartitionPair output_partition_;
+
+  /// The partition descriptor used when creating new partitions from this 
sink.
+  /// Currently we don't support multi-format sinks.
+  const HdfsPartitionDescriptor* prototype_partition_;
+};
+
+}
+
+
diff --git a/be/src/exec/parquet/hdfs-parquet-table-writer.cc 
b/be/src/exec/parquet/hdfs-parquet-table-writer.cc
index a5f604499..07a33df66 100644
--- a/be/src/exec/parquet/hdfs-parquet-table-writer.cc
+++ b/be/src/exec/parquet/hdfs-parquet-table-writer.cc
@@ -136,8 +136,9 @@ class HdfsParquetTableWriter::BaseColumnWriter {
       row_group_stats_base_(nullptr),
       table_sink_mem_tracker_(parent_->parent_->mem_tracker()),
       column_name_(std::move(column_name)) {
-    static_assert(std::is_same<decltype(parent_->parent_), 
HdfsTableSink*>::value,
-        "'table_sink_mem_tracker_' must point to the mem tracker of an 
HdfsTableSink");
+    static_assert(std::is_base_of_v<TableSinkBase,
+                                    
std::remove_reference_t<decltype(*parent_->parent_)>>,
+        "'table_sink_mem_tracker_' must point to the mem tracker of a 
TableSinkBase");
     def_levels_ = parent_->state_->obj_pool()->Add(
         new 
RleEncoder(parent_->reusable_col_mem_pool_->Allocate(DEFAULT_DATA_PAGE_SIZE),
                        DEFAULT_DATA_PAGE_SIZE, 1));
@@ -1231,7 +1232,8 @@ void HdfsParquetTableWriter::BaseColumnWriter::NewPage() {
   page_stats_base_->Reset();
 }
 
-HdfsParquetTableWriter::HdfsParquetTableWriter(HdfsTableSink* parent, 
RuntimeState* state,
+HdfsParquetTableWriter::
+HdfsParquetTableWriter(TableSinkBase* parent, RuntimeState* state,
     OutputPartition* output, const HdfsPartitionDescriptor* part_desc,
     const HdfsTableDescriptor* table_desc)
   : HdfsTableWriter(parent, state, output, part_desc, table_desc),
diff --git a/be/src/exec/parquet/hdfs-parquet-table-writer.h 
b/be/src/exec/parquet/hdfs-parquet-table-writer.h
index 862b9cbf0..c76dc2cef 100644
--- a/be/src/exec/parquet/hdfs-parquet-table-writer.h
+++ b/be/src/exec/parquet/hdfs-parquet-table-writer.h
@@ -19,7 +19,7 @@
 #ifndef IMPALA_EXEC_HDFS_PARQUET_TABLE_WRITER_H
 #define IMPALA_EXEC_HDFS_PARQUET_TABLE_WRITER_H
 
-#include "exec/data-sink.h"
+#include "exec/table-sink-base.h"
 
 #include <hdfs.h>
 #include <map>
@@ -53,7 +53,7 @@ class TupleRow;
 
 class HdfsParquetTableWriter : public HdfsTableWriter {
  public:
-  HdfsParquetTableWriter(HdfsTableSink* parent, RuntimeState* state,
+  HdfsParquetTableWriter(TableSinkBase* parent, RuntimeState* state,
       OutputPartition* output_partition, const HdfsPartitionDescriptor* 
part_desc,
       const HdfsTableDescriptor* table_desc);
 
diff --git a/be/src/exec/table-sink-base.cc b/be/src/exec/table-sink-base.cc
new file mode 100644
index 000000000..09551d0b1
--- /dev/null
+++ b/be/src/exec/table-sink-base.cc
@@ -0,0 +1,293 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "exec/table-sink-base.h"
+
+#include "exec/output-partition.h"
+#include "runtime/runtime-state.h"
+#include "util/hdfs-util.h"
+#include "util/impalad-metrics.h"
+#include "util/metrics.h"
+#include "util/string-util.h"
+#include "util/runtime-profile-counters.h"
+
+#include "common/names.h"
+
+namespace impala {
+
+Status TableSinkBase::Prepare(RuntimeState* state, MemTracker* 
parent_mem_tracker) {
+  RETURN_IF_ERROR(DataSink::Prepare(state, parent_mem_tracker));
+  partitions_created_counter_ = ADD_COUNTER(profile(), "PartitionsCreated", 
TUnit::UNIT);
+  files_created_counter_ = ADD_COUNTER(profile(), "FilesCreated", TUnit::UNIT);
+  rows_inserted_counter_ = ADD_COUNTER(profile(), "RowsInserted", TUnit::UNIT);
+  bytes_written_counter_ = ADD_COUNTER(profile(), "BytesWritten", 
TUnit::BYTES);
+  encode_timer_ = ADD_TIMER(profile(), "EncodeTimer");
+  hdfs_write_timer_ = ADD_TIMER(profile(), "HdfsWriteTimer");
+  compress_timer_ = ADD_TIMER(profile(), "CompressTimer");
+  return Status::OK();
+}
+
+Status TableSinkBase::ClosePartitionFile(
+    RuntimeState* state, OutputPartition* partition) {
+  if (partition->tmp_hdfs_file == nullptr) return Status::OK();
+  int hdfs_ret = hdfsCloseFile(partition->hdfs_connection, 
partition->tmp_hdfs_file);
+  VLOG_FILE << "hdfsCloseFile() file=" << partition->current_file_name;
+  partition->tmp_hdfs_file = nullptr;
+  ImpaladMetrics::NUM_FILES_OPEN_FOR_INSERT->Increment(-1);
+  if (hdfs_ret != 0) {
+    return Status(ErrorMsg(TErrorCode::GENERAL,
+        GetHdfsErrorMsg("Failed to close HDFS file: ",
+        partition->current_file_name)));
+  }
+  return Status::OK();
+}
+
+void TableSinkBase::BuildHdfsFileNames(
+    const HdfsPartitionDescriptor& partition_descriptor,
+    OutputPartition* output_partition, const string &external_partition_name) {
+
+  // Create final_hdfs_file_name_prefix and tmp_hdfs_file_name_prefix.
+  // Path: <hdfs_base_dir>/<partition_values>/<unique_id_str>
+  // Or, for transactional tables:
+  // Path: 
<hdfs_base_dir>/<partition_values>/<transaction_directory>/<unique_id_str>
+  // Where <transaction_directory> is either a 'base' or a 'delta' directory 
in Hive ACID
+  // terminology.
+
+  // Temporary files are written under the following path which is unique to 
this sink:
+  // 
<table_dir>/_impala_insert_staging/<query_id>/<per_fragment_unique_id>_dir/
+  // Both the temporary directory and the file name, when moved to the real 
partition
+  // directory must be unique.
+  // Prefix the directory name with "." to make it hidden and append "_dir" at 
the end
+  // of the directory to avoid name clashes for unpartitioned tables.
+  // The files are located in <partition_values>/<random_value>_data under
+  // tmp_hdfs_file_name_prefix.
+
+  // Use the query id as filename.
+  const string& query_suffix = Substitute("$0_$1_data", unique_id_str_, 
rand());
+
+  output_partition->tmp_hdfs_dir_name =
+      Substitute("$0/.$1_$2_dir/", staging_dir(), unique_id_str_, rand());
+  output_partition->tmp_hdfs_file_name_prefix = Substitute("$0$1/$2",
+      output_partition->tmp_hdfs_dir_name, output_partition->partition_name,
+      query_suffix);
+
+  if (HasExternalOutputDir()) {
+    // When an external FE has provided a staging directory we use that 
directly.
+    // We are trusting that the external frontend implementation has done 
appropriate
+    // authorization checks on the external output directory.
+    output_partition->final_hdfs_file_name_prefix = Substitute("$0/$1/",
+        external_output_dir_, external_partition_name);
+  } else if (partition_descriptor.location().empty()) {
+    output_partition->final_hdfs_file_name_prefix = Substitute("$0/$1/",
+        table_desc_->hdfs_base_dir(), output_partition->partition_name);
+  } else {
+    // If the partition descriptor has a location (as set by alter table add 
partition
+    // with a location clause), that provides the complete directory path for 
this
+    // partition. No partition key suffix ("p=1/j=foo/") should be added.
+    output_partition->final_hdfs_file_name_prefix =
+        Substitute("$0/", partition_descriptor.location());
+  }
+  if (IsHiveAcid()) {
+    if (HasExternalOutputDir()) {
+      // The 0 padding on base and delta is to match the behavior of Hive 
since various
+      // systems will expect a certain format for dynamic partition creation. 
Additionally
+      // include an 0 statement id for delta directory so various Hive 
AcidUtils detect
+      // the directory (such as AcidUtils.baseOrDeltaSubdir()). Multiple 
statements in a
+      // single transaction is not supported.
+      if (is_overwrite()) {
+        output_partition->final_hdfs_file_name_prefix += 
StringPrintf("/base_%07ld/",
+            write_id());
+      } else {
+        output_partition->final_hdfs_file_name_prefix += StringPrintf(
+            "/delta_%07ld_%07ld_0000/", write_id(), write_id());
+      }
+    } else {
+      string acid_dir = Substitute(
+          is_overwrite() ? "/base_$0/" : "/delta_$0_$0/", write_id());
+      output_partition->final_hdfs_file_name_prefix += acid_dir;
+    }
+  }
+  if (IsIceberg()) {
+    //TODO: implement LocationProviders.
+    if (output_partition->partition_name.empty()) {
+      output_partition->final_hdfs_file_name_prefix =
+          Substitute("$0/data/", table_desc_->IcebergTableLocation());
+    } else {
+      output_partition->final_hdfs_file_name_prefix =
+          Substitute("$0/data/$1/", table_desc_->IcebergTableLocation(),
+              output_partition->partition_name);
+    }
+  }
+  output_partition->final_hdfs_file_name_prefix += query_suffix;
+  output_partition->num_files = 0;
+}
+
+Status TableSinkBase::CreateNewTmpFile(RuntimeState* state,
+    OutputPartition* output_partition) {
+  SCOPED_TIMER(ADD_TIMER(profile(), "TmpFileCreateTimer"));
+  string file_name_pattern =
+      output_partition->writer->file_extension().empty() ? "$0.$1" : 
"$0.$1.$2";
+  string final_location = Substitute(file_name_pattern,
+      output_partition->final_hdfs_file_name_prefix, 
output_partition->num_files,
+      output_partition->writer->file_extension());
+
+  // If ShouldSkipStaging() is true, then the table sink will write the 
file(s) for this
+  // partition to the final location directly. If it is false, the file(s) 
will be written
+  // to a temporary staging location which will be moved by the coordinator to 
the final
+  // location.
+  if (ShouldSkipStaging(state, output_partition)) {
+    output_partition->current_file_name = final_location;
+    output_partition->current_file_final_name = "";
+  } else {
+    output_partition->current_file_name = Substitute(file_name_pattern,
+        output_partition->tmp_hdfs_file_name_prefix, 
output_partition->num_files,
+        output_partition->writer->file_extension());
+    // Save the ultimate destination for this file (it will be moved by the 
coordinator).
+    output_partition->current_file_final_name = final_location;
+  }
+  // Check if tmp_hdfs_file_name exists.
+  const char* tmp_hdfs_file_name_cstr =
+      output_partition->current_file_name.c_str();
+
+  if (hdfsExists(output_partition->hdfs_connection, tmp_hdfs_file_name_cstr) 
== 0) {
+    return Status(GetHdfsErrorMsg("Temporary HDFS file already exists: ",
+        output_partition->current_file_name));
+  }
+
+  // This is the block size from the HDFS partition metadata.
+  uint64_t block_size = output_partition->partition_descriptor->block_size();
+  // hdfsOpenFile takes a 4 byte integer as the block size.
+  if (block_size > numeric_limits<int32_t>::max()) {
+    return Status(Substitute("HDFS block size must be smaller than 2GB but is 
configured "
+        "in the HDFS partition to $0.", block_size));
+  }
+
+  if (block_size == 0) block_size = 
output_partition->writer->default_block_size();
+  if (block_size > numeric_limits<int32_t>::max()) {
+    return Status(Substitute("HDFS block size must be smaller than 2GB but the 
target "
+        "table requires $0.", block_size));
+  }
+
+  DCHECK_LE(block_size, numeric_limits<int32_t>::max());
+  output_partition->tmp_hdfs_file = 
hdfsOpenFile(output_partition->hdfs_connection,
+      tmp_hdfs_file_name_cstr, O_WRONLY, 0, 0, block_size);
+
+  VLOG_FILE << "hdfsOpenFile() file=" << tmp_hdfs_file_name_cstr;
+  if (output_partition->tmp_hdfs_file == nullptr) {
+    return Status(GetHdfsErrorMsg("Failed to open HDFS file for writing: ",
+        output_partition->current_file_name));
+  }
+
+  if (IsS3APath(tmp_hdfs_file_name_cstr) ||
+      IsABFSPath(tmp_hdfs_file_name_cstr) ||
+      IsADLSPath(tmp_hdfs_file_name_cstr) ||
+      IsOSSPath(tmp_hdfs_file_name_cstr) ||
+      IsGcsPath(tmp_hdfs_file_name_cstr) ||
+      IsCosPath(tmp_hdfs_file_name_cstr) ||
+      IsSFSPath(tmp_hdfs_file_name_cstr) ||
+      IsOzonePath(tmp_hdfs_file_name_cstr)) {
+    // On S3A, the file cannot be stat'ed until after it's closed, and even 
so, the block
+    // size reported will be just the filesystem default. Similarly, the block 
size
+    // reported for ADLS will be the filesystem default. So, remember the 
requested block
+    // size.
+    // TODO: IMPALA-9437: Ozone does not support stat'ing a file until after 
it's closed,
+    // so for now skip the call to hdfsGetPathInfo.
+    output_partition->block_size = block_size;
+  } else {
+    // HDFS may choose to override the block size that we've recommended, so 
for non-S3
+    // files, we get the block size by stat-ing the file.
+    hdfsFileInfo* info = hdfsGetPathInfo(output_partition->hdfs_connection,
+        output_partition->current_file_name.c_str());
+    if (info == nullptr) {
+      return Status(GetHdfsErrorMsg("Failed to get info on temporary HDFS 
file: ",
+          output_partition->current_file_name));
+    }
+    output_partition->block_size = info->mBlockSize;
+    hdfsFreeFileInfo(info, 1);
+  }
+
+  ImpaladMetrics::NUM_FILES_OPEN_FOR_INSERT->Increment(1);
+  COUNTER_ADD(files_created_counter_, 1);
+
+  ++output_partition->num_files;
+  output_partition->current_file_rows = 0;
+  Status status = output_partition->writer->InitNewFile();
+  if (!status.ok()) {
+    status.MergeStatus(ClosePartitionFile(state, output_partition));
+    hdfsDelete(output_partition->hdfs_connection,
+        output_partition->current_file_name.c_str(), 0);
+  }
+  return status;
+}
+
+Status TableSinkBase::WriteRowsToPartition(
+    RuntimeState* state, RowBatch* batch, PartitionPair* partition_pair) {
+  // The rows of this batch may span multiple files. We repeatedly pass the 
row batch to
+  // the writer until it sets new_file to false, indicating that all rows have 
been
+  // written. The writer tracks where it is in the batch when it returns with 
new_file
+  // set.
+  bool new_file;
+  while (true) {
+    OutputPartition* output_partition = partition_pair->first.get();
+    Status status =
+        output_partition->writer->AppendRows(batch, partition_pair->second, 
&new_file);
+    if (!status.ok()) {
+      // IMPALA-10607: Delete the partition file if staging is skipped and appending
+      // rows fails. Otherwise, the file would be left in an un-finalized state.
+      if (ShouldSkipStaging(state, output_partition)) {
+        status.MergeStatus(ClosePartitionFile(state, output_partition));
+        hdfsDelete(output_partition->hdfs_connection,
+            output_partition->current_file_name.c_str(), 0);
+      }
+      return status;
+    }
+    if (!new_file) break;
+    RETURN_IF_ERROR(FinalizePartitionFile(state, output_partition));
+    RETURN_IF_ERROR(CreateNewTmpFile(state, output_partition));
+  }
+  partition_pair->second.clear();
+  return Status::OK();
+}
+
+bool TableSinkBase::ShouldSkipStaging(RuntimeState* state, OutputPartition* 
partition) {
+  if (IsTransactional() || HasExternalOutputDir() || is_result_sink()) return 
true;
+  // We skip staging if we are writing query results
+  return (IsS3APath(partition->final_hdfs_file_name_prefix.c_str()) && 
!is_overwrite() &&
+      state->query_options().s3_skip_insert_staging);
+}
+
+Status TableSinkBase::FinalizePartitionFile(
+    RuntimeState* state, OutputPartition* partition) {
+  if (partition->tmp_hdfs_file == nullptr && !is_overwrite()) return 
Status::OK();
+  SCOPED_TIMER(ADD_TIMER(profile(), "FinalizePartitionFileTimer"));
+
+  // OutputPartition writer could be nullptr if there is no row to output.
+  if (partition->writer.get() != nullptr) {
+    RETURN_IF_ERROR(partition->writer->Finalize());
+    state->dml_exec_state()->UpdatePartition(
+        partition->partition_name, partition->current_file_rows,
+        &partition->writer->stats());
+    state->dml_exec_state()->AddCreatedFile(*partition, IsIceberg(),
+        partition->writer->iceberg_file_stats());
+  }
+
+  RETURN_IF_ERROR(ClosePartitionFile(state, partition));
+  return Status::OK();
+}
+
+}
+
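As an aside for readers, the ACID directory naming at the start of this hunk follows Hive's convention of 7-digit, zero-padded write ids plus a fixed 0 statement id on delta directories. A minimal, self-contained sketch of that formatting (Java's String.format standing in for the backend's StringPrintf; the write id is an invented example, not from this patch):

    // Sketch of the Hive ACID directory names built above: zero-padded write ids and a
    // 0 statement id for delta directories. Write id 42 is an example value.
    public class AcidDirNameSketch {
      public static void main(String[] args) {
        long writeId = 42;
        String baseDir = String.format("/base_%07d/", writeId);                      // overwrite
        String deltaDir = String.format("/delta_%07d_%07d_0000/", writeId, writeId); // append
        System.out.println(baseDir);   // /base_0000042/
        System.out.println(deltaDir);  // /delta_0000042_0000042_0000/
      }
    }
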
diff --git a/be/src/exec/table-sink-base.h b/be/src/exec/table-sink-base.h
new file mode 100644
index 000000000..b82f74d35
--- /dev/null
+++ b/be/src/exec/table-sink-base.h
@@ -0,0 +1,144 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include "exec/data-sink.h"
+
+#include "runtime/descriptors.h"
+
+namespace impala {
+
+class TableSinkBase : public DataSink {
+public:
+  TableSinkBase(TDataSinkId sink_id, const DataSinkConfig& sink_config,
+      const std::string& name, RuntimeState* state) :
+      DataSink(sink_id, sink_config, name, state),
+      table_id_(sink_config.tsink_->table_sink.target_table_id) {}
+
+  virtual bool is_overwrite() const { return false; }
+  virtual bool is_result_sink() const { return false; }
+  virtual int64_t write_id() const { return -1; }
+  virtual std::string staging_dir() const { return ""; }
+  virtual int skip_header_line_count() const { return 0; }
+  virtual TSortingOrder::type sorting_order() const = 0;
+  virtual const std::vector<int32_t>& sort_columns() const {
+    static std::vector<int32_t> dummy;
+    return dummy;
+  }
+  virtual const std::map<std::string, int64_t>& GetParquetBloomFilterColumns() const {
+    static std::map<std::string, int64_t> dummy;
+    return dummy;
+  }
+
+  Status Prepare(RuntimeState* state, MemTracker* parent_mem_tracker);
+
+  RuntimeProfile::Counter* rows_inserted_counter() { return 
rows_inserted_counter_; }
+  RuntimeProfile::Counter* bytes_written_counter() { return 
bytes_written_counter_; }
+  RuntimeProfile::Counter* encode_timer() { return encode_timer_; }
+  RuntimeProfile::Counter* hdfs_write_timer() { return hdfs_write_timer_; }
+  RuntimeProfile::Counter* compress_timer() { return compress_timer_; }
+
+  virtual std::string DebugString() const = 0;
+protected:
+  /// Key is the concatenation of the evaluated dynamic_partition_key_exprs_ 
generated by
+  /// GetHashTblKey(). Maps to an OutputPartition and a vector of indices of 
the rows in
+  /// the current batch to insert into the partition. The PartitionPair owns 
the
+  /// OutputPartition via a unique_ptr so that the memory is freed as soon as 
the
+  /// PartitionPair is removed from the map. This is important, because the
+  /// PartitionPairs can have different lifetimes. For example, a clustered 
insert into a
+  /// partitioned table iterates over the partitions, so only one 
PartitionPair is
+  /// in the map at any given time.
+  typedef std::pair<std::unique_ptr<OutputPartition>, std::vector<int32_t>> 
PartitionPair;
+  typedef boost::unordered_map<std::string, PartitionPair> PartitionMap;
+
+  /// Returns TRUE if the target table is transactional.
+  bool IsTransactional() const { return IsHiveAcid() || IsIceberg(); }
+
+  virtual bool IsHiveAcid() const { return false; }
+
+  /// Sets hdfs_file_name and tmp_hdfs_file_name of given output partition.
+  /// The Hdfs directory is created from the target table's base Hdfs dir,
+  /// the partition_key_names_ and the evaluated partition_key_exprs_.
+  /// The Hdfs file name is the unique_id_str_.
+  void BuildHdfsFileNames(const HdfsPartitionDescriptor& partition_descriptor,
+      OutputPartition* output, const std::string &external_partition_path);
+
+  /// Add a temporary file to an output partition.  Files are created in a
+  /// temporary directory and then moved to the real partition directory by the
+  /// coordinator in a finalization step. The temporary file's current location
+  /// and final destination are recorded in the state parameter.
+  /// If this function fails, the tmp file is cleaned up.
+  Status CreateNewTmpFile(RuntimeState* state, OutputPartition* 
output_partition)
+      WARN_UNUSED_RESULT;
+
+  /// Updates runtime stats of HDFS with rows written, then closes the file 
associated
+  /// with the partition by calling ClosePartitionFile()
+  Status FinalizePartitionFile(RuntimeState* state, OutputPartition* partition)
+      WARN_UNUSED_RESULT;
+
+  /// Writes all rows referenced by the row index vector in 'partition_pair' 
to the
+  /// partition's writer and clears the row index vector afterwards.
+  Status WriteRowsToPartition(
+      RuntimeState* state, RowBatch* batch, PartitionPair* partition_pair)
+      WARN_UNUSED_RESULT;
+
+  /// Closes the hdfs file for this partition as well as the writer.
+  Status ClosePartitionFile(RuntimeState* state, OutputPartition* partition)
+      WARN_UNUSED_RESULT;
+
+  /// Returns TRUE if the staging step should be skipped for this partition. This allows
+  /// for faster INSERT query completion time on the S3A filesystem as the coordinator
+  /// does not have to copy the file(s) from the staging location to the final location.
+  /// We do not skip for INSERT OVERWRITEs because the coordinator will delete all files
+  /// in the final location before moving the staged files there, so we cannot write
+  /// directly to the final location and need to write to the temporary staging location.
+  bool ShouldSkipStaging(RuntimeState* state, OutputPartition* partition);
+
+  /// Returns TRUE for Iceberg tables.
+  bool IsIceberg() const { return table_desc_->IsIcebergTable(); }
+
+  /// Returns TRUE if an external output directory was provided.
+  bool HasExternalOutputDir() { return !external_output_dir_.empty(); }
+
+  /// Table id resolved in Prepare() to set tuple_desc_;
+  TableId table_id_;
+
+  /// string representation of the unique fragment instance id. Used for 
per-partition
+  /// Hdfs file names, and for tmp Hdfs directories. Set in Prepare();
+  std::string unique_id_str_;
+
+  /// Descriptor of target table. Set in Prepare().
+  const HdfsTableDescriptor* table_desc_ = nullptr;
+
+  /// The directory to which an external FE expects results to be written.
+  std::string external_output_dir_;
+
+  RuntimeProfile::Counter* partitions_created_counter_;
+  RuntimeProfile::Counter* files_created_counter_;
+  RuntimeProfile::Counter* rows_inserted_counter_;
+  RuntimeProfile::Counter* bytes_written_counter_;
+
+  /// Time spent converting tuple to on disk format.
+  RuntimeProfile::Counter* encode_timer_;
+  /// Time spent writing to hdfs
+  RuntimeProfile::Counter* hdfs_write_timer_;
+  /// Time spent compressing data
+  RuntimeProfile::Counter* compress_timer_;
+};
+
+}
diff --git a/be/src/runtime/coordinator.cc b/be/src/runtime/coordinator.cc
index df9257c98..8bfdf0c3a 100644
--- a/be/src/runtime/coordinator.cc
+++ b/be/src/runtime/coordinator.cc
@@ -853,7 +853,7 @@ Status Coordinator::FinalizeHdfsDml() {
   DCHECK(has_called_wait_.Load());
   DCHECK(finalize_params() != nullptr);
   bool is_hive_acid = finalize_params()->__isset.write_id;
-  bool is_iceberg_table = finalize_params()->__isset.spec_id;
+  bool is_iceberg_table = finalize_params()->__isset.iceberg_params;
 
   VLOG_QUERY << "Finalizing query: " << PrintId(query_id());
   SCOPED_TIMER(finalization_timer_);
diff --git a/be/src/runtime/descriptors.cc b/be/src/runtime/descriptors.cc
index 872cd7be2..5b69b9a1b 100644
--- a/be/src/runtime/descriptors.cc
+++ b/be/src/runtime/descriptors.cc
@@ -259,6 +259,7 @@ HdfsTableDescriptor::HdfsTableDescriptor(const 
TTableDescriptor& tdesc, ObjectPo
     iceberg_table_location_ = tdesc.icebergTable.table_location;
     const TIcebergPartitionSpec& spec = tdesc.icebergTable.partition_spec[
         tdesc.icebergTable.default_partition_spec_id];
+    DCHECK_EQ(spec.spec_id, tdesc.icebergTable.default_partition_spec_id);
     for (const TIcebergPartitionField& spec_field : spec.partition_fields) {
       if (spec_field.transform.transform_type == 
TIcebergPartitionTransformType::VOID) {
         continue;
diff --git a/be/src/service/client-request-state.cc 
b/be/src/service/client-request-state.cc
index b59c27e1d..7d57d8745 100644
--- a/be/src/service/client-request-state.cc
+++ b/be/src/service/client-request-state.cc
@@ -1550,14 +1550,23 @@ Status ClientRequestState::UpdateCatalog() {
         catalog_update.__set_transaction_id(finalize_params.transaction_id);
         catalog_update.__set_write_id(finalize_params.write_id);
       }
-      if (finalize_params.__isset.spec_id) {
+      if (finalize_params.__isset.iceberg_params) {
+        const TIcebergDmlFinalizeParams& ice_finalize_params =
+            finalize_params.iceberg_params;
+        TIcebergOperationParam& cat_ice_op = catalog_update.iceberg_operation;
         catalog_update.__isset.iceberg_operation = true;
-        TIcebergOperationParam& ice_op = catalog_update.iceberg_operation;
-        ice_op.__set_spec_id(finalize_params.spec_id);
-        ice_op.__set_iceberg_data_files_fb(
-            dml_exec_state->CreateIcebergDataFilesVector());
-        ice_op.__set_is_overwrite(finalize_params.is_overwrite);
-        ice_op.__set_initial_snapshot_id(finalize_params.initial_snapshot_id);
+        cat_ice_op.__set_operation(ice_finalize_params.operation);
+        cat_ice_op.__set_initial_snapshot_id(
+            ice_finalize_params.initial_snapshot_id);
+        cat_ice_op.__set_spec_id(ice_finalize_params.spec_id);
+        if (ice_finalize_params.operation == TIcebergOperation::INSERT) {
+          cat_ice_op.__set_iceberg_data_files_fb(
+              dml_exec_state->CreateIcebergDataFilesVector());
+          cat_ice_op.__set_is_overwrite(finalize_params.is_overwrite);
+        } else if (ice_finalize_params.operation == TIcebergOperation::DELETE) 
{
+          cat_ice_op.__set_iceberg_delete_files_fb(
+              dml_exec_state->CreateIcebergDataFilesVector());
+        }
       }
 
       Status cnxn_status;
diff --git a/common/thrift/CatalogObjects.thrift 
b/common/thrift/CatalogObjects.thrift
index 467191450..eb69c3df2 100644
--- a/common/thrift/CatalogObjects.thrift
+++ b/common/thrift/CatalogObjects.thrift
@@ -603,7 +603,7 @@ struct TIcebergPartitionField {
 }
 
 struct TIcebergPartitionSpec {
-  1: required i32 partition_id
+  1: required i32 spec_id
   2: optional list<TIcebergPartitionField> partition_fields
 }
 
diff --git a/common/thrift/CatalogService.thrift 
b/common/thrift/CatalogService.thrift
index 6606564f6..ceb244227 100644
--- a/common/thrift/CatalogService.thrift
+++ b/common/thrift/CatalogService.thrift
@@ -203,13 +203,19 @@ struct TDdlExecResponse {
   6: optional string table_location
 }
 
+
 // Parameters for the Iceberg operation.
 struct TIcebergOperationParam {
+  5: required Types.TIcebergOperation operation
+
   // Iceberg partition spec used by this operation
   1: optional i32 spec_id;
 
   // Iceberg data files to append to the table, encoded in FlatBuffers.
-  2: required list<binary> iceberg_data_files_fb;
+  2: optional list<binary> iceberg_data_files_fb;
+
+  // Iceberg delete files to append to the table, encoded in FlatBuffers.
+  6: optional list<binary> iceberg_delete_files_fb;
 
   // Is overwrite operation
   3: required bool is_overwrite = false;
diff --git a/common/thrift/DataSinks.thrift b/common/thrift/DataSinks.thrift
index 03f6a698a..3076c7ccb 100644
--- a/common/thrift/DataSinks.thrift
+++ b/common/thrift/DataSinks.thrift
@@ -103,6 +103,9 @@ struct THdfsTableSink {
   11: optional map<string, i64> parquet_bloom_filter_col_info;
 }
 
+struct TIcebergDeleteSink {
+}
+
 // Structure to encapsulate specific options that are passed down to the 
KuduTableSink
 struct TKuduTableSink {
   // The position in this vector is equal to the position in the output 
expressions of the
@@ -151,6 +154,7 @@ struct TTableSink {
   3: required TSinkAction action
   4: optional THdfsTableSink hdfs_table_sink
   5: optional TKuduTableSink kudu_table_sink
+  6: optional TIcebergDeleteSink iceberg_delete_sink
 }
 
 struct TDataSink {
diff --git a/common/thrift/Query.thrift b/common/thrift/Query.thrift
index 2e0157a96..e57608587 100644
--- a/common/thrift/Query.thrift
+++ b/common/thrift/Query.thrift
@@ -831,8 +831,19 @@ struct TPlanExecInfo {
       per_node_scan_ranges
 }
 
+struct TIcebergDmlFinalizeParams {
+  // Type of the Iceberg operation
+  1: required Types.TIcebergOperation operation
+
+  // Stores the Iceberg spec id of the partition spec used for this DML 
operation.
+  2: optional i32 spec_id;
+
+  // Stores the Iceberg snapshot id of the target table for this DML operation.
+  3: optional i64 initial_snapshot_id;
+}
+
 // Metadata required to finalize a query - that is, to clean up after the 
query is done.
-// Only relevant for INSERT queries.
+// Only relevant for DML statements.
 struct TFinalizeParams {
   // True if the INSERT query was OVERWRITE, rather than INTO
   1: required bool is_overwrite
@@ -861,11 +872,8 @@ struct TFinalizeParams {
   // Stores the ACID write id of the target table for transactional INSERTs.
   8: optional i64 write_id;
 
-  // Stores the Iceberg spec id of the partition spec used for this INSERT.
-  9: optional i32 spec_id;
-
-  // Stores the Iceberg snapshot id of the target table for INSERTs.
-  10: optional i64 initial_snapshot_id;
+  // Stores params for Iceberg operation
+  9: optional TIcebergDmlFinalizeParams iceberg_params;
 }
 
 // Result of call to ImpalaPlanService/JniFrontend.CreateQueryRequest()
diff --git a/common/thrift/Types.thrift b/common/thrift/Types.thrift
index ecd78d5e7..e10ed0543 100644
--- a/common/thrift/Types.thrift
+++ b/common/thrift/Types.thrift
@@ -111,6 +111,11 @@ enum TStmtType {
   TESTCASE = 7
 }
 
+enum TIcebergOperation {
+  INSERT = 0,
+  DELETE = 1
+}
+
 // Level of verboseness for "explain" output.
 enum TExplainLevel {
   MINIMAL = 0
diff --git a/fe/src/main/java/org/apache/impala/analysis/DeleteStmt.java 
b/fe/src/main/java/org/apache/impala/analysis/DeleteStmt.java
index c4561152e..5d7031c2b 100644
--- a/fe/src/main/java/org/apache/impala/analysis/DeleteStmt.java
+++ b/fe/src/main/java/org/apache/impala/analysis/DeleteStmt.java
@@ -58,7 +58,8 @@ public class DeleteStmt extends ModifyStmt {
     Preconditions.checkState(table_ != null);
     TableSink tableSink = TableSink.create(table_, TableSink.Op.DELETE,
         ImmutableList.<Expr>of(), resultExprs, referencedColumns_, false, 
false,
-        new Pair<>(ImmutableList.<Integer>of(), TSortingOrder.LEXICAL), -1, 
kuduTxnToken_,
+        new Pair<>(ImmutableList.<Integer>of(), TSortingOrder.LEXICAL), -1,
+        getKuduTransactionToken(),
         0);
     Preconditions.checkState(!referencedColumns_.isEmpty());
     return tableSink;
diff --git 
a/fe/src/main/java/org/apache/impala/analysis/IcebergPartitionSpec.java 
b/fe/src/main/java/org/apache/impala/analysis/IcebergPartitionSpec.java
index f56d48441..8c937178d 100644
--- a/fe/src/main/java/org/apache/impala/analysis/IcebergPartitionSpec.java
+++ b/fe/src/main/java/org/apache/impala/analysis/IcebergPartitionSpec.java
@@ -36,13 +36,13 @@ import org.apache.impala.thrift.TIcebergPartitionSpec;
  * )
  */
 public class IcebergPartitionSpec extends StmtNode {
-  // Partition id from iceberg PartitionSpec
-  private int partitionId_;
+  // Partition spec id from iceberg PartitionSpec
+  private int specId_;
 
   private List<IcebergPartitionField> icebergPartitionFields_;
 
   public IcebergPartitionSpec(int partitionId, List<IcebergPartitionField> 
fields) {
-    partitionId_ = partitionId;
+    specId_ = partitionId;
     icebergPartitionFields_ = fields;
   }
 
@@ -94,7 +94,7 @@ public class IcebergPartitionSpec extends StmtNode {
 
   public TIcebergPartitionSpec toThrift() {
     TIcebergPartitionSpec result = new TIcebergPartitionSpec();
-    result.setPartition_id(partitionId_);
+    result.setSpec_id(specId_);
     if (!hasPartitionFields()) return result;
     for (IcebergPartitionField field : icebergPartitionFields_) {
       result.addToPartition_fields(field.toThrift());
diff --git a/fe/src/main/java/org/apache/impala/analysis/ModifyStmt.java 
b/fe/src/main/java/org/apache/impala/analysis/ModifyStmt.java
index 91079f310..6599fb443 100644
--- a/fe/src/main/java/org/apache/impala/analysis/ModifyStmt.java
+++ b/fe/src/main/java/org/apache/impala/analysis/ModifyStmt.java
@@ -29,15 +29,17 @@ import java.util.Set;
 
 import org.apache.impala.authorization.Privilege;
 import org.apache.impala.catalog.Column;
+import org.apache.impala.catalog.FeIcebergTable;
 import org.apache.impala.catalog.FeKuduTable;
 import org.apache.impala.catalog.FeTable;
+import org.apache.impala.catalog.IcebergPositionDeleteTable;
 import org.apache.impala.catalog.KuduColumn;
 import org.apache.impala.catalog.Type;
 import org.apache.impala.common.AnalysisException;
 import org.apache.impala.common.Pair;
 import org.apache.impala.planner.DataSink;
 import org.apache.impala.rewrite.ExprRewriter;
-import org.apache.thrift.TBaseHelper;
+import org.apache.impala.thrift.TIcebergFileFormat;
 
 import com.google.common.base.Joiner;
 import com.google.common.base.Preconditions;
@@ -79,13 +81,14 @@ public abstract class ModifyStmt extends StatementBase {
   // will be modified.
   protected SelectStmt sourceStmt_;
 
-  // Target Kudu table. Since currently only Kudu tables are supported, we use 
a
-  // concrete table class. Result of analysis.
-  protected FeKuduTable table_;
+  // Target table.
+  protected FeTable table_;
+
+  private ModifyImpl modifyImpl_;
 
   // Serialized metadata of transaction object which is set by the Frontend if 
the
   // target table is Kudu table and Kudu's transaction is enabled.
-  protected java.nio.ByteBuffer kuduTxnToken_;
+  java.nio.ByteBuffer kuduTxnToken_;
 
   // END: Members that need to be reset()
   /////////////////////////////////////////
@@ -104,7 +107,6 @@ public abstract class ModifyStmt extends StatementBase {
     fromClause_ = Preconditions.checkNotNull(fromClause);
     assignments_ = Preconditions.checkNotNull(assignmentExprs);
     wherePredicate_ = wherePredicate;
-    kuduTxnToken_ = null;
   }
 
   @Override
@@ -159,13 +161,21 @@ public abstract class ModifyStmt extends StatementBase {
 
     Preconditions.checkNotNull(targetTableRef_);
     FeTable dstTbl = targetTableRef_.getTable();
-    // Only Kudu tables can be updated
-    if (!(dstTbl instanceof FeKuduTable)) {
+    table_ = dstTbl;
+    // Only Kudu and Iceberg tables can be updated.
+    if (!(dstTbl instanceof FeKuduTable) && !(dstTbl instanceof 
FeIcebergTable)) {
       throw new AnalysisException(
-          format("Impala does not support modifying a non-Kudu table: %s",
+          format("Impala only supports modifying Kudu and Iceberg tables, " +
+              "but the following table is neither: %s",
               dstTbl.getFullName()));
     }
-    table_ = (FeKuduTable) dstTbl;
+    if (dstTbl instanceof FeKuduTable) {
+      modifyImpl_ = this.new ModifyKudu();
+    } else if (dstTbl instanceof FeIcebergTable) {
+      modifyImpl_ = this.new ModifyIceberg();
+    }
+
+    modifyImpl_.analyze(analyzer);
 
     // Make sure that the user is allowed to modify the target table. Use ALL 
because no
     // UPDATE / DELETE privilege exists yet (IMPALA-3840).
@@ -186,6 +196,7 @@ public abstract class ModifyStmt extends StatementBase {
     fromClause_.reset();
     if (sourceStmt_ != null) sourceStmt_.reset();
     table_ = null;
+    modifyImpl_ = null;
   }
 
   /**
@@ -208,13 +219,7 @@ public abstract class ModifyStmt extends StatementBase {
     sourceStmt_ = new SelectStmt(new SelectList(selectList), fromClause_, 
wherePredicate_,
         null, null, null, null);
 
-    // cast result expressions to the correct type of the referenced slot of 
the
-    // target table
-    int keyColumnsOffset = table_.getPrimaryKeyColumnNames().size();
-    for (int i = keyColumnsOffset; i < sourceStmt_.resultExprs_.size(); ++i) {
-      sourceStmt_.resultExprs_.set(i, sourceStmt_.resultExprs_.get(i).castTo(
-          assignments_.get(i - keyColumnsOffset).first.getType()));
-    }
+    modifyImpl_.addCastsToAssignmentsInSourceStmt(analyzer);
   }
 
   /**
@@ -244,16 +249,8 @@ public abstract class ModifyStmt extends StatementBase {
       colIndexMap.put(cols.get(i).getName(), i);
     }
 
-    // Add the key columns as slot refs
-    for (String k : table_.getPrimaryKeyColumnNames()) {
-      List<String> path = Path.createRawPath(targetTableRef_.getUniqueAlias(), 
k);
-      SlotRef ref = new SlotRef(path);
-      ref.analyze(analyzer);
-      selectList.add(new SelectListItem(ref, null));
-      uniqueSlots.add(ref.getSlotId());
-      keySlots.add(ref.getSlotId());
-      referencedColumns.add(colIndexMap.get(k));
-    }
+    modifyImpl_.addKeyColumns(analyzer, selectList, referencedColumns, 
uniqueSlots,
+        keySlots, colIndexMap);
 
     // Assignments are only used in the context of updates.
     for (Pair<SlotRef, Expr> valueAssignment : assignments_) {
@@ -341,6 +338,7 @@ public abstract class ModifyStmt extends StatementBase {
    * Set Kudu transaction token.
    */
   public void setKuduTransactionToken(byte[] kuduTxnToken) {
+    Preconditions.checkState(table_ instanceof FeKuduTable);
     Preconditions.checkNotNull(kuduTxnToken);
     kuduTxnToken_ = java.nio.ByteBuffer.wrap(kuduTxnToken.clone());
   }
@@ -348,10 +346,135 @@ public abstract class ModifyStmt extends StatementBase {
   /**
    * Return bytes of Kudu transaction token.
    */
-  public byte[] getKuduTransactionToken() {
-    return kuduTxnToken_ == null ? null : kuduTxnToken_.array();
+  public java.nio.ByteBuffer getKuduTransactionToken() {
+    return kuduTxnToken_;
+  }
+
+  private void addKeyColumn(Analyzer analyzer, List<SelectListItem> selectList,
+      List<Integer> referencedColumns, Set<SlotId> uniqueSlots, Set<SlotId> 
keySlots,
+      Map<String, Integer> colIndexMap, String colName) throws 
AnalysisException {
+    List<String> path = Path.createRawPath(targetTableRef_.getUniqueAlias(), 
colName);
+    SlotRef ref = new SlotRef(path);
+    ref.analyze(analyzer);
+    selectList.add(new SelectListItem(ref, null));
+    uniqueSlots.add(ref.getSlotId());
+    keySlots.add(ref.getSlotId());
+    referencedColumns.add(colIndexMap.get(colName));
   }
 
   @Override
   public abstract String toSql(ToSqlOptions options);
+
+  private interface ModifyImpl {
+    void analyze(Analyzer analyzer) throws AnalysisException;
+
+    void addCastsToAssignmentsInSourceStmt(Analyzer analyzer)
+      throws AnalysisException;
+
+    void addKeyColumns(Analyzer analyzer,
+        List<SelectListItem> selectList, List<Integer> referencedColumns,
+        Set<SlotId> uniqueSlots, Set<SlotId> keySlots, Map<String, Integer> 
colIndexMap)
+        throws AnalysisException;
+  }
+
+  private class ModifyKudu implements ModifyImpl {
+    // Target Kudu table. Result of analysis.
+    FeKuduTable kuduTable_ = (FeKuduTable)table_;
+
+    @Override
+    public void analyze(Analyzer analyzer) throws AnalysisException {}
+
+    @Override
+    public void addCastsToAssignmentsInSourceStmt(Analyzer analyzer)
+        throws AnalysisException {
+      // cast result expressions to the correct type of the referenced slot of 
the
+      // target table
+      int keyColumnsOffset = kuduTable_.getPrimaryKeyColumnNames().size();
+      for (int i = keyColumnsOffset; i < sourceStmt_.resultExprs_.size(); ++i) 
{
+        sourceStmt_.resultExprs_.set(i, sourceStmt_.resultExprs_.get(i).castTo(
+            assignments_.get(i - keyColumnsOffset).first.getType()));
+      }
+    }
+
+    @Override
+    public void addKeyColumns(Analyzer analyzer, List<SelectListItem> 
selectList,
+        List<Integer> referencedColumns, Set<SlotId> uniqueSlots, Set<SlotId> 
keySlots,
+        Map<String, Integer> colIndexMap) throws AnalysisException {
+      // Add the key columns as slot refs
+      for (String k : kuduTable_.getPrimaryKeyColumnNames()) {
+        addKeyColumn(analyzer, selectList, referencedColumns, uniqueSlots, 
keySlots,
+            colIndexMap, k);
+      }
+    }
+  }
+
+  private class ModifyIceberg implements ModifyImpl {
+    FeIcebergTable originalTargetTable_;
+    IcebergPositionDeleteTable icePosDelTable_;
+
+    public ModifyIceberg() {
+      originalTargetTable_ = (FeIcebergTable)table_;
+      icePosDelTable_ = new IcebergPositionDeleteTable((FeIcebergTable)table_);
+      // Make the virtual position delete table the new target table.
+      table_ = icePosDelTable_;
+    }
+
+    @Override
+    public void analyze(Analyzer analyzer) throws AnalysisException {
+      if (ModifyStmt.this instanceof UpdateStmt) {
+        throw new AnalysisException("UPDATE is not supported for Iceberg table 
" +
+            originalTargetTable_.getFullName());
+      }
+
+      if (icePosDelTable_.getFormatVersion() == 1) {
+        throw new AnalysisException("Iceberg V1 table do not support 
DELETE/UPDATE " +
+            "operations: " + originalTargetTable_.getFullName());
+      }
+
+      String deleteMode = 
originalTargetTable_.getIcebergApiTable().properties().get(
+          org.apache.iceberg.TableProperties.DELETE_MODE);
+      if (deleteMode != null && !deleteMode.equals("merge-on-read")) {
+        throw new AnalysisException(String.format("Unsupported delete mode: 
'%s' for " +
+            "Iceberg table: %s", deleteMode, 
originalTargetTable_.getFullName()));
+      }
+
+      if (originalTargetTable_.isPartitioned()) {
+        throw new AnalysisException("Cannot execute DELETE/UPDATE statement on 
" +
+            "partitioned Iceberg table: " + 
originalTargetTable_.getFullName());
+      }
+
+      if (originalTargetTable_.getDeleteFileFormat() != 
TIcebergFileFormat.PARQUET) {
+        throw new AnalysisException("Impala can only write delete files in 
PARQUET, " +
+            "but the given table uses a different file format: " +
+            originalTargetTable_.getFullName());
+      }
+
+      if (wherePredicate_ == null ||
+          
org.apache.impala.analysis.Expr.IS_TRUE_LITERAL.apply(wherePredicate_)) {
+        // TODO (IMPALA-12136): rewrite DELETE FROM t; statements to TRUNCATE 
TABLE t;
+        throw new AnalysisException("For deleting every row, please use 
TRUNCATE.");
+      }
+    }
+
+    @Override
+    public void addCastsToAssignmentsInSourceStmt(Analyzer analyzer)
+        throws AnalysisException {
+    }
+
+    @Override
+    public void addKeyColumns(Analyzer analyzer, List<SelectListItem> 
selectList,
+        List<Integer> referencedColumns,
+        Set<SlotId> uniqueSlots, Set<SlotId> keySlots, Map<String, Integer> 
colIndexMap)
+        throws AnalysisException {
+      String[] deleteCols;
+      Preconditions.checkState(!icePosDelTable_.isPartitioned());
+      deleteCols = new String[] {"INPUT__FILE__NAME", "FILE__POSITION"};
+      // Add the key columns as slot refs
+      for (String k : deleteCols) {
+        addKeyColumn(analyzer, selectList, referencedColumns, uniqueSlots, 
keySlots,
+            colIndexMap, k);
+      }
+    }
+  }
+
 }
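To make the new ModifyImpl dispatch above easier to follow, here is a small, self-contained analogue (class and method names are illustrative, not Impala's): the statement picks an implementation from the target table kind, and each implementation supplies its own key columns, i.e. primary key columns for Kudu and the INPUT__FILE__NAME / FILE__POSITION virtual columns for Iceberg position deletes.

    import java.util.List;

    // Simplified analogue of ModifyStmt's ModifyImpl dispatch; names are illustrative only.
    public class ModifyDispatchSketch {
      interface ModifyImpl {
        List<String> keyColumns();
      }

      // Kudu rows are addressed by the table's primary key columns.
      static class KuduImpl implements ModifyImpl {
        private final List<String> primaryKeys;
        KuduImpl(List<String> primaryKeys) { this.primaryKeys = primaryKeys; }
        @Override public List<String> keyColumns() { return primaryKeys; }
      }

      // Iceberg position deletes address rows by data file path and row position.
      static class IcebergPositionDeleteImpl implements ModifyImpl {
        @Override public List<String> keyColumns() {
          return List.of("INPUT__FILE__NAME", "FILE__POSITION");
        }
      }

      public static void main(String[] args) {
        ModifyImpl kudu = new KuduImpl(List.of("id"));
        ModifyImpl iceberg = new IcebergPositionDeleteImpl();
        System.out.println("Kudu key columns: " + kudu.keyColumns());
        System.out.println("Iceberg delete key columns: " + iceberg.keyColumns());
      }
    }
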
diff --git a/fe/src/main/java/org/apache/impala/analysis/SlotRef.java 
b/fe/src/main/java/org/apache/impala/analysis/SlotRef.java
index 8f64b52c6..dbd415322 100644
--- a/fe/src/main/java/org/apache/impala/analysis/SlotRef.java
+++ b/fe/src/main/java/org/apache/impala/analysis/SlotRef.java
@@ -232,7 +232,7 @@ public class SlotRef extends Expr {
         }
         FeFsTable feTable = (FeFsTable) rootTable;
         for (HdfsFileFormat format : feTable.getFileFormats()) {
-          if (format != HdfsFileFormat.ORC && format != 
HdfsFileFormat.PARQUET) {
+          if (!formatSupportsQueryingStruct(format)) {
             throw new AnalysisException("Querying STRUCT is only supported for 
ORC and "
                 + "Parquet file formats.");
           }
@@ -241,6 +241,15 @@ public class SlotRef extends Expr {
     }
   }
 
+  // Returns true if the given HdfsFileFormat supports querying STRUCT types. 
Iceberg
+  // tables also have ICEBERG as HdfsFileFormat. We can return TRUE in case of 
Iceberg
+  // because the data file formats in the Iceberg table will also be tested separately.
+  private static boolean formatSupportsQueryingStruct(HdfsFileFormat format) {
+    return format == HdfsFileFormat.PARQUET ||
+           format == HdfsFileFormat.ORC ||
+           format == HdfsFileFormat.ICEBERG;
+  }
+
   // Assumes this 'SlotRef' is a struct and that desc_.itemTupleDesc_ has 
already been
   // filled. Creates the children 'SlotRef's for the struct recursively.
   private void addStructChildrenAsSlotRefs() {
diff --git a/fe/src/main/java/org/apache/impala/analysis/UpdateStmt.java 
b/fe/src/main/java/org/apache/impala/analysis/UpdateStmt.java
index 8cd77345b..81661657a 100644
--- a/fe/src/main/java/org/apache/impala/analysis/UpdateStmt.java
+++ b/fe/src/main/java/org/apache/impala/analysis/UpdateStmt.java
@@ -67,7 +67,8 @@ public class UpdateStmt extends ModifyStmt {
     Preconditions.checkState(table_ != null);
     DataSink dataSink = TableSink.create(table_, TableSink.Op.UPDATE,
         ImmutableList.<Expr>of(), resultExprs, referencedColumns_, false, 
false,
-        new Pair<>(ImmutableList.<Integer>of(), TSortingOrder.LEXICAL), -1, 
kuduTxnToken_,
+        new Pair<>(ImmutableList.<Integer>of(), TSortingOrder.LEXICAL), -1,
+        getKuduTransactionToken(),
         0);
     Preconditions.checkState(!referencedColumns_.isEmpty());
     return dataSink;
diff --git a/fe/src/main/java/org/apache/impala/catalog/FeIcebergTable.java 
b/fe/src/main/java/org/apache/impala/catalog/FeIcebergTable.java
index ca9e2e05c..72afca9d7 100644
--- a/fe/src/main/java/org/apache/impala/catalog/FeIcebergTable.java
+++ b/fe/src/main/java/org/apache/impala/catalog/FeIcebergTable.java
@@ -42,6 +42,7 @@ import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.LocatedFileStatus;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.metastore.api.FieldSchema;
+import org.apache.iceberg.BaseTable;
 import org.apache.iceberg.ContentFile;
 import org.apache.iceberg.DataFile;
 import org.apache.iceberg.DeleteFile;
@@ -78,6 +79,7 @@ import org.apache.impala.thrift.THdfsTable;
 import org.apache.impala.thrift.TIcebergCatalog;
 import org.apache.impala.thrift.TIcebergFileFormat;
 import org.apache.impala.thrift.TIcebergPartitionStats;
+import org.apache.impala.thrift.TIcebergPartitionTransformType;
 import org.apache.impala.thrift.TIcebergTable;
 import org.apache.impala.thrift.TNetworkAddress;
 import org.apache.impala.thrift.TResultSet;
@@ -175,6 +177,10 @@ public interface FeIcebergTable extends FeFsTable {
    */
   int getDefaultPartitionSpecId();
 
+  default int getFormatVersion() {
+    return 
((BaseTable)getIcebergApiTable()).operations().current().formatVersion();
+  }
+
   /**
    * @return the Iceberg schema.
    */
@@ -287,6 +293,22 @@ public interface FeIcebergTable extends FeFsTable {
     return getFeFsTable().getHostIndex();
   }
 
+  /**
+   * @return true if there's at least one partition spec that has at least one 
non-VOID
+   * partition field.
+   */
+  default boolean isPartitioned() {
+    for (IcebergPartitionSpec spec : getPartitionSpecs()) {
+      if (spec.getIcebergPartitionFieldsSize() == 0) continue;
+      for (IcebergPartitionField partField : spec.getIcebergPartitionFields()) 
{
+        if (partField.getTransformType() != 
TIcebergPartitionTransformType.VOID) {
+          return true;
+        }
+      }
+    }
+    return false;
+  }
+
   @Override /* FeTable */
   default boolean isComputedPartitionColumn(Column col) {
     Preconditions.checkState(col instanceof IcebergColumn);
@@ -313,6 +335,23 @@ public interface FeIcebergTable extends FeFsTable {
     return -1;
   }
 
+  default TIcebergFileFormat getWriteFileFormat() {
+    return IcebergUtil.getIcebergFileFormat(
+        getIcebergApiTable().properties().getOrDefault(
+            TableProperties.DEFAULT_FILE_FORMAT,
+            TableProperties.DEFAULT_FILE_FORMAT_DEFAULT));
+  }
+
+  default TIcebergFileFormat getDeleteFileFormat() {
+    String deleteFormat =
+        
getIcebergApiTable().properties().get(TableProperties.DELETE_DEFAULT_FILE_FORMAT);
+    if (deleteFormat != null) {
+      return IcebergUtil.getIcebergFileFormat(deleteFormat);
+    }
+    // If the delete file format is not specified, use the write format.
+    return getWriteFileFormat();
+  }
+
   /**
    * Sets 'tableStats_' for the Iceberg table by it's partition stats.
    * TODO: Now the calculation of V2 Iceberg table is not accurate. After
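The delete-file-format fallback in getDeleteFileFormat() above is easy to misread, so here is a stripped-down sketch of the lookup order using a plain property map. The property keys and the 'parquet' default are assumptions standing in for Iceberg's TableProperties constants, not a restatement of them.

    import java.util.Map;

    // Sketch of the delete-file-format fallback: explicit delete format, else the table's
    // write format, else Parquet. Property keys are assumed to match Iceberg's defaults.
    public class DeleteFormatFallbackSketch {
      static String deleteFileFormat(Map<String, String> props) {
        String deleteFormat = props.get("write.delete.format.default");
        if (deleteFormat != null) return deleteFormat;
        return props.getOrDefault("write.format.default", "parquet");
      }

      public static void main(String[] args) {
        System.out.println(deleteFileFormat(Map.of()));                                      // parquet
        System.out.println(deleteFileFormat(Map.of("write.format.default", "orc")));         // orc
        System.out.println(deleteFileFormat(Map.of("write.delete.format.default", "avro"))); // avro
      }
    }
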
diff --git 
a/fe/src/main/java/org/apache/impala/catalog/IcebergPositionDeleteTable.java 
b/fe/src/main/java/org/apache/impala/catalog/IcebergPositionDeleteTable.java
index d5929ab36..1fc4aece7 100644
--- a/fe/src/main/java/org/apache/impala/catalog/IcebergPositionDeleteTable.java
+++ b/fe/src/main/java/org/apache/impala/catalog/IcebergPositionDeleteTable.java
@@ -17,6 +17,7 @@
 
 package org.apache.impala.catalog;
 
+import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -48,6 +49,11 @@ public class IcebergPositionDeleteTable extends VirtualTable 
implements FeIceber
   public static String FILE_PATH_COLUMN = "file_path";
   public static String POS_COLUMN = "pos";
 
+  public IcebergPositionDeleteTable(FeIcebergTable baseTable) {
+    this(baseTable, baseTable.getName() + "-POSITION-DELETE", 
Collections.emptySet(), 0,
+        new TColumnStats());
+  }
+
   public IcebergPositionDeleteTable(FeIcebergTable baseTable, String name,
       Set<FileDescriptor> deleteFiles,
       long deleteRecordsCount, TColumnStats filePathsStats) {
@@ -67,6 +73,8 @@ public class IcebergPositionDeleteTable extends VirtualTable 
implements FeIceber
     addColumn(pos);
   }
 
+  public FeIcebergTable getBaseTable() { return baseTable_; }
+
   private TColumnStats getPosStats(Column pos) {
     TColumnStats colStats = new TColumnStats();
     colStats.num_distinct_values = deleteRecordsCount_;
@@ -165,7 +173,7 @@ public class IcebergPositionDeleteTable extends 
VirtualTable implements FeIceber
 
   @Override
   public List<IcebergPartitionSpec> getPartitionSpecs() {
-    return null;
+    return baseTable_.getPartitionSpecs();
   }
 
   @Override
diff --git a/fe/src/main/java/org/apache/impala/catalog/IcebergTable.java 
b/fe/src/main/java/org/apache/impala/catalog/IcebergTable.java
index 2dead16eb..2c385db9c 100644
--- a/fe/src/main/java/org/apache/impala/catalog/IcebergTable.java
+++ b/fe/src/main/java/org/apache/impala/catalog/IcebergTable.java
@@ -486,10 +486,10 @@ public class IcebergTable extends Table implements 
FeIcebergTable {
               new 
IcebergPartitionTransform(field.getTransform().getTransform_type(),
                   transformParam)));
         }
-        ret.add(new IcebergPartitionSpec(param.getPartition_id(),
+        ret.add(new IcebergPartitionSpec(param.getSpec_id(),
             fields));
       } else {
-        ret.add(new IcebergPartitionSpec(param.getPartition_id(), null));
+        ret.add(new IcebergPartitionSpec(param.getSpec_id(), null));
       }
     }
     return ret;
diff --git a/fe/src/main/java/org/apache/impala/catalog/VirtualColumn.java 
b/fe/src/main/java/org/apache/impala/catalog/VirtualColumn.java
index 4b9a138fe..85e88a852 100644
--- a/fe/src/main/java/org/apache/impala/catalog/VirtualColumn.java
+++ b/fe/src/main/java/org/apache/impala/catalog/VirtualColumn.java
@@ -39,11 +39,10 @@ public class VirtualColumn extends Column {
       Type.BIGINT, TVirtualColumnType.FILE_POSITION);
 
   public static VirtualColumn getVirtualColumn(TVirtualColumnType virtColType) 
{
-    if (virtColType == TVirtualColumnType.INPUT_FILE_NAME) {
-      return INPUT_FILE_NAME;
-    }
-    if (virtColType == TVirtualColumnType.FILE_POSITION) {
-      return FILE_POSITION;
+    switch (virtColType) {
+      case INPUT_FILE_NAME: return INPUT_FILE_NAME;
+      case FILE_POSITION: return FILE_POSITION;
+      default: break;
     }
     return null;
   }
diff --git a/fe/src/main/java/org/apache/impala/planner/HdfsScanNode.java 
b/fe/src/main/java/org/apache/impala/planner/HdfsScanNode.java
index 3b6320a8c..55904d3f6 100644
--- a/fe/src/main/java/org/apache/impala/planner/HdfsScanNode.java
+++ b/fe/src/main/java/org/apache/impala/planner/HdfsScanNode.java
@@ -311,10 +311,10 @@ public class HdfsScanNode extends ScanNode {
   private final Map<TupleDescriptor, List<Expr>> statsOriginalConjuncts_ =
       new LinkedHashMap<>();
 
-  // Tuple that is used to materialize statistics when scanning Parquet files. 
For each
-  // column it can contain 0, 1, or 2 slots, depending on whether the column 
needs to be
-  // evaluated against the min and/or the max value of the corresponding
-  // parquet::Statistics.
+  // Tuple that is used to materialize statistics when scanning Parquet or ORC 
files.
+  // For each column it can contain 0, 1, or 2 slots, depending on whether the 
column
+  // needs to be evaluated against the min and/or the max value of the 
corresponding
+  // file statistics.
   private TupleDescriptor statsTuple_;
 
   // The list of overlap predicate descs. See TOverlapPredicateDesc in 
PlanNodes.thrift.
@@ -738,6 +738,7 @@ public class HdfsScanNode extends ScanNode {
    */
   private void computeStatsTupleAndConjuncts(Analyzer analyzer) throws 
ImpalaException{
     Preconditions.checkNotNull(desc_.getPath());
+    if (statsTuple_ != null) return;
     String tupleName = desc_.getPath().toString() + " statistics";
     DescriptorTable descTbl = analyzer.getDescTbl();
     statsTuple_ = descTbl.createTupleDescriptor(tupleName);
diff --git a/fe/src/main/java/org/apache/impala/planner/IcebergDeleteSink.java 
b/fe/src/main/java/org/apache/impala/planner/IcebergDeleteSink.java
new file mode 100644
index 000000000..d3fb6f4bd
--- /dev/null
+++ b/fe/src/main/java/org/apache/impala/planner/IcebergDeleteSink.java
@@ -0,0 +1,160 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.impala.planner;
+
+import java.util.List;
+
+import org.apache.impala.analysis.DescriptorTable;
+import org.apache.impala.analysis.Expr;
+import org.apache.impala.catalog.FeIcebergTable;
+
+import org.apache.impala.thrift.TDataSink;
+import org.apache.impala.thrift.TDataSinkType;
+import org.apache.impala.thrift.TExplainLevel;
+import org.apache.impala.thrift.TIcebergDeleteSink;
+import org.apache.impala.thrift.TQueryOptions;
+import org.apache.impala.thrift.TTableSink;
+import org.apache.impala.thrift.TTableSinkType;
+
+/**
+ * Sink for deleting data from Iceberg tables. Impala deletes data via the 'merge-on-read'
+ * strategy, which means it writes Iceberg position delete files. These files 
contain
+ * information (file_path, position) about the deleted rows. Query engines 
reading from
+ * an Iceberg table need to exclude the deleted rows from the result of the 
table scan.
+ * Impala does this by doing an ANTI JOIN between data files and delete files.
+ */
+public class IcebergDeleteSink extends TableSink {
+
+  // Set the limit on the maximum number of hdfs table sink instances.
+  // A value of 0 means no limit.
+  private int maxHdfsSinks_;
+
+  public IcebergDeleteSink(FeIcebergTable targetTable, List<Expr> outputExprs,
+      int maxTableSinks) {
+    super(targetTable, Op.DELETE, outputExprs);
+    maxHdfsSinks_ = maxTableSinks;
+  }
+
+  @Override
+  public void computeProcessingCost(TQueryOptions queryOptions) {
+    // The processing cost to export rows.
+    processingCost_ = computeDefaultProcessingCost();
+  }
+
+  @Override
+  public void computeResourceProfile(TQueryOptions queryOptions) {
+    PlanNode inputNode = fragment_.getPlanRoot();
+    final int numInstances = fragment_.getNumInstances();
+    // Input is clustered, so it produces a single partition at a time.
+    final long numBufferedPartitionsPerInstance = 1;
+    // For regular Parquet files we estimate 1 GB of memory consumption, which is
+    // already a conservative (i.e. probably too high) estimate.
+    // Writing out position delete files means we only write file names and positions
+    // per partition, so assuming 0.5 GB per position delete file writer can still be
+    // considered a very conservative estimate.
+    final long perPartitionMemReq = 512L * 1024L * 1024L;
+
+    long perInstanceMemEstimate;
+    // The estimate is based purely on the per-partition mem req if the input 
cardinality_
+    // or the avg row size is unknown.
+    if (inputNode.getCardinality() == -1 || inputNode.getAvgRowSize() == -1) {
+      perInstanceMemEstimate = numBufferedPartitionsPerInstance * 
perPartitionMemReq;
+    } else {
+      // The per-partition estimate may be higher than the memory required to 
buffer
+      // the entire input data.
+      long perInstanceInputCardinality =
+          Math.max(1L, inputNode.getCardinality() / numInstances);
+      long perInstanceInputBytes =
+          (long) Math.ceil(perInstanceInputCardinality * 
inputNode.getAvgRowSize());
+      long perInstanceMemReq =
+          PlanNode.checkedMultiply(numBufferedPartitionsPerInstance, 
perPartitionMemReq);
+      perInstanceMemEstimate = Math.min(perInstanceInputBytes, 
perInstanceMemReq);
+    }
+    resourceProfile_ = ResourceProfile.noReservation(perInstanceMemEstimate);
+  }
+
+  @Override
+  public void appendSinkExplainString(String prefix, String detailPrefix,
+      TQueryOptions queryOptions, TExplainLevel explainLevel, StringBuilder 
output) {
+    output.append(String.format("%sDELETE FROM ICEBERG [%s]\n", prefix,
+        targetTable_.getFullName()));
+    if (explainLevel.ordinal() >= TExplainLevel.EXTENDED.ordinal()) {
+      output.append(detailPrefix + "output exprs: ")
+          .append(Expr.getExplainString(outputExprs_, explainLevel) + "\n");
+    }
+  }
+
+  @Override
+  protected String getLabel() {
+    return "ICEBERG DELETER";
+  }
+
+  @Override
+  protected void toThriftImpl(TDataSink tsink) {
+    TIcebergDeleteSink icebergDeleteSink = new TIcebergDeleteSink();
+    TTableSink tTableSink = new TTableSink(DescriptorTable.TABLE_SINK_ID,
+            TTableSinkType.HDFS, sinkOp_.toThrift());
+    tTableSink.iceberg_delete_sink = icebergDeleteSink;
+    tsink.table_sink = tTableSink;
+    tsink.output_exprs = Expr.treesToThrift(outputExprs_);
+  }
+
+  @Override
+  protected TDataSinkType getSinkType() {
+    return TDataSinkType.TABLE_SINK;
+  }
+
+  @Override
+  public void collectExprs(List<Expr> exprs) {
+    exprs.addAll(outputExprs_);
+  }
+
+  /**
+   * Return an estimate of the number of nodes the fragment with this sink will
+   * run on. This is based on the number of nodes set for the plan root and 
has an
+   * upper limit set by the MAX_HDFS_WRITER query option.
+   */
+  public int getNumNodes() {
+    int num_nodes = getFragment().getPlanRoot().getNumNodes();
+    if (maxHdfsSinks_ > 0) {
+      // If there are more nodes than instances where the fragment was 
initially
+      // planned to run, then the instances will be distributed evenly across them.
+      num_nodes = Math.min(num_nodes, getNumInstances());
+    }
+    return num_nodes;
+  }
+
+  /**
+   * Return an estimate of the number of instances the fragment with this sink
+   * will run on. This is based on the number of instances set for the plan 
root
+   * and has an upper limit set by the MAX_HDFS_WRITER query option.
+   */
+  public int getNumInstances() {
+    int num_instances = getFragment().getPlanRoot().getNumInstances();
+    if (maxHdfsSinks_ > 0) {
+      num_instances = Math.min(num_instances, maxHdfsSinks_);
+    }
+    return num_instances;
+  }
+
+  @Override
+  public void computeRowConsumptionAndProductionToCost() {
+    super.computeRowConsumptionAndProductionToCost();
+    fragment_.setFixedInstanceCount(fragment_.getNumInstances());
+  }
+}
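The memory estimate in computeResourceProfile() above reduces to min(per-instance input bytes, 0.5 GB) when the input cardinality and average row size are known, and a flat 0.5 GB otherwise. A standalone sketch of that arithmetic follows; the cardinality, row size and instance count are invented for illustration.

    // Standalone sketch of the per-instance memory estimate used by the delete sink.
    public class DeleteSinkMemEstimateSketch {
      static long perInstanceMemEstimate(long cardinality, double avgRowSize, int numInstances) {
        final long perPartitionMemReq = 512L * 1024L * 1024L;  // 0.5 GB per delete file writer
        if (cardinality == -1 || avgRowSize == -1) return perPartitionMemReq;
        long perInstanceCardinality = Math.max(1L, cardinality / numInstances);
        long perInstanceInputBytes = (long) Math.ceil(perInstanceCardinality * avgRowSize);
        // Never estimate more than it would take to buffer the entire per-instance input.
        return Math.min(perInstanceInputBytes, perPartitionMemReq);
      }

      public static void main(String[] args) {
        // 10M deleted rows, ~60 bytes of (file_path, pos) per row, 3 sink instances.
        System.out.println(perInstanceMemEstimate(10_000_000L, 60.0, 3));  // ~190 MB
        System.out.println(perInstanceMemEstimate(-1, -1.0, 3));           // 512 MB fallback
      }
    }
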
diff --git a/fe/src/main/java/org/apache/impala/planner/Planner.java 
b/fe/src/main/java/org/apache/impala/planner/Planner.java
index 3dfc288f6..0975b4b86 100644
--- a/fe/src/main/java/org/apache/impala/planner/Planner.java
+++ b/fe/src/main/java/org/apache/impala/planner/Planner.java
@@ -27,6 +27,7 @@ import org.apache.impala.analysis.AnalysisContext;
 import org.apache.impala.analysis.AnalysisContext.AnalysisResult;
 import org.apache.impala.analysis.Analyzer;
 import org.apache.impala.analysis.ColumnLineageGraph;
+import org.apache.impala.analysis.DeleteStmt;
 import org.apache.impala.analysis.ColumnLineageGraph.ColumnLabel;
 import org.apache.impala.analysis.Expr;
 import org.apache.impala.analysis.ExprSubstitutionMap;
@@ -36,6 +37,7 @@ import org.apache.impala.analysis.QueryStmt;
 import org.apache.impala.analysis.SortInfo;
 import org.apache.impala.analysis.TupleId;
 import org.apache.impala.catalog.FeHBaseTable;
+import org.apache.impala.catalog.FeIcebergTable;
 import org.apache.impala.catalog.FeKuduTable;
 import org.apache.impala.catalog.FeTable;
 import org.apache.impala.common.ImpalaException;
@@ -50,6 +52,7 @@ import org.apache.impala.thrift.TQueryCtx;
 import org.apache.impala.thrift.TQueryExecRequest;
 import org.apache.impala.thrift.TQueryOptions;
 import org.apache.impala.thrift.TRuntimeFilterMode;
+import org.apache.impala.thrift.TSortingOrder;
 import org.apache.impala.thrift.TTableName;
 import org.apache.impala.util.EventSequence;
 import org.apache.impala.util.KuduUtil;
@@ -175,8 +178,14 @@ public class Planner {
             
ctx_.getAnalysisResult().getUpdateStmt().createDataSink(resultExprs));
       } else if (ctx_.isDelete()) {
         // Set up delete sink for root fragment
-        rootFragment.setSink(
-            
ctx_.getAnalysisResult().getDeleteStmt().createDataSink(resultExprs));
+        DeleteStmt deleteStmt = ctx_.getAnalysisResult().getDeleteStmt();
+        if (deleteStmt.getTargetTable() instanceof FeIcebergTable) {
+          createPreDeleteSort(deleteStmt, rootFragment, 
ctx_.getRootAnalyzer());
+          SortNode sortNode = (SortNode)rootFragment.getPlanRoot();
+          resultExprs = Expr.substituteList(resultExprs,
+              sortNode.getSortInfo().getOutputSmap(), ctx_.getRootAnalyzer(), 
true);
+        }
+        rootFragment.setSink(deleteStmt.createDataSink(resultExprs));
       } else if (ctx_.isQuery()) {
         rootFragment.setSink(
             
ctx_.getAnalysisResult().getQueryStmt().createDataSink(resultExprs));
@@ -895,4 +904,25 @@ public class Planner {
 
     inputFragment.setPlanRoot(node);
   }
+
+  public void createPreDeleteSort(DeleteStmt deleteStmt, PlanFragment 
inputFragment,
+      Analyzer analyzer) throws ImpalaException {
+    List<Expr> orderingExprs = new ArrayList<>();
+
+    orderingExprs.addAll(deleteStmt.getResultExprs());
+
+    // Build sortinfo to sort by the ordering exprs.
+    List<Boolean> isAscOrder = Collections.nCopies(orderingExprs.size(), true);
+    List<Boolean> nullsFirstParams = Collections.nCopies(orderingExprs.size(), 
false);
+    SortInfo sortInfo = new SortInfo(orderingExprs, isAscOrder, 
nullsFirstParams,
+        TSortingOrder.LEXICAL);
+    sortInfo.createSortTupleInfo(deleteStmt.getResultExprs(), analyzer);
+    sortInfo.getSortTupleDescriptor().materializeSlots();
+
+    PlanNode node = SortNode.createTotalSortNode(
+        ctx_.getNextNodeId(), inputFragment.getPlanRoot(), sortInfo, 0);
+    node.init(analyzer);
+
+    inputFragment.setPlanRoot(node);
+  }
 }
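createPreDeleteSort() above exists because position delete files must be written ordered by (file_path, pos), which is what the total SORT node enforces before rows reach the IcebergDeleteSink. A small sketch of just that ordering with a Comparator over hypothetical records (not Impala's SortNode; the record type and file names are illustrative only):

    import java.util.ArrayList;
    import java.util.Comparator;
    import java.util.List;

    // Illustrates the (file_path, pos) ordering required for position delete files.
    public class PositionDeleteOrderingSketch {
      record DeletedRow(String filePath, long pos) {}

      public static void main(String[] args) {
        List<DeletedRow> rows = new ArrayList<>(List.of(
            new DeletedRow("data/b.parquet", 7),
            new DeletedRow("data/a.parquet", 42),
            new DeletedRow("data/a.parquet", 3)));
        // Lexical order on file path, then ascending row position within each file.
        rows.sort(Comparator.comparing(DeletedRow::filePath)
            .thenComparingLong(DeletedRow::pos));
        rows.forEach(r -> System.out.println(r.filePath() + " " + r.pos()));
      }
    }
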
diff --git a/fe/src/main/java/org/apache/impala/planner/TableSink.java 
b/fe/src/main/java/org/apache/impala/planner/TableSink.java
index 0f34ddb20..df4fd4c73 100644
--- a/fe/src/main/java/org/apache/impala/planner/TableSink.java
+++ b/fe/src/main/java/org/apache/impala/planner/TableSink.java
@@ -22,6 +22,7 @@ import java.util.List;
 import org.apache.impala.analysis.Expr;
 import org.apache.impala.catalog.FeFsTable;
 import org.apache.impala.catalog.FeHBaseTable;
+import org.apache.impala.catalog.FeIcebergTable;
 import org.apache.impala.catalog.FeKuduTable;
 import org.apache.impala.catalog.FeTable;
 import org.apache.impala.common.Pair;
@@ -128,6 +129,17 @@ public abstract class TableSink extends DataSink {
     Preconditions.checkNotNull(partitionKeyExprs);
     Preconditions.checkNotNull(referencedColumns);
     Preconditions.checkNotNull(sortProperties.first);
+    if (table instanceof FeIcebergTable) {
+      if (sinkAction == Op.INSERT) {
+        return new HdfsTableSink(table, partitionKeyExprs, outputExprs, overwrite,
+            inputIsClustered, sortProperties, writeId, maxTableSinks, 
isResultSink);
+      } else if (sinkAction == Op.DELETE) {
+        return new IcebergDeleteSink((FeIcebergTable)table, outputExprs, 
maxTableSinks);
+      } else {
+        // We don't support any other sink actions yet.
+        Preconditions.checkState(false);
+      }
+    }
     if (table instanceof FeFsTable) {
       // Hdfs only supports inserts.
       Preconditions.checkState(sinkAction == Op.INSERT);
diff --git a/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java 
b/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java
index 811db2cfc..b48f4e68a 100644
--- a/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java
+++ b/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java
@@ -6842,7 +6842,7 @@ public class CatalogOpExecutor {
       if (table instanceof FeIcebergTable && update.isSetIceberg_operation()) {
         FeIcebergTable iceTbl = (FeIcebergTable)table;
         org.apache.iceberg.Transaction iceTxn = 
IcebergUtil.getIcebergTransaction(iceTbl);
-        IcebergCatalogOpExecutor.appendFiles(iceTbl, iceTxn,
+        IcebergCatalogOpExecutor.execute(iceTbl, iceTxn,
             update.getIceberg_operation());
         if (isIcebergHmsIntegrationEnabled(iceTbl.getMetaStoreTable())) {
           // Add catalog service id and the 'newCatalogVersion' to the table 
parameters.
diff --git a/fe/src/main/java/org/apache/impala/service/Frontend.java 
b/fe/src/main/java/org/apache/impala/service/Frontend.java
index 248018130..f10762c50 100644
--- a/fe/src/main/java/org/apache/impala/service/Frontend.java
+++ b/fe/src/main/java/org/apache/impala/service/Frontend.java
@@ -74,6 +74,7 @@ import org.apache.impala.analysis.CreateDataSrcStmt;
 import org.apache.impala.analysis.CreateDropRoleStmt;
 import org.apache.impala.analysis.CreateUdaStmt;
 import org.apache.impala.analysis.CreateUdfStmt;
+import org.apache.impala.analysis.DeleteStmt;
 import org.apache.impala.analysis.DescribeTableStmt;
 import org.apache.impala.analysis.DescriptorTable;
 import org.apache.impala.analysis.DropDataSrcStmt;
@@ -119,6 +120,7 @@ import org.apache.impala.catalog.FeIcebergTable;
 import org.apache.impala.catalog.FeKuduTable;
 import org.apache.impala.catalog.FeTable;
 import org.apache.impala.catalog.Function;
+import org.apache.impala.catalog.IcebergPositionDeleteTable;
 import org.apache.impala.catalog.ImpaladCatalog;
 import org.apache.impala.catalog.ImpaladTableUsageTracker;
 import org.apache.impala.catalog.MaterializedViewHdfsTable;
@@ -166,6 +168,8 @@ import org.apache.impala.thrift.TDdlType;
 import org.apache.impala.thrift.TDescribeHistoryParams;
 import org.apache.impala.thrift.TDescribeOutputStyle;
 import org.apache.impala.thrift.TDescribeResult;
+import org.apache.impala.thrift.TIcebergDmlFinalizeParams;
+import org.apache.impala.thrift.TIcebergOperation;
 import org.apache.impala.thrift.TExecRequest;
 import org.apache.impala.thrift.TExecutorGroupSet;
 import org.apache.impala.thrift.TExplainResult;
@@ -2478,6 +2482,10 @@ public class Frontend {
             analysisResult.isUpdateStmt() || analysisResult.isDeleteStmt());
         result.stmt_type = TStmtType.DML;
         result.query_exec_request.stmt_type = TStmtType.DML;
+        if (analysisResult.isDeleteStmt()) {
+          addFinalizationParamsForDelete(queryCtx, queryExecRequest,
+              analysisResult.getDeleteStmt());
+        }
       }
       return result;
     } catch (Exception e) {
@@ -2540,33 +2548,48 @@ public class Frontend {
         targetTable, insertStmt.getWriteId(), insertStmt.isOverwrite());
   }
 
+  private static void addFinalizationParamsForDelete(
+    TQueryCtx queryCtx, TQueryExecRequest queryExecRequest, DeleteStmt deleteStmt) {
+      FeTable targetTable = deleteStmt.getTargetTable();
+      if (!(targetTable instanceof FeIcebergTable)) return;
+      Preconditions.checkState(targetTable instanceof IcebergPositionDeleteTable);
+      targetTable = ((IcebergPositionDeleteTable)targetTable).getBaseTable();
+      TFinalizeParams finalizeParams = addFinalizationParamsForDml(
+          queryCtx, targetTable, false);
+      TIcebergDmlFinalizeParams iceFinalizeParams = new TIcebergDmlFinalizeParams();
+      iceFinalizeParams.operation = TIcebergOperation.DELETE;
+      FeIcebergTable iceTable = (FeIcebergTable)targetTable;
+      iceFinalizeParams.setSpec_id(iceTable.getDefaultPartitionSpecId());
+      iceFinalizeParams.setInitial_snapshot_id(iceTable.snapshotId());
+      finalizeParams.setIceberg_params(iceFinalizeParams);
+      queryExecRequest.setFinalize_params(finalizeParams);
+  }
+
   // This is public to allow external frontends to utilize this method to fill in the
   // finalization parameters for externally generated INSERTs.
   public static void addFinalizationParamsForInsert(
       TQueryCtx queryCtx, TQueryExecRequest queryExecRequest, FeTable targetTable,
-          long writeId, boolean isOverwrite) {
+      long writeId, boolean isOverwrite) {
+    TFinalizeParams finalizeParams = addFinalizationParamsForDml(
+        queryCtx, targetTable, isOverwrite);
     if (targetTable instanceof FeFsTable) {
-      TFinalizeParams finalizeParams = new TFinalizeParams();
-      finalizeParams.setIs_overwrite(isOverwrite);
-      finalizeParams.setTable_name(targetTable.getTableName().getTbl());
-      finalizeParams.setTable_id(DescriptorTable.TABLE_SINK_ID);
-      String db = targetTable.getTableName().getDb();
-      finalizeParams.setTable_db(db == null ? queryCtx.session.database : db);
-      FeFsTable hdfsTable = (FeFsTable) targetTable;
-      finalizeParams.setHdfs_base_dir(hdfsTable.getHdfsBaseDir());
       if (writeId != -1) {
         Preconditions.checkState(queryCtx.isSetTransaction_id());
         finalizeParams.setTransaction_id(queryCtx.getTransaction_id());
         finalizeParams.setWrite_id(writeId);
       } else if (targetTable instanceof FeIcebergTable) {
         FeIcebergTable iceTable = (FeIcebergTable)targetTable;
-        finalizeParams.setSpec_id(iceTable.getDefaultPartitionSpecId());
-        finalizeParams.setInitial_snapshot_id(iceTable.snapshotId());
+        TIcebergDmlFinalizeParams iceFinalizeParams = new TIcebergDmlFinalizeParams();
+        iceFinalizeParams.operation = TIcebergOperation.INSERT;
+        iceFinalizeParams.setSpec_id(iceTable.getDefaultPartitionSpecId());
+        iceFinalizeParams.setInitial_snapshot_id(iceTable.snapshotId());
+        finalizeParams.setIceberg_params(iceFinalizeParams);
       } else {
         // TODO: Currently this flag only controls the removal of the query-level staging
         // directory. HdfsTableSink (that creates the staging dir) calculates the path
         // independently. So it'd be better to either remove this option, or make it used
         // everywhere where the staging directory is referenced.
+        FeFsTable hdfsTable = (FeFsTable) targetTable;
         finalizeParams.setStaging_dir(
             hdfsTable.getHdfsBaseDir() + "/_impala_insert_staging");
       }
@@ -2574,6 +2597,21 @@ public class Frontend {
     }
   }
 
+  private static TFinalizeParams addFinalizationParamsForDml(TQueryCtx queryCtx,
+      FeTable targetTable, boolean isOverwrite) {
+    TFinalizeParams finalizeParams = new TFinalizeParams();
+    if (targetTable instanceof FeFsTable) {
+      finalizeParams.setIs_overwrite(isOverwrite);
+      finalizeParams.setTable_name(targetTable.getTableName().getTbl());
+      finalizeParams.setTable_id(DescriptorTable.TABLE_SINK_ID);
+      String db = targetTable.getTableName().getDb();
+      finalizeParams.setTable_db(db == null ? queryCtx.session.database : db);
+      FeFsTable hdfsTable = (FeFsTable) targetTable;
+      finalizeParams.setHdfs_base_dir(hdfsTable.getHdfsBaseDir());
+    }
+    return finalizeParams;
+  }
+
   /**
    * Add the metadata for the result set
    */
diff --git a/fe/src/main/java/org/apache/impala/service/IcebergCatalogOpExecutor.java b/fe/src/main/java/org/apache/impala/service/IcebergCatalogOpExecutor.java
index 03566774c..f419fc41b 100644
--- a/fe/src/main/java/org/apache/impala/service/IcebergCatalogOpExecutor.java
+++ b/fe/src/main/java/org/apache/impala/service/IcebergCatalogOpExecutor.java
@@ -28,10 +28,12 @@ import org.apache.iceberg.DataFile;
 import org.apache.iceberg.DataFiles;
 import org.apache.iceberg.DeleteFiles;
 import org.apache.iceberg.ExpireSnapshots;
+import org.apache.iceberg.FileMetadata;
 import org.apache.iceberg.ManageSnapshots;
 import org.apache.iceberg.Metrics;
 import org.apache.iceberg.PartitionSpec;
 import org.apache.iceberg.ReplacePartitions;
+import org.apache.iceberg.RowDelta;
 import org.apache.iceberg.Schema;
 import org.apache.iceberg.SnapshotManager;
 import org.apache.iceberg.Table;
@@ -325,6 +327,44 @@ public class IcebergCatalogOpExecutor {
     }
   }
 
+  public static void execute(FeIcebergTable feIcebergTable, Transaction txn,
+      TIcebergOperationParam icebergOp) throws ImpalaRuntimeException {
+    switch (icebergOp.operation) {
+      case INSERT: appendFiles(feIcebergTable, txn, icebergOp); break;
+      case DELETE: deleteRows(feIcebergTable, txn, icebergOp); break;
+      default: throw new ImpalaRuntimeException(
+          "Unknown Iceberg operation: " + icebergOp.operation);
+    }
+  }
+
+  public static void deleteRows(FeIcebergTable feIcebergTable, Transaction txn,
+      TIcebergOperationParam icebergOp) throws ImpalaRuntimeException {
+    org.apache.iceberg.Table nativeIcebergTable = feIcebergTable.getIcebergApiTable();
+    List<ByteBuffer> deleteFilesFb = icebergOp.getIceberg_delete_files_fb();
+    RowDelta rowDelta = txn.newRowDelta();
+    for (ByteBuffer buf : deleteFilesFb) {
+      FbIcebergDataFile deleteFile = FbIcebergDataFile.getRootAsFbIcebergDataFile(buf);
+
+      PartitionSpec partSpec = nativeIcebergTable.specs().get(icebergOp.getSpec_id());
+      Metrics metrics = buildDataFileMetrics(deleteFile);
+      FileMetadata.Builder builder = FileMetadata.deleteFileBuilder(partSpec)
+          .ofPositionDeletes()
+          .withMetrics(metrics)
+          .withPath(deleteFile.path())
+          .withFormat(IcebergUtil.fbFileFormatToIcebergFileFormat(deleteFile.format()))
+          .withRecordCount(deleteFile.recordCount())
+          .withFileSizeInBytes(deleteFile.fileSizeInBytes());
+      rowDelta.addDeletes(builder.build());
+    }
+    try {
+      rowDelta.validateFromSnapshot(icebergOp.getInitial_snapshot_id());
+      rowDelta.validateNoConflictingDataFiles();
+      rowDelta.commit();
+    } catch (ValidationException e) {
+      throw new ImpalaRuntimeException(e.getMessage(), e);
+    }
+  }
+
   /**
   * Append the newly inserted data files to the Iceberg table using the AppendFiles
    * API.
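
Note: deleteRows() above follows the standard Iceberg API pattern for committing
position-delete files: build a DeleteFile with FileMetadata.deleteFileBuilder(),
register it through RowDelta, validate against the starting snapshot, and commit.
A minimal, self-contained sketch of that pattern, independent of Impala's FlatBuffer
and catalog plumbing (the class name, table handle, file path, sizes and snapshot id
below are illustrative placeholders, not values from this patch):

    import org.apache.iceberg.DeleteFile;
    import org.apache.iceberg.FileFormat;
    import org.apache.iceberg.FileMetadata;
    import org.apache.iceberg.Table;
    import org.apache.iceberg.Transaction;
    import org.apache.iceberg.exceptions.ValidationException;

    public class PositionDeleteCommitSketch {
      // Registers one position-delete file on 'table' and commits a new snapshot,
      // failing if conflicting data files were added after 'startingSnapshotId'.
      public static void commitPositionDeletes(Table table, String deleteFilePath,
          long fileSizeInBytes, long recordCount, long startingSnapshotId) {
        Transaction txn = table.newTransaction();
        DeleteFile deleteFile = FileMetadata.deleteFileBuilder(table.spec())
            .ofPositionDeletes()                    // rows are (file_path, pos) pairs
            .withPath(deleteFilePath)
            .withFormat(FileFormat.PARQUET)
            .withFileSizeInBytes(fileSizeInBytes)
            .withRecordCount(recordCount)
            .build();
        try {
          txn.newRowDelta()
              .addDeletes(deleteFile)
              .validateFromSnapshot(startingSnapshotId)
              .validateNoConflictingDataFiles()
              .commit();
          txn.commitTransaction();                  // publishes the new snapshot
        } catch (ValidationException e) {
          throw new RuntimeException("Conflicting concurrent write: " + e.getMessage(), e);
        }
      }
    }

The validateFromSnapshot()/validateNoConflictingDataFiles() pair is what surfaces the
ValidationException that deleteRows() converts into an ImpalaRuntimeException when
conflicting data files land between the start of the DELETE and its commit.
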
diff --git a/fe/src/test/java/org/apache/impala/analysis/AnalyzeModifyStmtsTest.java b/fe/src/test/java/org/apache/impala/analysis/AnalyzeModifyStmtsTest.java
index f44caccf5..aefba0c11 100644
--- a/fe/src/test/java/org/apache/impala/analysis/AnalyzeModifyStmtsTest.java
+++ b/fe/src/test/java/org/apache/impala/analysis/AnalyzeModifyStmtsTest.java
@@ -119,7 +119,8 @@ public class AnalyzeModifyStmtsTest extends AnalyzerTest {
     AnalyzesOk("update functional_kudu.dimtbl set name = substr('hallo', 3)");
     // Only Kudu tables can be updated
     AnalysisError("update functional.alltypes set intcol = 99",
-        "Impala does not support modifying a non-Kudu table: 
functional.alltypes");
+        "Impala only supports modifying Kudu and Iceberg tables, but the 
following "+
+        "table is neither: functional.alltypes");
     // Non existing column in update
     AnalysisError("update functional_kudu.dimtbl set links='10'",
         "Could not resolve column/field reference: 'links'");
@@ -201,10 +202,12 @@ public class AnalyzeModifyStmtsTest extends AnalyzerTest {
     AnalysisError(
         "update a set b.name =" +
             " 'Oskar' FROM functional.testtbl a join functional_kudu.testtbl 
b",
-        "Impala does not support modifying a non-Kudu table: 
functional.testtbl");
+        "Impala only supports modifying Kudu and Iceberg tables, but the 
following " +
+        "table is neither: functional.testtbl");
     AnalysisError(
         "delete a FROM functional.testtbl a join functional_kudu.testtbl b",
-        "Impala does not support modifying a non-Kudu table: 
functional.testtbl");
+        "Impala only supports modifying Kudu and Iceberg tables, but the " +
+        "following table is neither: functional.testtbl");
   }
 
   @Test
diff --git a/fe/src/test/java/org/apache/impala/planner/PlannerTest.java b/fe/src/test/java/org/apache/impala/planner/PlannerTest.java
index b032ebbce..2a274fb2b 100644
--- a/fe/src/test/java/org/apache/impala/planner/PlannerTest.java
+++ b/fe/src/test/java/org/apache/impala/planner/PlannerTest.java
@@ -1282,6 +1282,16 @@ public class PlannerTest extends PlannerTestBase {
         ImmutableSet.of(PlannerTestOption.VALIDATE_CARDINALITY));
   }
 
+
+  /**
+   * Check that Iceberg V2 DELETE statements work as expected.
+   */
+  @Test
+  public void testIcebergV2Delete() {
+    runPlannerTestFile("iceberg-v2-delete", "functional_parquet",
+        ImmutableSet.of(PlannerTestOption.VALIDATE_CARDINALITY));
+  }
+
   /**
    * Check that Iceberg metadata table scan plans are as expected.
    */
diff --git a/testdata/workloads/functional-planner/queries/PlannerTest/iceberg-v2-delete.test b/testdata/workloads/functional-planner/queries/PlannerTest/iceberg-v2-delete.test
new file mode 100644
index 000000000..fa13b70cc
--- /dev/null
+++ b/testdata/workloads/functional-planner/queries/PlannerTest/iceberg-v2-delete.test
@@ -0,0 +1,195 @@
+DELETE FROM iceberg_v2_no_deletes where i = 3
+---- PLAN
+DELETE FROM ICEBERG [functional_parquet.iceberg_v2_no_deletes-POSITION-DELETE]
+|
+01:SORT
+|  order by: input__file__name ASC NULLS LAST, file__position ASC NULLS LAST
+|  row-size=20B cardinality=1
+|
+00:SCAN HDFS [functional_parquet.iceberg_v2_no_deletes]
+   HDFS partitions=1/1 files=1 size=625B
+   predicates: i = 3
+   row-size=24B cardinality=1
+---- DISTRIBUTEDPLAN
+DELETE FROM ICEBERG [functional_parquet.iceberg_v2_no_deletes-POSITION-DELETE]
+|
+01:SORT
+|  order by: input__file__name ASC NULLS LAST, file__position ASC NULLS LAST
+|  row-size=20B cardinality=1
+|
+00:SCAN HDFS [functional_parquet.iceberg_v2_no_deletes]
+   HDFS partitions=1/1 files=1 size=625B
+   predicates: i = 3
+   row-size=24B cardinality=1
+====
+DELETE FROM iceberg_v2_delete_positional where id = 15
+---- PLAN
+DELETE FROM ICEBERG [functional_parquet.iceberg_v2_delete_positional-POSITION-DELETE]
+|
+03:SORT
+|  order by: input__file__name ASC NULLS LAST, file__position ASC NULLS LAST
+|  row-size=20B cardinality=1
+|
+02:DELETE EVENTS HASH JOIN [LEFT ANTI JOIN]
+|  row-size=28B cardinality=1
+|
+|--01:SCAN HDFS [functional_parquet.iceberg_v2_delete_positional-POSITION-DELETE-01 functional_parquet.iceberg_v2_delete_positional-position-delete]
+|     HDFS partitions=1/1 files=1 size=1.54KB
+|     row-size=182B cardinality=1
+|
+00:SCAN HDFS [functional_parquet.iceberg_v2_delete_positional]
+   HDFS partitions=1/1 files=1 size=662B
+   predicates: id = 15
+   row-size=28B cardinality=1
+---- DISTRIBUTEDPLAN
+DELETE FROM ICEBERG [functional_parquet.iceberg_v2_delete_positional-POSITION-DELETE]
+|
+04:SORT
+|  order by: input__file__name ASC NULLS LAST, file__position ASC NULLS LAST
+|  row-size=20B cardinality=1
+|
+02:DELETE EVENTS HASH JOIN [LEFT ANTI JOIN, BROADCAST]
+|  row-size=28B cardinality=1
+|
+|--03:EXCHANGE [BROADCAST]
+|  |
+|  01:SCAN HDFS [functional_parquet.iceberg_v2_delete_positional-POSITION-DELETE-01 functional_parquet.iceberg_v2_delete_positional-position-delete]
+|     HDFS partitions=1/1 files=1 size=1.54KB
+|     row-size=182B cardinality=1
+|
+00:SCAN HDFS [functional_parquet.iceberg_v2_delete_positional]
+   HDFS partitions=1/1 files=1 size=662B
+   predicates: id = 15
+   row-size=28B cardinality=1
+====
+DELETE FROM iceberg_v2_delete_positional
+where id = (select min(id) from iceberg_v2_delete_positional)
+---- PLAN
+DELETE FROM ICEBERG [functional_parquet.iceberg_v2_delete_positional-POSITION-DELETE]
+|
+08:SORT
+|  order by: input__file__name ASC NULLS LAST, file__position ASC NULLS LAST
+|  row-size=20B cardinality=3
+|
+07:HASH JOIN [LEFT SEMI JOIN]
+|  hash predicates: id = min(id)
+|  runtime filters: RF000 <- min(id)
+|  row-size=28B cardinality=3
+|
+|--06:AGGREGATE [FINALIZE]
+|  |  output: min(id)
+|  |  row-size=8B cardinality=1
+|  |
+|  05:DELETE EVENTS HASH JOIN [LEFT ANTI JOIN]
+|  |  row-size=28B cardinality=3
+|  |
+|  |--04:SCAN HDFS [functional_parquet.iceberg_v2_delete_positional-POSITION-DELETE-04 functional_parquet.iceberg_v2_delete_positional-position-delete]
+|  |     HDFS partitions=1/1 files=1 size=1.54KB
+|  |     row-size=182B cardinality=1
+|  |
+|  03:SCAN HDFS [functional_parquet.iceberg_v2_delete_positional]
+|     HDFS partitions=1/1 files=1 size=662B
+|     row-size=28B cardinality=3
+|
+02:DELETE EVENTS HASH JOIN [LEFT ANTI JOIN]
+|  row-size=28B cardinality=3
+|
+|--01:SCAN HDFS [functional_parquet.iceberg_v2_delete_positional-POSITION-DELETE-01 functional_parquet.iceberg_v2_delete_positional-position-delete]
+|     HDFS partitions=1/1 files=1 size=1.54KB
+|     row-size=182B cardinality=1
+|
+00:SCAN HDFS [functional_parquet.iceberg_v2_delete_positional]
+   HDFS partitions=1/1 files=1 size=662B
+   runtime filters: RF000 -> id
+   row-size=28B cardinality=3
+---- DISTRIBUTEDPLAN
+DELETE FROM ICEBERG [functional_parquet.iceberg_v2_delete_positional-POSITION-DELETE]
+|
+13:SORT
+|  order by: input__file__name ASC NULLS LAST, file__position ASC NULLS LAST
+|  row-size=20B cardinality=3
+|
+07:HASH JOIN [LEFT SEMI JOIN, BROADCAST]
+|  hash predicates: id = min(id)
+|  runtime filters: RF000 <- min(id)
+|  row-size=28B cardinality=3
+|
+|--12:EXCHANGE [BROADCAST]
+|  |
+|  11:AGGREGATE [FINALIZE]
+|  |  output: min:merge(id)
+|  |  row-size=8B cardinality=1
+|  |
+|  10:EXCHANGE [UNPARTITIONED]
+|  |
+|  06:AGGREGATE
+|  |  output: min(id)
+|  |  row-size=8B cardinality=1
+|  |
+|  05:DELETE EVENTS HASH JOIN [LEFT ANTI JOIN, BROADCAST]
+|  |  row-size=28B cardinality=3
+|  |
+|  |--09:EXCHANGE [BROADCAST]
+|  |  |
+|  |  04:SCAN HDFS [functional_parquet.iceberg_v2_delete_positional-POSITION-DELETE-04 functional_parquet.iceberg_v2_delete_positional-position-delete]
+|  |     HDFS partitions=1/1 files=1 size=1.54KB
+|  |     row-size=182B cardinality=1
+|  |
+|  03:SCAN HDFS [functional_parquet.iceberg_v2_delete_positional]
+|     HDFS partitions=1/1 files=1 size=662B
+|     row-size=28B cardinality=3
+|
+02:DELETE EVENTS HASH JOIN [LEFT ANTI JOIN, BROADCAST]
+|  row-size=28B cardinality=3
+|
+|--08:EXCHANGE [BROADCAST]
+|  |
+|  01:SCAN HDFS [functional_parquet.iceberg_v2_delete_positional-POSITION-DELETE-01 functional_parquet.iceberg_v2_delete_positional-position-delete]
+|     HDFS partitions=1/1 files=1 size=1.54KB
+|     row-size=182B cardinality=1
+|
+00:SCAN HDFS [functional_parquet.iceberg_v2_delete_positional]
+   HDFS partitions=1/1 files=1 size=662B
+   runtime filters: RF000 -> id
+   row-size=28B cardinality=3
+====
+DELETE FROM iceberg_v2_delete_positional WHERE FILE__POSITION = id
+---- PLAN
+DELETE FROM ICEBERG [functional_parquet.iceberg_v2_delete_positional-POSITION-DELETE]
+|
+03:SORT
+|  order by: input__file__name ASC NULLS LAST, file__position ASC NULLS LAST
+|  row-size=20B cardinality=1
+|
+02:DELETE EVENTS HASH JOIN [LEFT ANTI JOIN]
+|  row-size=28B cardinality=1
+|
+|--01:SCAN HDFS [functional_parquet.iceberg_v2_delete_positional-POSITION-DELETE-01 functional_parquet.iceberg_v2_delete_positional-position-delete]
+|     HDFS partitions=1/1 files=1 size=1.54KB
+|     row-size=182B cardinality=1
+|
+00:SCAN HDFS [functional_parquet.iceberg_v2_delete_positional]
+   HDFS partitions=1/1 files=1 size=662B
+   predicates: FILE__POSITION = id
+   row-size=28B cardinality=1
+---- DISTRIBUTEDPLAN
+DELETE FROM ICEBERG [functional_parquet.iceberg_v2_delete_positional-POSITION-DELETE]
+|
+04:SORT
+|  order by: input__file__name ASC NULLS LAST, file__position ASC NULLS LAST
+|  row-size=20B cardinality=1
+|
+02:DELETE EVENTS HASH JOIN [LEFT ANTI JOIN, BROADCAST]
+|  row-size=28B cardinality=1
+|
+|--03:EXCHANGE [BROADCAST]
+|  |
+|  01:SCAN HDFS [functional_parquet.iceberg_v2_delete_positional-POSITION-DELETE-01 functional_parquet.iceberg_v2_delete_positional-position-delete]
+|     HDFS partitions=1/1 files=1 size=1.54KB
+|     row-size=182B cardinality=1
+|
+00:SCAN HDFS [functional_parquet.iceberg_v2_delete_positional]
+   HDFS partitions=1/1 files=1 size=662B
+   predicates: FILE__POSITION = id
+   row-size=28B cardinality=1
+====
diff --git a/testdata/workloads/functional-query/queries/QueryTest/iceberg-delete-complex.test b/testdata/workloads/functional-query/queries/QueryTest/iceberg-delete-complex.test
new file mode 100644
index 000000000..d8cf7ac0a
--- /dev/null
+++ b/testdata/workloads/functional-query/queries/QueryTest/iceberg-delete-complex.test
@@ -0,0 +1,106 @@
+====
+---- QUERY
+SELECT * FROM ice_complex_delete;
+---- RESULTS
+1,'[1,2,3]','[[1,2],[3,4]]','{"k1":1,"k2":100}','[{"k1":1}]','{"a":1,"b":[1],"c":{"d":[[{"e":10,"f":"aaa"},{"e":-10,"f":"bbb"}],[{"e":11,"f":"c"}]]},"g":{"foo":{"h":{"i":[1.1]}}}}'
+1,'[1,2,3]','[[1,2],[3,4]]','{"k1":1,"k2":100}','[{"k1":1}]','{"a":1,"b":[1],"c":{"d":[[{"e":10,"f":"aaa"},{"e":-10,"f":"bbb"}],[{"e":11,"f":"c"}]]},"g":{"foo":{"h":{"i":[1.1]}}}}'
+2,'[null,1,2,null,3,null]','[[null,1,2,null],[3,null,4],[],null]','{"k1":2,"k2":null}','[{"k1":1,"k3":null},null,{}]','{"a":null,"b":[null],"c":{"d":[[{"e":null,"f":null},{"e":10,"f":"aaa"},{"e":null,"f":null},{"e":-10,"f":"bbb"},{"e":null,"f":null}],[{"e":11,"f":"c"},null],[],null]},"g":{"g1":{"h":{"i":[2.2,null]}},"g2":{"h":{"i":[]}},"g3":null,"g4":{"h":{"i":null}},"g5":{"h":null}}}'
+2,'[null,1,2,null,3,null]','[[null,1,2,null],[3,null,4],[],null]','{"k1":2,"k2":null}','[{"k1":1,"k3":null},null,{}]','{"a":null,"b":[null],"c":{"d":[[{"e":null,"f":null},{"e":10,"f":"aaa"},{"e":null,"f":null},{"e":-10,"f":"bbb"},{"e":null,"f":null}],[{"e":11,"f":"c"},null],[],null]},"g":{"g1":{"h":{"i":[2.2,null]}},"g2":{"h":{"i":[]}},"g3":null,"g4":{"h":{"i":null}},"g5":{"h":null}}}'
+3,'[]','[null]','{}','[null,null]','{"a":null,"b":null,"c":{"d":[]},"g":{}}'
+3,'[]','[null]','{}','[null,null]','{"a":null,"b":null,"c":{"d":[]},"g":{}}'
+4,'NULL','[]','{}','[]','{"a":null,"b":null,"c":{"d":null},"g":null}'
+4,'NULL','[]','{}','[]','{"a":null,"b":null,"c":{"d":null},"g":null}'
+5,'NULL','NULL','{}','NULL','{"a":null,"b":null,"c":null,"g":{"foo":{"h":{"i":[2.2,3.3]}}}}'
+5,'NULL','NULL','{}','NULL','{"a":null,"b":null,"c":null,"g":{"foo":{"h":{"i":[2.2,3.3]}}}}'
+6,'NULL','NULL','NULL','NULL','NULL'
+6,'NULL','NULL','NULL','NULL','NULL'
+7,'NULL','[null,[5,6]]','{"k1":null,"k3":null}','NULL','{"a":7,"b":[2,3,null],"c":{"d":[[],[null],null]},"g":null}'
+7,'NULL','[null,[5,6]]','{"k1":null,"k3":null}','NULL','{"a":7,"b":[2,3,null],"c":{"d":[[],[null],null]},"g":null}'
+8,'[-1]','[[-1,-2],[]]','{"k1":-1}','[{},{"k1":1},{},{}]','{"a":-1,"b":[-1],"c":{"d":[[{"e":-1,"f":"nonnullable"}]]},"g":{}}'
+8,'[-1]','[[-1,-2],[]]','{"k1":-1}','[{},{"k1":1},{},{}]','{"a":-1,"b":[-1],"c":{"d":[[{"e":-1,"f":"nonnullable"}]]},"g":{}}'
+---- TYPES
+BIGINT, STRING, STRING, STRING, STRING, STRING
+====
+---- QUERY
+SELECT id, INPUT__FILE__NAME FROM ice_complex_delete WHERE id % 2 = 0 AND INPUT__FILE__NAME LIKE '%orc';
+---- RESULTS
+2,regex:'$NAMENODE/.*.orc'
+4,regex:'$NAMENODE/.*.orc'
+6,regex:'$NAMENODE/.*.orc'
+8,regex:'$NAMENODE/.*.orc'
+---- TYPES
+BIGINT, STRING
+====
+---- QUERY
+DELETE FROM ice_complex_delete WHERE id % 2 = 0 AND INPUT__FILE__NAME LIKE '%orc';
+SELECT *, INPUT__FILE__NAME FROM ice_complex_delete;
+---- RESULTS
+1,'[1,2,3]','[[1,2],[3,4]]','{"k1":1,"k2":100}','[{"k1":1}]','{"a":1,"b":[1],"c":{"d":[[{"e":10,"f":"aaa"},{"e":-10,"f":"bbb"}],[{"e":11,"f":"c"}]]},"g":{"foo":{"h":{"i":[1.1]}}}}',regex:'$NAMENODE/.*.parquet'
+1,'[1,2,3]','[[1,2],[3,4]]','{"k1":1,"k2":100}','[{"k1":1}]','{"a":1,"b":[1],"c":{"d":[[{"e":10,"f":"aaa"},{"e":-10,"f":"bbb"}],[{"e":11,"f":"c"}]]},"g":{"foo":{"h":{"i":[1.1]}}}}',regex:'$NAMENODE/.*.orc'
+2,'[null,1,2,null,3,null]','[[null,1,2,null],[3,null,4],[],null]','{"k1":2,"k2":null}','[{"k1":1,"k3":null},null,{}]','{"a":null,"b":[null],"c":{"d":[[{"e":null,"f":null},{"e":10,"f":"aaa"},{"e":null,"f":null},{"e":-10,"f":"bbb"},{"e":null,"f":null}],[{"e":11,"f":"c"},null],[],null]},"g":{"g1":{"h":{"i":[2.2,null]}},"g2":{"h":{"i":[]}},"g3":null,"g4":{"h":{"i":null}},"g5":{"h":null}}}',regex:'$NAMENODE/.*.parquet'
+3,'[]','[null]','{}','[null,null]','{"a":null,"b":null,"c":{"d":[]},"g":{}}',regex:'$NAMENODE/.*.parquet'
+3,'[]','[null]','{}','[null,null]','{"a":null,"b":null,"c":{"d":[]},"g":{}}',regex:'$NAMENODE/.*.orc'
+4,'NULL','[]','{}','[]','{"a":null,"b":null,"c":{"d":null},"g":null}',regex:'$NAMENODE/.*.parquet'
+5,'NULL','NULL','{}','NULL','{"a":null,"b":null,"c":null,"g":{"foo":{"h":{"i":[2.2,3.3]}}}}',regex:'$NAMENODE/.*.parquet'
+5,'NULL','NULL','{}','NULL','{"a":null,"b":null,"c":null,"g":{"foo":{"h":{"i":[2.2,3.3]}}}}',regex:'$NAMENODE/.*.orc'
+6,'NULL','NULL','NULL','NULL','NULL',regex:'$NAMENODE/.*.parquet'
+7,'NULL','[null,[5,6]]','{"k1":null,"k3":null}','NULL','{"a":7,"b":[2,3,null],"c":{"d":[[],[null],null]},"g":null}',regex:'$NAMENODE/.*.parquet'
+7,'NULL','[null,[5,6]]','{"k1":null,"k3":null}','NULL','{"a":7,"b":[2,3,null],"c":{"d":[[],[null],null]},"g":null}',regex:'$NAMENODE/.*.orc'
+8,'[-1]','[[-1,-2],[]]','{"k1":-1}','[{},{"k1":1},{},{}]','{"a":-1,"b":[-1],"c":{"d":[[{"e":-1,"f":"nonnullable"}]]},"g":{}}',regex:'$NAMENODE/.*.parquet'
+---- TYPES
+BIGINT, STRING, STRING, STRING, STRING, STRING, STRING
+====
+---- QUERY
+SELECT id, INPUT__FILE__NAME FROM ice_complex_delete WHERE id % 2 = 1 AND INPUT__FILE__NAME LIKE '%parquet';
+---- RESULTS
+1,regex:'$NAMENODE/.*.parquet'
+3,regex:'$NAMENODE/.*.parquet'
+5,regex:'$NAMENODE/.*.parquet'
+7,regex:'$NAMENODE/.*.parquet'
+---- TYPES
+BIGINT, STRING
+====
+---- QUERY
+DELETE FROM ice_complex_delete WHERE id % 2 = 1 AND INPUT__FILE__NAME LIKE '%parquet';
+SELECT *, INPUT__FILE__NAME FROM ice_complex_delete;
+---- RESULTS
+1,'[1,2,3]','[[1,2],[3,4]]','{"k1":1,"k2":100}','[{"k1":1}]','{"a":1,"b":[1],"c":{"d":[[{"e":10,"f":"aaa"},{"e":-10,"f":"bbb"}],[{"e":11,"f":"c"}]]},"g":{"foo":{"h":{"i":[1.1]}}}}',regex:'$NAMENODE/.*.orc'
+2,'[null,1,2,null,3,null]','[[null,1,2,null],[3,null,4],[],null]','{"k1":2,"k2":null}','[{"k1":1,"k3":null},null,{}]','{"a":null,"b":[null],"c":{"d":[[{"e":null,"f":null},{"e":10,"f":"aaa"},{"e":null,"f":null},{"e":-10,"f":"bbb"},{"e":null,"f":null}],[{"e":11,"f":"c"},null],[],null]},"g":{"g1":{"h":{"i":[2.2,null]}},"g2":{"h":{"i":[]}},"g3":null,"g4":{"h":{"i":null}},"g5":{"h":null}}}',regex:'$NAMENODE/.*.parquet'
+3,'[]','[null]','{}','[null,null]','{"a":null,"b":null,"c":{"d":[]},"g":{}}',regex:'$NAMENODE/.*.orc'
+4,'NULL','[]','{}','[]','{"a":null,"b":null,"c":{"d":null},"g":null}',regex:'$NAMENODE/.*.parquet'
+5,'NULL','NULL','{}','NULL','{"a":null,"b":null,"c":null,"g":{"foo":{"h":{"i":[2.2,3.3]}}}}',regex:'$NAMENODE/.*.orc'
+6,'NULL','NULL','NULL','NULL','NULL',regex:'$NAMENODE/.*.parquet'
+7,'NULL','[null,[5,6]]','{"k1":null,"k3":null}','NULL','{"a":7,"b":[2,3,null],"c":{"d":[[],[null],null]},"g":null}',regex:'$NAMENODE/.*.orc'
+8,'[-1]','[[-1,-2],[]]','{"k1":-1}','[{},{"k1":1},{},{}]','{"a":-1,"b":[-1],"c":{"d":[[{"e":-1,"f":"nonnullable"}]]},"g":{}}',regex:'$NAMENODE/.*.parquet'
+---- TYPES
+BIGINT, STRING, STRING, STRING, STRING, STRING, STRING
+====
+---- QUERY
+DELETE FROM ice_complex_delete WHERE id < 4;
+SELECT *, INPUT__FILE__NAME FROM ice_complex_delete;
+---- RESULTS
+4,'NULL','[]','{}','[]','{"a":null,"b":null,"c":{"d":null},"g":null}',regex:'$NAMENODE/.*.parquet'
+5,'NULL','NULL','{}','NULL','{"a":null,"b":null,"c":null,"g":{"foo":{"h":{"i":[2.2,3.3]}}}}',regex:'$NAMENODE/.*.orc'
+6,'NULL','NULL','NULL','NULL','NULL',regex:'$NAMENODE/.*.parquet'
+7,'NULL','[null,[5,6]]','{"k1":null,"k3":null}','NULL','{"a":7,"b":[2,3,null],"c":{"d":[[],[null],null]},"g":null}',regex:'$NAMENODE/.*.orc'
+8,'[-1]','[[-1,-2],[]]','{"k1":-1}','[{},{"k1":1},{},{}]','{"a":-1,"b":[-1],"c":{"d":[[{"e":-1,"f":"nonnullable"}]]},"g":{}}',regex:'$NAMENODE/.*.parquet'
+---- TYPES
+BIGINT, STRING, STRING, STRING, STRING, STRING, STRING
+====
+---- QUERY
+select id, unnest(int_array), input__file__name from ice_complex_delete;
+---- RESULTS
+8,-1,regex:'.*.parquet'
+---- TYPES
+BIGINT, INT, STRING
+====
+---- QUERY
+select id, unnest(int_array_array), input__file__name from ice_complex_delete;
+---- RESULTS
+7,'NULL',regex:'.*.orc'
+7,'[5,6]',regex:'.*.orc'
+8,'[-1,-2]',regex:'.*.parquet'
+8,'[]',regex:'.*.parquet'
+---- TYPES
+BIGINT, STRING, STRING
+====
diff --git a/testdata/workloads/functional-query/queries/QueryTest/iceberg-delete.test b/testdata/workloads/functional-query/queries/QueryTest/iceberg-delete.test
new file mode 100644
index 000000000..45c5105ca
--- /dev/null
+++ b/testdata/workloads/functional-query/queries/QueryTest/iceberg-delete.test
@@ -0,0 +1,117 @@
+====
+---- QUERY
+CREATE TABLE ice_delete (i int, s string)
+STORED BY ICEBERG
+TBLPROPERTIES ('format-version'='2');
+====
+---- QUERY
+# Delete from an empty table is a no-op.
+DELETE FROM ice_delete where i = 1;
+SELECT * FROM ice_delete;
+---- RESULTS
+---- TYPES
+INT,STRING
+====
+---- QUERY
+INSERT INTO ice_delete VALUES(1, 'one'), (2, 'two'), (3, 'three');
+DELETE FROM ice_delete WHERE i = 2;
+SELECT * FROM ice_delete;
+---- RESULTS
+1,'one'
+3,'three'
+---- TYPES
+INT,STRING
+====
+---- QUERY
+SELECT count(*) FROM ice_delete;
+---- RESULTS
+2
+---- TYPES
+BIGINT
+====
+---- QUERY
+INSERT INTO ice_delete VALUES (4, 'four'), (5, 'five'), (6, 'six');
+SELECT * FROM ice_delete;
+---- RESULTS
+1,'one'
+3,'three'
+4,'four'
+5,'five'
+6,'six'
+---- TYPES
+INT,STRING
+====
+---- QUERY
+DELETE FROM ice_delete WHERE s like 'f%' and i > 4;
+SELECT * FROM ice_delete;
+---- RESULTS
+1,'one'
+3,'three'
+4,'four'
+6,'six'
+---- TYPES
+INT,STRING
+====
+---- QUERY
+INSERT INTO ice_delete VALUES (7, 'seven'), (8, 'eight');
+DELETE FROM ice_delete WHERE i in (SELECT i FROM ice_delete where s in ('one', 'three'));
+SELECT * FROM ice_delete;
+---- RESULTS
+4,'four'
+6,'six'
+7,'seven'
+8,'eight'
+---- TYPES
+INT,STRING
+====
+---- QUERY
+DELETE FROM ice_delete WHERE FILE__POSITION = 0;
+SELECT * FROM ice_delete;
+---- RESULTS
+6,'six'
+8,'eight'
+---- TYPES
+INT,STRING
+====
+---- QUERY
+INSERT INTO ice_delete VALUES (9, 'nine'), (10, 'ten');
+DELETE FROM ice_delete WHERE s = (SELECT min(s) FROM ice_delete);
+SELECT * FROM ice_delete;
+---- RESULTS
+6,'six'
+9,'nine'
+10,'ten'
+---- TYPES
+INT,STRING
+====
+---- QUERY
+DELETE FROM ice_delete WHERE i < 10;
+SELECT * FROM ice_delete;
+---- RESULTS
+10,'ten'
+---- TYPES
+INT,STRING
+====
+---- QUERY
+DELETE FROM ice_delete WHERE i = 1000;
+SELECT * FROM ice_delete;
+---- RESULTS
+10,'ten'
+---- TYPES
+INT,STRING
+====
+---- QUERY
+CREATE TABLE ice_lineitem STORED BY ICEBERG
+TBLPROPERTIES ('format-version'='2')
+AS SELECT * FROM tpch_parquet.lineitem;
+DELETE FROM ice_lineitem WHERE l_orderkey % 5 = 1;
+SELECT count(*) FROM ice_lineitem;
+---- RESULTS
+4799418
+---- TYPES
+BIGINT
+====
+---- QUERY
+SELECT * FROM ice_lineitem WHERE l_orderkey % 5 = 1;
+---- RESULTS
+====
diff --git a/testdata/workloads/functional-query/queries/QueryTest/iceberg-negative.test b/testdata/workloads/functional-query/queries/QueryTest/iceberg-negative.test
index 8cebc653a..6be8c01ff 100644
--- a/testdata/workloads/functional-query/queries/QueryTest/iceberg-negative.test
+++ b/testdata/workloads/functional-query/queries/QueryTest/iceberg-negative.test
@@ -683,3 +683,55 @@ select * from functional_parquet.iceberg_v2_delete_equality for system_time as o
 ---- CATCH
 ImpalaRuntimeException: Iceberg table functional_parquet.iceberg_v2_delete_equality has EQUALITY delete file which is currently not supported by Impala
 ====
+---- QUERY
+create table ice_delete (i int, s string)
+stored by iceberg
+tblproperties ('format-version'='1');
+====
+---- QUERY
+# Cannot DELETE from Iceberg V1 table
+delete from ice_delete where i = 1;
+---- CATCH
+AnalysisException: Iceberg V1 table do not support DELETE/UPDATE operations: $DATABASE.ice_delete
+====
+---- QUERY
+# For deleting every row, users must use TRUNCATE.
+alter table ice_delete set tblproperties ('format-version'='2');
+delete from ice_delete;
+---- CATCH
+AnalysisException: For deleting every row, please use TRUNCATE.
+====
+---- QUERY
+delete from ice_delete where true;
+---- CATCH
+AnalysisException: For deleting every row, please use TRUNCATE.
+====
+---- QUERY
+# Cannot delete from Iceberg table if the write format is not Parquet (and there is no delete format)
+alter table ice_delete set tblproperties ('write.format.default'='ORC');
+delete from ice_delete where i = 1;
+---- CATCH
+AnalysisException: Impala can only write delete files in PARQUET, but the given table uses a different file format: $DATABASE.ice_delete
+====
+---- QUERY
+# Cannot delete from Iceberg table if the delete format is not Parquet
+alter table ice_delete set tblproperties ('write.format.default'='PARQUET', 'write.delete.format.default'='ORC');
+delete from ice_delete where i = 1;
+---- CATCH
+AnalysisException: Impala can only write delete files in PARQUET, but the given table uses a different file format: $DATABASE.ice_delete
+====
+---- QUERY
+# Cannot delete from Iceberg table if the write mode is not 'merge-on-read'
+alter table ice_delete set tblproperties ('write.delete.format.default'='PARQUET', 'write.delete.mode'='copy-on-write');
+delete from ice_delete where i = 1;
+---- CATCH
+AnalysisException: Unsupported delete mode: 'copy-on-write' for Iceberg table: $DATABASE.ice_delete
+====
+---- QUERY
+# Cannot delete from partitioned Iceberg table.
+alter table ice_delete set tblproperties ('write.delete.mode'='merge-on-read');
+alter table ice_delete set partition spec (bucket(5, i));
+delete from ice_delete;
+---- CATCH
+AnalysisException: Cannot execute DELETE/UPDATE statement on partitioned Iceberg table: $DATABASE.ice_delete
+====
diff --git a/tests/query_test/test_iceberg.py b/tests/query_test/test_iceberg.py
index 028c6859b..e8c2ed522 100644
--- a/tests/query_test/test_iceberg.py
+++ b/tests/query_test/test_iceberg.py
@@ -1138,6 +1138,63 @@ class TestIcebergV2Table(IcebergTestSuite):
     self.run_test_case('QueryTest/iceberg-tablesample-v2', vector,
         use_db="functional_parquet")
 
-  def test_metadata_tables(self, vector, unique_database):
+  def test_metadata_tables(self, vector):
     self.run_test_case('QueryTest/iceberg-metadata-tables', vector,
         use_db="functional_parquet")
+
+  def test_delete(self, vector, unique_database):
+    self.run_test_case('QueryTest/iceberg-delete', vector,
+        unique_database)
+
+  @SkipIfFS.hive
+  def test_delete_hive_read(self, vector, unique_database):
+    ice_delete = unique_database + ".ice_delete"
+    self.execute_query("""CREATE TABLE {} (i int, s string)
+        STORED BY ICEBERG
+        TBLPROPERTIES('format-version'='2')""".format(ice_delete))
+    self.execute_query("INSERT INTO {} VALUES (1, 'one')".format(ice_delete))
+    self.execute_query("INSERT INTO {} VALUES (2, 'two')".format(ice_delete))
+    self.execute_query("INSERT INTO {} VALUES (3, 'three')".format(ice_delete))
+    self.execute_query("DELETE FROM {} WHERE i = 2".format(ice_delete))
+
+    # Hive needs table property 'format-version' explicitly set
+    self.run_stmt_in_hive("ALTER TABLE {} SET 
TBLPROPERTIES('format-version'='2')".format(
+        ice_delete))
+    hive_output = self.run_stmt_in_hive("SELECT * FROM {} ORDER BY 
i".format(ice_delete))
+    expected_output = "ice_delete.i,ice_delete.s\n1,one\n3,three\n"
+    assert hive_output == expected_output
+
+    ice_lineitem = unique_database + ".lineitem_ice"
+    self.execute_query("""CREATE TABLE {}
+        STORED BY ICEBERG
+        TBLPROPERTIES('format-version'='2')
+        AS SELECT * FROM tpch_parquet.lineitem""".format(ice_lineitem))
+    self.execute_query("DELETE FROM {} WHERE l_orderkey % 5 = 
0".format(ice_lineitem))
+    impala_result = self.execute_query("SELECT count(*) FROM 
{}".format(ice_lineitem))
+    assert impala_result.data[0] == "4799964"
+    # Hive needs table property 'format-version' explicitly set
+    self.run_stmt_in_hive("ALTER TABLE {} SET 
TBLPROPERTIES('format-version'='2')".format(
+        ice_lineitem))
+    hive_output = self.run_stmt_in_hive("SELECT count(*) FROM 
{}".format(ice_lineitem))
+    assert hive_output == "_c0\n4799964\n"
+
+  @SkipIfFS.hive
+  def test_delete_complextypes_mixed_files(self, vector, unique_database):
+    ice_t = unique_database + ".ice_complex_delete"
+    self.run_stmt_in_hive("""create table {}
+        stored by iceberg stored as orc as
+        select * from functional_parquet.complextypestbl;""".format(ice_t))
+    # Hive needs table property 'format-version' explicitly set
+    self.run_stmt_in_hive("ALTER TABLE {} SET 
TBLPROPERTIES('format-version'='2')".format(
+        ice_t))
+    self.run_stmt_in_hive("""alter table {}
+        set tblproperties ('write.format.default'='parquet')""".format(ice_t))
+    self.run_stmt_in_hive("""insert into {}
+      select * from functional_parquet.complextypestbl""".format(ice_t))
+
+    vector.get_value('exec_option')['expand_complex_types'] = True
+    self.run_test_case('QueryTest/iceberg-delete-complex', vector,
+        unique_database)
+    hive_output = self.run_stmt_in_hive("SELECT id FROM {} ORDER BY 
id".format(ice_t))
+    # Test that Hive sees the same rows deleted.
+    assert hive_output == "id\n4\n5\n6\n7\n8\n"
