This is an automated email from the ASF dual-hosted git repository. joemcdonnell pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/impala.git
commit 7aa4d50484e5508ac2253f4b7e08bdccbaa43d54 Author: Joe McDonnell <[email protected]> AuthorDate: Fri Mar 21 12:33:20 2025 -0700 IMPALA-13882: Fix Iceberg v2 deletes with tuple caching A variety of Iceberg statements (including v2 deletes) rely on getting information from the scan node child of the delete node. Since tuple caching can insert a TupleCacheNode above that scan, the logic is currently failing, because it doesn't know how to bypass the TupleCacheNode and get to the scan node below. This modifies the logic in multiple places to detect a TupleCacheNode and go past it to get the scan node below it. Testing: - Added a basic Iceberg test with v2 deletes for the frontend test and custom cluster test Change-Id: I162e738c4e4449a536701a740272aaac56ce8fd8 Reviewed-on: http://gerrit.cloudera.org:8080/22666 Reviewed-by: Kurt Deschler <[email protected]> Reviewed-by: Michael Smith <[email protected]> Tested-by: Impala Public Jenkins <[email protected]> --- be/src/exec/exec-node.h | 11 +++++++++++ be/src/exec/iceberg-delete-builder.cc | 8 +++++++- be/src/runtime/fragment-state.cc | 12 ++++++++---- .../org/apache/impala/planner/IcebergDeleteNode.java | 6 +++++- .../org/apache/impala/planner/TupleCacheTest.java | 9 +++++++++ tests/custom_cluster/test_tuple_cache.py | 20 ++++++++++++++++++++ 6 files changed, 60 insertions(+), 6 deletions(-) diff --git a/be/src/exec/exec-node.h b/be/src/exec/exec-node.h index 40d77a327..5f1f2c577 100644 --- a/be/src/exec/exec-node.h +++ b/be/src/exec/exec-node.h @@ -119,6 +119,17 @@ class PlanNode { std::vector<PlanNode*> children_; + /// Helper function to skip past a TupleCacheNode if present. If the provided PlanNode + /// is a TupleCacheNode, it returns the child. Otherwise, it returns the provided + /// PlanNode. 
+ static PlanNode* LookPastTupleCache(PlanNode* pnode) { + if (pnode->tnode_->node_type == TPlanNodeType::TUPLE_CACHE_NODE) { + pnode = pnode->children_[0]; + } + DCHECK(pnode != nullptr); + return pnode; + } + /// Pointer to the containing SubplanPlanNode or NULL if not inside a subplan. /// Set by the containing SubplanPlanNode::Init() before Init() is called on /// 'this' node. Not owned. diff --git a/be/src/exec/iceberg-delete-builder.cc b/be/src/exec/iceberg-delete-builder.cc index 510d350fc..d23600c6d 100644 --- a/be/src/exec/iceberg-delete-builder.cc +++ b/be/src/exec/iceberg-delete-builder.cc @@ -127,7 +127,13 @@ Status IcebergDeleteBuilder::CalculateDataFiles() { q.pop(); if (current->tnode_->node_id == join_node_id_) { fragment_it = it; - delete_scan_node = current->children_[0]; + // Tuple caching can place a TupleCacheNode above the scan node. Look past + // a TupleCacheNode to get to the scan node. + delete_scan_node = PlanNode::LookPastTupleCache(current->children_[0]); + DCHECK_EQ(delete_scan_node->tnode_->node_type, TPlanNodeType::HDFS_SCAN_NODE) + << "Failed to calculate delete files: " + << Substitute("Unexpected type for plan node $0: $1", + delete_scan_node->tnode_->node_id, delete_scan_node->tnode_->node_type); found = true; while (!q.empty()) q.pop(); break; diff --git a/be/src/runtime/fragment-state.cc b/be/src/runtime/fragment-state.cc index e16e72d9e..f2bb58600 100644 --- a/be/src/runtime/fragment-state.cc +++ b/be/src/runtime/fragment-state.cc @@ -102,18 +102,22 @@ Status FragmentState::PutFilesToHostsMappingToSinkConfig() { DCHECK_EQ(fragment_.output_sink.stream_sink.output_partition.type, TPartitionType::DIRECTED); + // Tuple caching can place a TupleCacheNode above the scan, so look past a + // TupleCacheNode to reach the original scan + PlanNode* pnode = PlanNode::LookPastTupleCache(plan_tree_); + const QueryState::NodeToFileSchedulings* node_to_file_schedulings = query_state_->node_to_file_schedulings(); 
QueryState::NodeToFileSchedulings::const_iterator node_to_file_schedulings_it = - node_to_file_schedulings->find(plan_tree_->tnode_->node_id); + node_to_file_schedulings->find(pnode->tnode_->node_id); if (node_to_file_schedulings_it == node_to_file_schedulings->end()) { return Status(Substitute("Failed to find file to hosts mapping for plan node: " - "$0", plan_tree_->tnode_->node_id)); + "$0", pnode->tnode_->node_id)); } - DCHECK_EQ(plan_tree_->tnode_->node_type, TPlanNodeType::HDFS_SCAN_NODE); + DCHECK_EQ(pnode->tnode_->node_type, TPlanNodeType::HDFS_SCAN_NODE); TupleDescriptor* tuple_desc = - desc_tbl().GetTupleDescriptor(plan_tree_->tnode_->hdfs_scan_node.tuple_id); + desc_tbl().GetTupleDescriptor(pnode->tnode_->hdfs_scan_node.tuple_id); DCHECK(tuple_desc != nullptr); DCHECK(tuple_desc->table_desc() != nullptr); const HdfsTableDescriptor* hdfs_table_desc = diff --git a/fe/src/main/java/org/apache/impala/planner/IcebergDeleteNode.java b/fe/src/main/java/org/apache/impala/planner/IcebergDeleteNode.java index 3447b218c..591a1c9be 100644 --- a/fe/src/main/java/org/apache/impala/planner/IcebergDeleteNode.java +++ b/fe/src/main/java/org/apache/impala/planner/IcebergDeleteNode.java @@ -192,7 +192,11 @@ public class IcebergDeleteNode extends JoinNode { // which have delete files and roaring bitmaps that store the positions. Roaring // bitmaps store bitmaps in a compressed format, so 2 bytes per position is a // conservative estimate (actual mem usage is expected to be lower). 
- int numberOfDataFilesWithDelete = ((IcebergScanNode) getChild(0)) + PlanNode lhsNode = getChild(0); + // Tuple caching can place a TupleCacheNode above the scan, so look past a + // TupleCacheNode if it is present + if (lhsNode instanceof TupleCacheNode) lhsNode = lhsNode.getChild(0); + int numberOfDataFilesWithDelete = ((IcebergScanNode) lhsNode) .getFileDescriptorsWithLimit(null, false, -1).size(); double avgFilePathLen = getChild(1).getAvgRowSize(); long filePathsSize = (long) Math.ceil(numberOfDataFilesWithDelete * avgFilePathLen); diff --git a/fe/src/test/java/org/apache/impala/planner/TupleCacheTest.java b/fe/src/test/java/org/apache/impala/planner/TupleCacheTest.java index f447d55fd..594ac5c88 100644 --- a/fe/src/test/java/org/apache/impala/planner/TupleCacheTest.java +++ b/fe/src/test/java/org/apache/impala/planner/TupleCacheTest.java @@ -319,6 +319,15 @@ public class TupleCacheTest extends PlannerTestBase { "and probe.bool_col = true and probe.int_col = 100"); } + @Test + public void testIceberg() { + // Sanity test that we can plan a basic scenario querying from an Iceberg table + // that has different types of deletes + verifyIdenticalCacheKeys( + "select * from functional_parquet.iceberg_v2_delete_both_eq_and_pos", + "select * from functional_parquet.iceberg_v2_delete_both_eq_and_pos"); + } + @Test public void testDeterministicScheduling() { // Verify that the HdfsScanNode that feeds into a TupleCacheNode uses deterministic diff --git a/tests/custom_cluster/test_tuple_cache.py b/tests/custom_cluster/test_tuple_cache.py index b392e30a1..82db750cf 100644 --- a/tests/custom_cluster/test_tuple_cache.py +++ b/tests/custom_cluster/test_tuple_cache.py @@ -24,6 +24,7 @@ import re import string from tests.common.custom_cluster_test_suite import CustomClusterTestSuite +from tests.common.skip import SkipIfDockerizedCluster, SkipIf from tests.common.test_dimensions import ( add_exec_option_dimension, add_mandatory_exec_option) from tests.util.parse_util import ( 
@@ -762,6 +763,25 @@ class TestTupleCacheFullCluster(TestTupleCacheBase): different_rows = before_result_set.symmetric_difference(after_insert_result_set) assert len(different_rows) == 1 + @SkipIfDockerizedCluster.internal_hostname + @SkipIf.hardcoded_uris + def test_iceberg_deletes(self, vector): # noqa: U100 + """ + Test basic Iceberg v2 deletes, which relies on the directed mode and looking + past TupleCacheNodes to find the scan nodes. + """ + + # This query tests both equality deletes and positional deletes. + query = "select * from functional_parquet.iceberg_v2_delete_both_eq_and_pos " + \ + "order by i" + result1 = self.execute_query(query) + result2 = self.execute_query(query) + assert result1.success and result2.success + + assert result1.data == result2.data + assert result1.data[0].split("\t") == ["2", "str2_updated", "2023-12-13"] + assert result1.data[1].split("\t") == ["3", "str3", "2023-12-23"] + @CustomClusterTestSuite.with_args(start_args=CACHE_START_ARGS, cluster_size=1) class TestTupleCacheMtdop(TestTupleCacheBase):
