This is an automated email from the ASF dual-hosted git repository.

joemcdonnell pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git

commit 7aa4d50484e5508ac2253f4b7e08bdccbaa43d54
Author: Joe McDonnell <[email protected]>
AuthorDate: Fri Mar 21 12:33:20 2025 -0700

    IMPALA-13882: Fix Iceberg v2 deletes with tuple caching
    
    A variety of Iceberg statements (including v2 deletes) rely
    on getting information from the scan node child of the
    delete node. Since tuple caching can insert a TupleCacheNode
    above that scan, the logic is currently failing, because
    it doesn't know how to bypass the TupleCacheNode and get
    to the scan node below.
    
    This modifies the logic in multiple places to detect a
    TupleCacheNode and go past it to get the scan node
    below it.
    
    Testing:
     - Added a basic Iceberg test with v2 deletes for the
      frontend test and custom cluster test
    
    Change-Id: I162e738c4e4449a536701a740272aaac56ce8fd8
    Reviewed-on: http://gerrit.cloudera.org:8080/22666
    Reviewed-by: Kurt Deschler <[email protected]>
    Reviewed-by: Michael Smith <[email protected]>
    Tested-by: Impala Public Jenkins <[email protected]>
---
 be/src/exec/exec-node.h                              | 11 +++++++++++
 be/src/exec/iceberg-delete-builder.cc                |  8 +++++++-
 be/src/runtime/fragment-state.cc                     | 12 ++++++++----
 .../org/apache/impala/planner/IcebergDeleteNode.java |  6 +++++-
 .../org/apache/impala/planner/TupleCacheTest.java    |  9 +++++++++
 tests/custom_cluster/test_tuple_cache.py             | 20 ++++++++++++++++++++
 6 files changed, 60 insertions(+), 6 deletions(-)

diff --git a/be/src/exec/exec-node.h b/be/src/exec/exec-node.h
index 40d77a327..5f1f2c577 100644
--- a/be/src/exec/exec-node.h
+++ b/be/src/exec/exec-node.h
@@ -119,6 +119,17 @@ class PlanNode {
 
   std::vector<PlanNode*> children_;
 
+  /// Helper function to skip past a TupleCacheNode if present. If the 
provided PlanNode
+  /// is a TupleCacheNode, it returns the child. Otherwise, it returns the 
provided
+  /// PlanNode.
+  static PlanNode* LookPastTupleCache(PlanNode* pnode) {
+    if (pnode->tnode_->node_type == TPlanNodeType::TUPLE_CACHE_NODE) {
+      pnode = pnode->children_[0];
+    }
+    DCHECK(pnode != nullptr);
+    return pnode;
+  }
+
   /// Pointer to the containing SubplanPlanNode or NULL if not inside a 
subplan.
   /// Set by the containing SubplanPlanNode::Init() before Init() is called on
   /// 'this' node. Not owned.
diff --git a/be/src/exec/iceberg-delete-builder.cc 
b/be/src/exec/iceberg-delete-builder.cc
index 510d350fc..d23600c6d 100644
--- a/be/src/exec/iceberg-delete-builder.cc
+++ b/be/src/exec/iceberg-delete-builder.cc
@@ -127,7 +127,13 @@ Status IcebergDeleteBuilder::CalculateDataFiles() {
       q.pop();
       if (current->tnode_->node_id == join_node_id_) {
         fragment_it = it;
-        delete_scan_node = current->children_[0];
+        // Tuple caching can place a TupleCacheNode above the scan node. Look 
past
+        // a TupleCacheNode to get to the scan node.
+        delete_scan_node = PlanNode::LookPastTupleCache(current->children_[0]);
+        DCHECK_EQ(delete_scan_node->tnode_->node_type, 
TPlanNodeType::HDFS_SCAN_NODE)
+          << "Failed to calculate delete files: "
+          << Substitute("Unexpected type for plan node $0: $1",
+                 delete_scan_node->tnode_->node_id, 
delete_scan_node->tnode_->node_type);
         found = true;
         while (!q.empty()) q.pop();
         break;
diff --git a/be/src/runtime/fragment-state.cc b/be/src/runtime/fragment-state.cc
index e16e72d9e..f2bb58600 100644
--- a/be/src/runtime/fragment-state.cc
+++ b/be/src/runtime/fragment-state.cc
@@ -102,18 +102,22 @@ Status 
FragmentState::PutFilesToHostsMappingToSinkConfig() {
   DCHECK_EQ(fragment_.output_sink.stream_sink.output_partition.type,
       TPartitionType::DIRECTED);
 
+  // Tuple caching can place a TupleCacheNode above the scan, so look past a
+  // TupleCacheNode to reach the original scan
+  PlanNode* pnode = PlanNode::LookPastTupleCache(plan_tree_);
+
   const QueryState::NodeToFileSchedulings* node_to_file_schedulings =
       query_state_->node_to_file_schedulings();
   QueryState::NodeToFileSchedulings::const_iterator 
node_to_file_schedulings_it =
-      node_to_file_schedulings->find(plan_tree_->tnode_->node_id);
+      node_to_file_schedulings->find(pnode->tnode_->node_id);
   if (node_to_file_schedulings_it == node_to_file_schedulings->end()) {
     return Status(Substitute("Failed to find file to hosts mapping for plan 
node: "
-        "$0", plan_tree_->tnode_->node_id));
+        "$0", pnode->tnode_->node_id));
   }
 
-  DCHECK_EQ(plan_tree_->tnode_->node_type, TPlanNodeType::HDFS_SCAN_NODE);
+  DCHECK_EQ(pnode->tnode_->node_type, TPlanNodeType::HDFS_SCAN_NODE);
   TupleDescriptor* tuple_desc =
-      
desc_tbl().GetTupleDescriptor(plan_tree_->tnode_->hdfs_scan_node.tuple_id);
+      desc_tbl().GetTupleDescriptor(pnode->tnode_->hdfs_scan_node.tuple_id);
   DCHECK(tuple_desc != nullptr);
   DCHECK(tuple_desc->table_desc() != nullptr);
   const HdfsTableDescriptor* hdfs_table_desc =
diff --git a/fe/src/main/java/org/apache/impala/planner/IcebergDeleteNode.java 
b/fe/src/main/java/org/apache/impala/planner/IcebergDeleteNode.java
index 3447b218c..591a1c9be 100644
--- a/fe/src/main/java/org/apache/impala/planner/IcebergDeleteNode.java
+++ b/fe/src/main/java/org/apache/impala/planner/IcebergDeleteNode.java
@@ -192,7 +192,11 @@ public class IcebergDeleteNode extends JoinNode {
     // which have delete files and roaring bitmaps that store the positions. 
Roaring
     // bitmaps store bitmaps in a compressed format, so 2 bytes per position 
is a
     // conservative estimate (actual mem usage is expected to be lower).
-    int numberOfDataFilesWithDelete = ((IcebergScanNode) getChild(0))
+    PlanNode lhsNode = getChild(0);
+    // Tuple caching can place a TupleCacheNode above the scan, so look past a
+    // TupleCacheNode if it is present
+    if (lhsNode instanceof TupleCacheNode) lhsNode = lhsNode.getChild(0);
+    int numberOfDataFilesWithDelete = ((IcebergScanNode) lhsNode)
         .getFileDescriptorsWithLimit(null, false, -1).size();
     double avgFilePathLen = getChild(1).getAvgRowSize();
     long filePathsSize = (long) Math.ceil(numberOfDataFilesWithDelete * 
avgFilePathLen);
diff --git a/fe/src/test/java/org/apache/impala/planner/TupleCacheTest.java 
b/fe/src/test/java/org/apache/impala/planner/TupleCacheTest.java
index f447d55fd..594ac5c88 100644
--- a/fe/src/test/java/org/apache/impala/planner/TupleCacheTest.java
+++ b/fe/src/test/java/org/apache/impala/planner/TupleCacheTest.java
@@ -319,6 +319,15 @@ public class TupleCacheTest extends PlannerTestBase {
         "and probe.bool_col = true and probe.int_col = 100");
   }
 
+  @Test
+  public void testIceberg() {
+    // Sanity test that we can plan a basic scenario querying from an Iceberg 
table
+    // that has different types of deletes
+    verifyIdenticalCacheKeys(
+        "select * from functional_parquet.iceberg_v2_delete_both_eq_and_pos",
+        "select * from functional_parquet.iceberg_v2_delete_both_eq_and_pos");
+  }
+
   @Test
   public void testDeterministicScheduling() {
     // Verify that the HdfsScanNode that feeds into a TupleCacheNode uses 
deterministic
diff --git a/tests/custom_cluster/test_tuple_cache.py 
b/tests/custom_cluster/test_tuple_cache.py
index b392e30a1..82db750cf 100644
--- a/tests/custom_cluster/test_tuple_cache.py
+++ b/tests/custom_cluster/test_tuple_cache.py
@@ -24,6 +24,7 @@ import re
 import string
 
 from tests.common.custom_cluster_test_suite import CustomClusterTestSuite
+from tests.common.skip import SkipIfDockerizedCluster, SkipIf
 from tests.common.test_dimensions import (
     add_exec_option_dimension, add_mandatory_exec_option)
 from tests.util.parse_util import (
@@ -762,6 +763,25 @@ class TestTupleCacheFullCluster(TestTupleCacheBase):
     different_rows = 
before_result_set.symmetric_difference(after_insert_result_set)
     assert len(different_rows) == 1
 
+  @SkipIfDockerizedCluster.internal_hostname
+  @SkipIf.hardcoded_uris
+  def test_iceberg_deletes(self, vector):  # noqa: U100
+    """
+    Test basic Iceberg v2 deletes, which relies on the directed mode and 
looking
+    past TupleCacheNodes to find the scan nodes.
+    """
+
+    # This query tests both equality deletes and positional deletes.
+    query = "select * from 
functional_parquet.iceberg_v2_delete_both_eq_and_pos " + \
+        "order by i"
+    result1 = self.execute_query(query)
+    result2 = self.execute_query(query)
+    assert result1.success and result2.success
+
+    assert result1.data == result2.data
+    assert result1.data[0].split("\t") == ["2", "str2_updated", "2023-12-13"]
+    assert result1.data[1].split("\t") == ["3", "str3", "2023-12-23"]
+
 
 @CustomClusterTestSuite.with_args(start_args=CACHE_START_ARGS, cluster_size=1)
 class TestTupleCacheMtdop(TestTupleCacheBase):

Reply via email to