This is an automated email from the ASF dual-hosted git repository.

stigahuang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git
commit 18c31fd383d81082b06071b03b49650ec89e7086
Author: Gabor Kaszab <[email protected]>
AuthorDate: Mon Jul 31 15:10:49 2023 +0200

IMPALA-12308: DIRECTED distribution mode for V2 Iceberg tables

For Iceberg tables, when joining the data files with the delete files,
both of the current distribution modes (broadcast, partitioned) are
wasteful. Each row read from a delete file contains the name of the data
file that the deleted position refers to, so if we knew where that data
file is scheduled, we could send the delete row directly there. This
patch enhances the scheduler to collect information about which data
file is scheduled on which host. Since the scan node for the data files
runs on the same host as the Iceberg join node, the delete file rows can
be sent directly to that specific host.

Functional testing:
- Re-ran the full test suite to check for regressions.

Performance testing:
1) Local machine: SELECT COUNT(1) FROM TPCH10_parquet.lineitem
   Around 15% of the rows are deleted. As the table is unpartitioned,
   there is a small number of relatively large delete files.
   Query runtime decreased by ~80%.
2) Local machine: SELECT COUNT(1) FROM TPCDS10_store_sales
   Around 15% of the rows are deleted. The table is partitioned, which
   results in more but smaller delete files.
   Query runtime decreased by ~50%.
3) Multi-node cluster with data stored on S3:
   SELECT COUNT(1) FROM a scaled store_sales table with ~8.6B rows,
   ~15% of which are deleted. Two scenarios were measured:
   a) Table written by Impala: each delete file row is sent to exactly
      one host.
   b) Table written by Hive: here the data files are bigger and a single
      data file may be split into multiple scan ranges, so one delete
      file row may be sent to multiple hosts. The runtime difference
      compared to the a) runs is the time spent sending out the
      additional delete file rows.
   - Results with 10 nodes:
     a) Runtime decreased by ~80%.
     b) Runtime decreased by ~60%.
   - Results with 20 nodes:
     a) Runtime decreased by ~65%.
     b) Runtime decreased by ~42%.
   - Results with 40 nodes:
     a) Runtime decreased by ~55%.
     b) Runtime decreased by ~42%.
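To illustrate the routing idea described above, here is a minimal standalone
sketch. It is not part of the commit and does not use Impala's actual
KrpcDataStreamSender API; DirectedSender, Channel, AddRow and the sample file
names are hypothetical stand-ins. It only shows the core mechanism: a
filepath-to-hosts map built at scheduling time plus a host-to-channel map in
the sender decide which host(s) each delete-file row is sent to, and rows
whose data file is not scheduled anywhere are skipped.

// Hypothetical sketch of DIRECTED routing (not Impala code).
#include <iostream>
#include <string>
#include <unordered_map>
#include <utility>
#include <vector>

using Host = std::string;  // stand-in for a backend network address

struct DeleteRow {
  std::string data_file_path;  // data file the deleted position refers to
  long long position;          // deleted row position within that file
};

struct Channel {
  Host host;
  void AddRow(const DeleteRow& row) {
    std::cout << "send {" << row.data_file_path << ", " << row.position
              << "} -> " << host << "\n";
  }
};

class DirectedSender {
 public:
  DirectedSender(std::unordered_map<std::string, std::vector<Host>> filepath_to_hosts,
                 std::vector<Channel> channels)
      : filepath_to_hosts_(std::move(filepath_to_hosts)),
        channels_(std::move(channels)) {
    // One channel per destination host.
    for (Channel& ch : channels_) host_to_channel_[ch.host] = &ch;
  }

  // Route one delete-file row to the host(s) scanning the referenced data
  // file. Rows referring to unscheduled files (e.g. compacted away or not
  // sampled) are dropped.
  void Send(const DeleteRow& row) {
    auto it = filepath_to_hosts_.find(row.data_file_path);
    if (it == filepath_to_hosts_.end()) return;
    for (const Host& host : it->second) {
      auto ch_it = host_to_channel_.find(host);
      if (ch_it != host_to_channel_.end()) ch_it->second->AddRow(row);
    }
  }

 private:
  std::unordered_map<std::string, std::vector<Host>> filepath_to_hosts_;
  std::vector<Channel> channels_;
  std::unordered_map<Host, Channel*> host_to_channel_;
};

int main() {
  DirectedSender sender(
      {{"data_1.parq", {"host-a"}}, {"data_2.parq", {"host-a", "host-b"}}},
      {{"host-a"}, {"host-b"}});
  sender.Send({"data_1.parq", 42});        // goes to host-a only
  sender.Send({"data_2.parq", 7});         // data file split across two hosts
  sender.Send({"compacted_away.parq", 3}); // no destination; skipped
  return 0;
}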
Change-Id: I212afd7c9e94551a1c50a40ccb0e3c1f7ecdf3d2 Reviewed-on: http://gerrit.cloudera.org:8080/20548 Reviewed-by: Tamas Mate <[email protected]> Tested-by: Impala Public Jenkins <[email protected]> --- be/src/exec/data-sink.h | 5 + be/src/runtime/coordinator-backend-state.cc | 21 +++ be/src/runtime/coordinator-backend-state.h | 3 + be/src/runtime/fragment-state.cc | 52 +++++++- be/src/runtime/fragment-state.h | 6 +- be/src/runtime/krpc-data-stream-sender.cc | 89 ++++++++++++- be/src/runtime/krpc-data-stream-sender.h | 23 +++- be/src/runtime/query-state.cc | 11 ++ be/src/runtime/query-state.h | 21 +++ be/src/scheduling/scheduler.cc | 43 ++++++- be/src/scheduling/scheduler.h | 13 ++ be/src/service/query-options.cc | 5 +- common/protobuf/admission_control_service.proto | 3 + common/protobuf/control_service.proto | 17 +++ common/thrift/Partitions.thrift | 5 + common/thrift/PlanNodes.thrift | 4 + common/thrift/Query.thrift | 1 + .../org/apache/impala/planner/DataPartition.java | 5 +- .../apache/impala/planner/DistributedPlanner.java | 136 +++----------------- .../org/apache/impala/planner/ExchangeNode.java | 29 ++++- .../apache/impala/planner/IcebergDeleteNode.java | 13 +- .../org/apache/impala/planner/IcebergScanNode.java | 25 ++++ .../apache/impala/planner/IcebergScanPlanner.java | 2 +- .../java/org/apache/impala/planner/JoinNode.java | 26 ++-- .../queries/PlannerTest/iceberg-v2-delete.test | 70 +++++----- .../queries/PlannerTest/iceberg-v2-tables.test | 142 +++++++++------------ .../QueryTest/iceberg-v2-directed-mode.test | 28 ++++ .../iceberg-v2-read-position-deletes.test | 4 + .../functional-query/queries/QueryTest/set.test | 5 + tests/query_test/test_iceberg.py | 22 ++++ 30 files changed, 554 insertions(+), 275 deletions(-) diff --git a/be/src/exec/data-sink.h b/be/src/exec/data-sink.h index 357f4c8d0..a16637652 100644 --- a/be/src/exec/data-sink.h +++ b/be/src/exec/data-sink.h @@ -20,6 +20,7 @@ #define IMPALA_EXEC_DATA_SINK_H #include <boost/scoped_ptr.hpp> +#include <unordered_map> #include <vector> #include "common/status.h" @@ -42,6 +43,7 @@ class TPlanExecRequest; class TPlanExecParams; class TPlanFragmentInstanceCtx; class TInsertStats; +class NetworkAddressPB; /// Configuration class for creating DataSink objects. It contains a subset of the static /// state of their corresponding DataSink, of which there is one instance per fragment. @@ -78,6 +80,9 @@ class DataSinkConfig { /// profile to convey codegen related information. Populated in Codegen(). std::vector<std::string> codegen_status_msgs_; + /// A mapping from file paths to hosts where the particular file is scheduled. + std::unordered_map<std::string, std::vector<NetworkAddressPB>> filepath_to_hosts_; + /// Creates a new data sink config, allocated in state->obj_pool() and returned through /// *sink, from the thrift sink object in fragment_ctx. 
static Status CreateConfig(const TDataSink& thrift_sink, const RowDescriptor* row_desc, diff --git a/be/src/runtime/coordinator-backend-state.cc b/be/src/runtime/coordinator-backend-state.cc index ece2899e9..e689c59b9 100644 --- a/be/src/runtime/coordinator-backend-state.cc +++ b/be/src/runtime/coordinator-backend-state.cc @@ -327,6 +327,8 @@ void Coordinator::BackendState::ExecAsync(const DebugOptions& debug_options, } request.set_query_ctx_sidecar_idx(query_ctx_sidecar_idx); + CopyFilepathToHostsMappingToRequest(&request); + VLOG_FILE << "making rpc: ExecQueryFInstances" << " host=" << impalad_address() << " query_id=" << PrintId(query_id_); @@ -342,6 +344,25 @@ done: exec_done_cv_.NotifyAll(); } +void Coordinator::BackendState::CopyFilepathToHostsMappingToRequest( + ExecQueryFInstancesRequestPB* request) const { + DCHECK(request != nullptr); + google::protobuf::Map<int32, FilepathToHostsMapPB>* by_node_filepath_to_hosts = + request->mutable_by_node_filepath_to_hosts(); + for (const auto& it_nodes : exec_params_.query_schedule().by_node_filepath_to_hosts()) { + google::protobuf::Map<string, FilepathToHostsListPB>* filepath_to_hosts = + (*by_node_filepath_to_hosts)[it_nodes.first].mutable_filepath_to_hosts(); + for (const auto& it_files : it_nodes.second.filepath_to_hosts()) { + (*filepath_to_hosts)[it_files.first].set_is_relative( + it_files.second.is_relative()); + for (const auto& it_host : it_files.second.hosts()) { + auto* hosts = (*filepath_to_hosts)[it_files.first].add_hosts(); + *hosts = it_host; + } + } + } +} + Status Coordinator::BackendState::GetStatus(bool* is_fragment_failure, TUniqueId* failed_instance_id) { lock_guard<mutex> l(lock_); diff --git a/be/src/runtime/coordinator-backend-state.h b/be/src/runtime/coordinator-backend-state.h index 80ef0e571..a8755954b 100644 --- a/be/src/runtime/coordinator-backend-state.h +++ b/be/src/runtime/coordinator-backend-state.h @@ -474,6 +474,9 @@ class Coordinator::BackendState { /// Logs 'msg' at the VLOG_QUERY level, along with 'query_id_' and 'krpc_host_'. void VLogForBackend(const std::string& msg); + + /// Populates the 'by_node_filepath_to_hosts' mapping in 'request' from 'exec_params_'. + void CopyFilepathToHostsMappingToRequest(ExecQueryFInstancesRequestPB* request) const; }; /// Per fragment execution statistics. 
diff --git a/be/src/runtime/fragment-state.cc b/be/src/runtime/fragment-state.cc index 374a78c20..e16e72d9e 100644 --- a/be/src/runtime/fragment-state.cc +++ b/be/src/runtime/fragment-state.cc @@ -23,6 +23,7 @@ #include "exec/exec-node.h" #include "exec/data-sink.h" #include "exprs/slot-ref.h" +#include "kudu/util/path_util.h" #include "runtime/exec-env.h" #include "runtime/query-state.h" #include "gen-cpp/ImpalaInternalService_types.h" @@ -39,13 +40,13 @@ const string FragmentState::FSTATE_THREAD_GROUP_NAME = "fragment-init"; const string FragmentState::FSTATE_THREAD_NAME_PREFIX = "init-and-codegen"; Status FragmentState::CreateFragmentStateMap(const TExecPlanFragmentInfo& fragment_info, - const ExecQueryFInstancesRequestPB& exec_request, QueryState* state, + const ExecQueryFInstancesRequestPB& exec_request, QueryState* query_state, std::unordered_map<TFragmentIdx, FragmentState*>& fragment_map) { int fragment_ctx_idx = 0; const TPlanFragment& frag = fragment_info.fragments[fragment_ctx_idx]; const PlanFragmentCtxPB& frag_ctx = exec_request.fragment_ctxs(fragment_ctx_idx); FragmentState* fragment_state = - state->obj_pool()->Add(new FragmentState(state, frag, frag_ctx)); + query_state->obj_pool()->Add(new FragmentState(query_state, frag, frag_ctx)); fragment_map[fragment_state->fragment_idx()] = fragment_state; for (int i = 0; i < fragment_info.fragment_instance_ctxs.size(); ++i) { const TPlanFragmentInstanceCtx& instance_ctx = @@ -61,8 +62,8 @@ Status FragmentState::CreateFragmentStateMap(const TExecPlanFragmentInfo& fragme const PlanFragmentCtxPB& fragment_ctx = exec_request.fragment_ctxs(fragment_ctx_idx); DCHECK_EQ(fragment.idx, fragment_ctx.fragment_idx()); - fragment_state = - state->obj_pool()->Add(new FragmentState(state, fragment, fragment_ctx)); + fragment_state = query_state->obj_pool()->Add( + new FragmentState(query_state, fragment, fragment_ctx)); fragment_map[fragment_state->fragment_idx()] = fragment_state; // we expect fragment and instance contexts to follow the same order DCHECK_EQ(fragment_state->fragment_idx(), instance_ctx.fragment_idx); @@ -85,6 +86,49 @@ Status FragmentState::Init() { RETURN_IF_ERROR(PlanNode::CreateTree(this, fragment_.plan, &plan_tree_)); RETURN_IF_ERROR(DataSinkConfig::CreateConfig( fragment_.output_sink, plan_tree_->row_descriptor_, this, &sink_config_)); + + if (fragment_.output_sink.type == TDataSinkType::DATA_STREAM_SINK) { + DCHECK(fragment_.output_sink.__isset.stream_sink); + if (fragment_.output_sink.stream_sink.output_partition.type == + TPartitionType::DIRECTED) { + RETURN_IF_ERROR(PutFilesToHostsMappingToSinkConfig()); + } + } + return Status::OK(); +} + +Status FragmentState::PutFilesToHostsMappingToSinkConfig() { + DCHECK(plan_tree_->tnode_ != nullptr); + DCHECK_EQ(fragment_.output_sink.stream_sink.output_partition.type, + TPartitionType::DIRECTED); + + const QueryState::NodeToFileSchedulings* node_to_file_schedulings = + query_state_->node_to_file_schedulings(); + QueryState::NodeToFileSchedulings::const_iterator node_to_file_schedulings_it = + node_to_file_schedulings->find(plan_tree_->tnode_->node_id); + if (node_to_file_schedulings_it == node_to_file_schedulings->end()) { + return Status(Substitute("Failed to find file to hosts mapping for plan node: " + "$0", plan_tree_->tnode_->node_id)); + } + + DCHECK_EQ(plan_tree_->tnode_->node_type, TPlanNodeType::HDFS_SCAN_NODE); + TupleDescriptor* tuple_desc = + desc_tbl().GetTupleDescriptor(plan_tree_->tnode_->hdfs_scan_node.tuple_id); + DCHECK(tuple_desc != nullptr); + 
DCHECK(tuple_desc->table_desc() != nullptr); + const HdfsTableDescriptor* hdfs_table_desc = + static_cast<const HdfsTableDescriptor*>(tuple_desc->table_desc()); + + for (auto file_scheduling : node_to_file_schedulings_it->second) { + string full_file_path = file_scheduling.file_path; + if (file_scheduling.is_relative_path) { + full_file_path = kudu::JoinPathSegments( + hdfs_table_desc->hdfs_base_dir(), file_scheduling.file_path); + } + DCHECK(!full_file_path.empty()); + + sink_config_->filepath_to_hosts_[full_file_path] = file_scheduling.hosts; + } return Status::OK(); } diff --git a/be/src/runtime/fragment-state.h b/be/src/runtime/fragment-state.h index 7dac84e44..78d16220f 100644 --- a/be/src/runtime/fragment-state.h +++ b/be/src/runtime/fragment-state.h @@ -41,7 +41,7 @@ class FragmentState { /// thrift and protobuf references of the fragment and instance context objects from /// 'fragment_info' and 'exec_request'. static Status CreateFragmentStateMap(const TExecPlanFragmentInfo& fragment_info, - const ExecQueryFInstancesRequestPB& exec_request, QueryState* state, + const ExecQueryFInstancesRequestPB& exec_request, QueryState* query_state, std::unordered_map<TFragmentIdx, FragmentState*>& fragment_map); FragmentState(QueryState* query_state, const TPlanFragment& fragment, const PlanFragmentCtxPB& fragment_ctx); @@ -236,5 +236,9 @@ class FragmentState { /// Create the plan tree, data sink config. Status Init(); + + /// Helper function to populate the filename to hosts mapping in 'sink_config_' from + /// 'query_state_'; + Status PutFilesToHostsMappingToSinkConfig(); }; } diff --git a/be/src/runtime/krpc-data-stream-sender.cc b/be/src/runtime/krpc-data-stream-sender.cc index 499dba0e9..da65f549d 100644 --- a/be/src/runtime/krpc-data-stream-sender.cc +++ b/be/src/runtime/krpc-data-stream-sender.cc @@ -738,12 +738,14 @@ KrpcDataStreamSender::KrpcDataStreamSender(TDataSinkId sink_id, int sender_id, dest_node_id_(sink.dest_node_id), next_unknown_partition_(0), exchange_hash_seed_(sink_config.exchange_hash_seed_), - hash_and_add_rows_fn_(sink_config.hash_and_add_rows_fn_) { + hash_and_add_rows_fn_(sink_config.hash_and_add_rows_fn_), + filepath_to_hosts_(sink_config.filepath_to_hosts_) { DCHECK_GT(destinations.size(), 0); DCHECK(sink.output_partition.type == TPartitionType::UNPARTITIONED || sink.output_partition.type == TPartitionType::HASH_PARTITIONED || sink.output_partition.type == TPartitionType::RANDOM - || sink.output_partition.type == TPartitionType::KUDU); + || sink.output_partition.type == TPartitionType::KUDU + || sink.output_partition.type == TPartitionType::DIRECTED); string process_address = NetworkAddressPBToString(ExecEnv::GetInstance()->krpc_address()); @@ -753,6 +755,11 @@ KrpcDataStreamSender::KrpcDataStreamSender(TDataSinkId sink_id, int sender_id, channels_.emplace_back(new Channel(this, row_desc_, destination.address().hostname(), destination.krpc_backend(), destination.fragment_instance_id(), sink.dest_node_id, per_channel_buffer_size, is_local)); + + if (IsDirectedMode()) { + DCHECK(host_to_channel_.find(destination.address()) == host_to_channel_.end()); + host_to_channel_[destination.address()] = channels_.back().get(); + } } if (partition_type_ == TPartitionType::UNPARTITIONED @@ -761,6 +768,8 @@ KrpcDataStreamSender::KrpcDataStreamSender(TDataSinkId sink_id, int sender_id, random_shuffle(channels_.begin(), channels_.end()); } + DCHECK(filepath_to_hosts_.empty() || partition_type_ == TPartitionType::DIRECTED) << + " TPartitionType: " << partition_type_ << " dest ID: " 
<< dest_node_id_; } KrpcDataStreamSender::~KrpcDataStreamSender() { @@ -952,6 +961,8 @@ string KrpcDataStreamSenderConfig::PartitionTypeName() const { return "Random Partitioned"; case TPartitionType::KUDU: return "Kudu Partitioned"; + case TPartitionType::DIRECTED: + return "Directed distribution mode"; default: DCHECK(false) << partition_type_; return ""; @@ -1064,6 +1075,60 @@ Status KrpcDataStreamSender::Send(RuntimeState* state, RowBatch* batch) { RETURN_IF_ERROR(channels_[channel_ids[i]]->AddRow(row)); } } + } else if (partition_type_ == TPartitionType::DIRECTED) { + const int num_rows = batch->num_rows(); + char* prev_filename_ptr = nullptr; + vector<Channel*> prev_channels; + bool skipped_prev_row = false; + for (int row_idx = 0; row_idx < num_rows; ++row_idx) { + DCHECK_EQ(batch->num_tuples_per_row(), 1); + TupleRow* tuple_row = batch->GetRow(row_idx); + Tuple* row = batch->GetRow(row_idx)->GetTuple(0); + StringValue* filename_value = row->GetStringSlot(0); + DCHECK(filename_value != nullptr); + + if (filename_value->ptr == prev_filename_ptr) { + // If the filename pointer is the same as the previous one then we can instantly + // send the row to the same channels as the previous row. + DCHECK(skipped_prev_row || !prev_channels.empty()); + for (Channel* ch : prev_channels) RETURN_IF_ERROR(ch->AddRow(tuple_row)); + continue; + } + prev_filename_ptr = filename_value->ptr; + string filename(filename_value->ptr, filename_value->len); + + const auto filepath_to_hosts_it = filepath_to_hosts_.find(filename); + if (filepath_to_hosts_it == filepath_to_hosts_.end()) { + // This can happen when e.g. compaction removed some data files from a snapshot + // but a delete file referencing them remained because it references other data + // files that remains in the new snapshot. + // Another use-case is table sampling where we read only a subset of the data + // files. + VLOG(3) << "Row from delete file refers to a non-existing data file: " << + filename; + skipped_prev_row = true; + continue; + } + skipped_prev_row = false; + + prev_channels.clear(); + for (const NetworkAddressPB& host_addr : filepath_to_hosts_it->second) { + const auto channel_map_it = host_to_channel_.find(host_addr); + if (channel_map_it == host_to_channel_.end()) { + DumpFilenameToHostsMapping(); + DumpDestinationHosts(); + + stringstream ss; + ss << "Failed to distribute Iceberg delete file content" + " in DIRECTED distribution mode. Host not found " << host_addr << + ". 
Try 'SET DISABLE_OPTIMIZED_ICEBERG_V2_READ=1' as a workaround."; + return Status(ss.str()); + } + + prev_channels.push_back(channel_map_it->second); + RETURN_IF_ERROR(channel_map_it->second->AddRow(tuple_row)); + } + } } else { DCHECK_EQ(partition_type_, TPartitionType::HASH_PARTITIONED); const KrpcDataStreamSenderConfig::HashAndAddRowsFn hash_and_add_rows_fn = @@ -1080,6 +1145,26 @@ Status KrpcDataStreamSender::Send(RuntimeState* state, RowBatch* batch) { return Status::OK(); } +void KrpcDataStreamSender::DumpFilenameToHostsMapping() const { + VLOG(3) << "Dumping the contents of the filename to hosts mapping"; + if (filepath_to_hosts_.empty()) { + VLOG(3) << "The mapping is empty"; + return; + } + for (const auto& file_to_hosts : filepath_to_hosts_) { + for (const auto& host_addr : file_to_hosts.second) { + VLOG(3) << "Filename: " << file_to_hosts.first << " host address: " << host_addr; + } + } +} + +void KrpcDataStreamSender::DumpDestinationHosts() const { + VLOG(3) << "Dumping the destination hosts"; + for (const auto& host : host_to_channel_) { + VLOG(3) << "Network Address: " << host.first; + } +} + Status KrpcDataStreamSender::FlushFinal(RuntimeState* state) { SCOPED_TIMER(profile()->total_time_counter()); DCHECK(!flushed_); diff --git a/be/src/runtime/krpc-data-stream-sender.h b/be/src/runtime/krpc-data-stream-sender.h index 4308827f8..1b1b78813 100644 --- a/be/src/runtime/krpc-data-stream-sender.h +++ b/be/src/runtime/krpc-data-stream-sender.h @@ -20,6 +20,7 @@ #define IMPALA_RUNTIME_KRPC_DATA_STREAM_SENDER_H #include <string> +#include <unordered_map> #include <vector> #include "codegen/impala-ir.h" @@ -30,17 +31,21 @@ #include "exprs/scalar-expr.h" #include "runtime/mem-tracker.h" #include "runtime/outbound-row-batch.h" +#include "util/container-util.h" #include "util/runtime-profile.h" +#include "gen-cpp/common.pb.h" + namespace impala { class KrpcDataStreamSender; class MemTracker; +class NetworkAddressPB; +class PlanFragmentDestinationPB; class RowBatch; class RowDescriptor; class TDataStreamSink; class TNetworkAddress; -class PlanFragmentDestinationPB; class KrpcDataStreamSenderConfig : public DataSinkConfig { public: @@ -105,7 +110,7 @@ class KrpcDataStreamSender : public DataSink { /// 'per_channel_buffer_size' is the soft limit in bytes of the buffering into the /// per-channel's accumulating row batch before it will be sent. /// NOTE: supported partition types are UNPARTITIONED (broadcast), HASH_PARTITIONED, - /// and RANDOM. + /// RANDOM, and DIRECTED (used for sending rows from Iceberg delete files). KrpcDataStreamSender(TDataSinkId sink_id, int sender_id, const KrpcDataStreamSenderConfig& sink_config, const TDataStreamSink& sink, const google::protobuf::RepeatedPtrField<PlanFragmentDestinationPB>& destinations, @@ -185,6 +190,12 @@ class KrpcDataStreamSender : public DataSink { /// Adds the given row to 'channels_[channel_id]'. Status AddRowToChannel(const int channel_id, TupleRow* row); + /// Functions to dump the content of the "filename to hosts" related mappings into logs. + void DumpFilenameToHostsMapping() const; + void DumpDestinationHosts() const; + + bool IsDirectedMode() const { return !filepath_to_hosts_.empty(); } + /// Sender instance id, unique within a fragment. const int sender_id_; @@ -278,6 +289,14 @@ class KrpcDataStreamSender : public DataSink { /// Pointer for the codegen'd HashAndAddRows() function. /// NULL if codegen is disabled or failed. 
const CodegenFnPtr<KrpcDataStreamSenderConfig::HashAndAddRowsFn>& hash_and_add_rows_fn_; + + /// Mapping to store which data file is read on which hosts. + const std::unordered_map<std::string, std::vector<NetworkAddressPB>>& + filepath_to_hosts_; + + /// A mapping between host addresses to channels. Used for DIRECTED distribution mode + /// where only one channel is associated with each host address. + std::unordered_map<NetworkAddressPB, Channel*> host_to_channel_; }; } // namespace impala diff --git a/be/src/runtime/query-state.cc b/be/src/runtime/query-state.cc index 6f7522939..31f833b8d 100644 --- a/be/src/runtime/query-state.cc +++ b/be/src/runtime/query-state.cc @@ -268,6 +268,17 @@ Status QueryState::Init(const ExecQueryFInstancesRequestPB* exec_rpc_params, RETURN_IF_ERROR( ControlService::GetProxy(coord_addr, query_ctx().coord_hostname, &proxy_)); + // Making a copy of the "filepath to hosts" mapping into std library types. + for (const auto& nodes : exec_rpc_params->by_node_filepath_to_hosts()) { + for (const auto& files : nodes.second.filepath_to_hosts()) { + FileSchedulingInfo file_schedule_info(files.first, files.second.is_relative()); + for (const auto& host : files.second.hosts()) { + file_schedule_info.hosts.push_back(host); + } + node_to_file_schedulings_[nodes.first].push_back(file_schedule_info); + } + } + // don't copy query_ctx, it's large and we already did that in the c'tor exec_rpc_params_.set_coord_state_idx(exec_rpc_params->coord_state_idx()); exec_rpc_params_.mutable_fragment_ctxs()->Swap( diff --git a/be/src/runtime/query-state.h b/be/src/runtime/query-state.h index ba749b351..838a78836 100644 --- a/be/src/runtime/query-state.h +++ b/be/src/runtime/query-state.h @@ -21,6 +21,7 @@ #include <memory> #include <string> #include <unordered_map> +#include <vector> #include "common/atomic.h" #include "common/compiler-util.h" @@ -119,6 +120,20 @@ class TRuntimeProfileForest; /// - set up kudu clients in Init(), remove related locking class QueryState { public: + /// Stores information about which file is scheduled to which hosts. + struct FileSchedulingInfo { + std::string file_path; + std::vector<NetworkAddressPB> hosts; + bool is_relative_path; + + FileSchedulingInfo(const string& fn, bool is_rel) + : file_path(fn), is_relative_path(is_rel) {} + }; + + /// Stores information about which file is scheduled to which hosts, grouped by scan + /// node ID. + typedef std::unordered_map<int, std::vector<FileSchedulingInfo>> NodeToFileSchedulings; + /// Use this class to obtain a QueryState for the duration of a function/block, /// rather than manually via QueryExecMgr::Get-/ReleaseQueryState(). /// Pattern: @@ -154,6 +169,9 @@ class QueryState { bool codegen_cache_enabled() const; MemTracker* query_mem_tracker() const { return query_mem_tracker_; } RuntimeProfile* host_profile() const { return host_profile_; } + const NodeToFileSchedulings* node_to_file_schedulings() const { + return &node_to_file_schedulings_; + } UniqueIdPB GetCoordinatorBackendId() const; /// The following getters are only valid after Init(). @@ -379,6 +397,9 @@ class QueryState { ExecQueryFInstancesRequestPB exec_rpc_params_; TExecPlanFragmentInfo fragment_info_; + /// Stores which data file is scheduled to which host, grouped by scan node ID. + NodeToFileSchedulings node_to_file_schedulings_; + /// Buffer reservation for this query (owned by obj_pool_). Set in Init(). 
ReservationTracker* buffer_reservation_ = nullptr; diff --git a/be/src/scheduling/scheduler.cc b/be/src/scheduling/scheduler.cc index d42ec883b..d4ee971a3 100644 --- a/be/src/scheduling/scheduler.cc +++ b/be/src/scheduling/scheduler.cc @@ -252,7 +252,8 @@ Status Scheduler::ComputeFragmentExecParams( DCHECK(sink.output_partition.type == TPartitionType::UNPARTITIONED || sink.output_partition.type == TPartitionType::HASH_PARTITIONED || sink.output_partition.type == TPartitionType::RANDOM - || sink.output_partition.type == TPartitionType::KUDU); + || sink.output_partition.type == TPartitionType::KUDU + || sink.output_partition.type == TPartitionType::DIRECTED); PlanNodeId exch_id = sink.dest_node_id; google::protobuf::Map<int32_t, int32_t>* per_exch_num_senders = dest_state->exec_params->mutable_per_exch_num_senders(); @@ -937,12 +938,52 @@ bool Scheduler::IsCoordinatorOnlyQuery(const TQueryExecRequest& exec_request) { && type == TPartitionType::UNPARTITIONED; } +void Scheduler::PopulateFilepathToHostsMapping(const FInstanceScheduleState& finst, + ScheduleState* state, ByNodeFilepathToHosts* duplicate_check) { + for (const auto& per_node_ranges : finst.exec_params.per_node_scan_ranges()) { + const TPlanNode& node = state->GetNode(per_node_ranges.first); + if (node.node_type != TPlanNodeType::HDFS_SCAN_NODE) continue; + if (!node.hdfs_scan_node.__isset.deleteFileScanNodeId) continue; + const TPlanNodeId delete_file_node_id = node.hdfs_scan_node.deleteFileScanNodeId; + + for (const auto& scan_ranges : per_node_ranges.second.scan_ranges()) { + string file_path; + bool is_relative = false; + const HdfsFileSplitPB& hdfs_file_split = scan_ranges.scan_range().hdfs_file_split(); + if (hdfs_file_split.has_relative_path() && + !hdfs_file_split.relative_path().empty()) { + file_path = hdfs_file_split.relative_path(); + is_relative = true; + } else { + file_path = hdfs_file_split.absolute_path(); + } + DCHECK(!file_path.empty()); + + std::unordered_set<NetworkAddressPB>& current_hosts = + (*duplicate_check)[delete_file_node_id][file_path]; + if (current_hosts.find(finst.host) != current_hosts.end()) continue; + current_hosts.insert(finst.host); + + auto* by_node_filepath_to_hosts = + state->query_schedule_pb()->mutable_by_node_filepath_to_hosts(); + auto* filepath_to_hosts = + (*by_node_filepath_to_hosts)[delete_file_node_id].mutable_filepath_to_hosts(); + (*filepath_to_hosts)[file_path].set_is_relative(is_relative); + auto* hosts = (*filepath_to_hosts)[file_path].add_hosts(); + *hosts = finst.host; + } + } +} + void Scheduler::ComputeBackendExecParams( const ExecutorConfig& executor_config, ScheduleState* state) { + ByNodeFilepathToHosts duplicate_check; for (FragmentScheduleState& f : state->fragment_schedule_states()) { const NetworkAddressPB* prev_host = nullptr; int num_hosts = 0; for (FInstanceScheduleState& i : f.instance_states) { + PopulateFilepathToHostsMapping(i, state, &duplicate_check); + BackendScheduleState& be_state = state->GetOrCreateBackendScheduleState(i.host); be_state.exec_params->add_instance_params()->Swap(&i.exec_params); // Different fragments do not synchronize their Open() and Close(), so the backend diff --git a/be/src/scheduling/scheduler.h b/be/src/scheduling/scheduler.h index ee82b4997..b8a5aeae0 100644 --- a/be/src/scheduling/scheduler.h +++ b/be/src/scheduling/scheduler.h @@ -83,6 +83,11 @@ class Scheduler { typedef boost::unordered_map<IpAddr, ExecutorGroup::Executors::const_iterator> NextExecutorPerHost; + /// Map from file paths to hosts where those files are 
scheduled, grouped by scan node + /// ID. + typedef std::unordered_map<int, std::unordered_map< + std::string, std::unordered_set<NetworkAddressPB>>> ByNodeFilepathToHosts; + /// Internal structure to track scan range assignments for an executor host. This struct /// is used as the heap element in and maintained by AddressableAssignmentHeap. struct ExecutorAssignmentInfo { @@ -414,6 +419,14 @@ class Scheduler { void CreateCollocatedJoinBuildInstances( FragmentScheduleState* fragment_state, ScheduleState* state); + /// This is called during the execution of Schedule() and populates + /// 'ScheduleState::query_schedule_pb_::by_node_filepath_to_hosts' mapping with the + /// information of what files are scheduled to what hosts grouped by scan node. + /// 'duplicate_check' keeps track of the previously added items and used for preventing + /// duplicates being added. + void PopulateFilepathToHostsMapping(const FInstanceScheduleState& finst, + ScheduleState* state, ByNodeFilepathToHosts* duplicate_check); + /// Add all hosts that the scans identified by 'scan_ids' are executed on to /// 'scan_hosts'. void GetScanHosts(const std::vector<TPlanNodeId>& scan_ids, diff --git a/be/src/service/query-options.cc b/be/src/service/query-options.cc index 7949e6ec6..062b9b6c2 100644 --- a/be/src/service/query-options.cc +++ b/be/src/service/query-options.cc @@ -524,8 +524,11 @@ Status impala::SetQueryOption(const string& key, const string& value, } case TImpalaQueryOptions::DEFAULT_JOIN_DISTRIBUTION_MODE: { TJoinDistributionMode::type enum_type; + // Not using the values from '_TJoinDistributionMode_VALUES_TO_NAMES' so that we + // can exclude 'DIRECTED' mode from the options. + std::map<int, const char*> values_to_names {{0, "BROADCAST"}, {1, "SHUFFLE"}}; RETURN_IF_ERROR(GetThriftEnum(value, "default join distribution mode", - _TJoinDistributionMode_VALUES_TO_NAMES, &enum_type)); + values_to_names, &enum_type)); query_options->__set_default_join_distribution_mode(enum_type); break; } diff --git a/common/protobuf/admission_control_service.proto b/common/protobuf/admission_control_service.proto index 811b0f4ad..a9dfe9981 100644 --- a/common/protobuf/admission_control_service.proto +++ b/common/protobuf/admission_control_service.proto @@ -154,6 +154,9 @@ message QuerySchedulePB { /// The cluster wide estimated memory usage of this query. optional int64 cluster_mem_est = 9; + + // Mapping to store which data file is read on which hosts, grouped by scan node ID. + map<int32, FilepathToHostsMapPB> by_node_filepath_to_hosts = 10; } message AdmitQueryRequestPB { diff --git a/common/protobuf/control_service.proto b/common/protobuf/control_service.proto index 4a4a3cbe4..40c329f8f 100644 --- a/common/protobuf/control_service.proto +++ b/common/protobuf/control_service.proto @@ -393,6 +393,20 @@ message PlanFragmentInstanceCtxPB { repeated JoinBuildInputPB join_build_inputs = 3; } +// List of host addresses. +message FilepathToHostsListPB { + repeated NetworkAddressPB hosts = 1; + + // True if the key in the map for this entry is a relative path, false if it is an + // absolute path. + required bool is_relative = 2; +} + +// File path to a list of host addresses mapping. 
+message FilepathToHostsMapPB { + map<string, FilepathToHostsListPB> filepath_to_hosts = 1; +} + // ExecQueryFInstances message ExecQueryFInstancesRequestPB { // This backend's index into Coordinator::backend_states_, needed for subsequent rpcs to @@ -430,6 +444,9 @@ message ExecQueryFInstancesRequestPB { // Execution parameters for specific fragment instances. Corresponds to // 'fragment_instance_ctxs' in the TExecPlanFragmentInfo sidecar. repeated PlanFragmentInstanceCtxPB fragment_instance_ctxs = 8; + + // Mapping to store which data file is read on which host, grouped by scan node ID. + map<int32, FilepathToHostsMapPB> by_node_filepath_to_hosts = 9; } message ExecQueryFInstancesResponsePB { diff --git a/common/thrift/Partitions.thrift b/common/thrift/Partitions.thrift index 64509dfa0..0d1c860ae 100644 --- a/common/thrift/Partitions.thrift +++ b/common/thrift/Partitions.thrift @@ -39,6 +39,11 @@ enum TPartitionType { // schemes. We should add something like lists of TDataPartitions to reflect that // and then this can be removed. (IMPALA-5255) KUDU = 4 + + // Used for distributing the contents of Iceberg delete files. Each row of a delete + // file is directly sent to the hosts that are responsible for reading the + // corresponding data files. No broadcast or shuffle is needed in this case. + DIRECTED = 5 } // Specification of how a single logical data stream is partitioned. diff --git a/common/thrift/PlanNodes.thrift b/common/thrift/PlanNodes.thrift index 9914c66b5..ed95cb7b6 100644 --- a/common/thrift/PlanNodes.thrift +++ b/common/thrift/PlanNodes.thrift @@ -340,6 +340,10 @@ struct THdfsScanNode { // The overlap predicates 13: optional list<TOverlapPredicateDesc> overlap_predicate_descs + + // For IcebergScanNodes that are the left children of an IcebergDeleteNode this stores + // the node ID of the right child. 
+ 14: optional Types.TPlanNodeId deleteFileScanNodeId } struct TDataSourceScanNode { diff --git a/common/thrift/Query.thrift b/common/thrift/Query.thrift index 38d19019f..3808eb579 100644 --- a/common/thrift/Query.thrift +++ b/common/thrift/Query.thrift @@ -60,6 +60,7 @@ enum TKuduReplicaSelection { enum TJoinDistributionMode { BROADCAST = 0 SHUFFLE = 1 + DIRECTED = 2 } // The order of the enum values needs to be kept in sync with diff --git a/fe/src/main/java/org/apache/impala/planner/DataPartition.java b/fe/src/main/java/org/apache/impala/planner/DataPartition.java index a5d931676..53a60634e 100644 --- a/fe/src/main/java/org/apache/impala/planner/DataPartition.java +++ b/fe/src/main/java/org/apache/impala/planner/DataPartition.java @@ -56,7 +56,8 @@ public class DataPartition { private DataPartition(TPartitionType type) { Preconditions.checkState(type == TPartitionType.UNPARTITIONED - || type == TPartitionType.RANDOM); + || type == TPartitionType.RANDOM + || type == TPartitionType.DIRECTED); type_ = type; partitionExprs_ = new ArrayList<>(); } @@ -67,6 +68,8 @@ public class DataPartition { public final static DataPartition RANDOM = new DataPartition(TPartitionType.RANDOM); + public final static DataPartition DIRECTED = new DataPartition(TPartitionType.DIRECTED); + public static DataPartition hashPartitioned(List<Expr> exprs) { return new DataPartition(TPartitionType.HASH_PARTITIONED, exprs); } diff --git a/fe/src/main/java/org/apache/impala/planner/DistributedPlanner.java b/fe/src/main/java/org/apache/impala/planner/DistributedPlanner.java index 2fa2b4445..d10e90db5 100644 --- a/fe/src/main/java/org/apache/impala/planner/DistributedPlanner.java +++ b/fe/src/main/java/org/apache/impala/planner/DistributedPlanner.java @@ -160,7 +160,7 @@ public class DistributedPlanner { } else if (root instanceof IcebergDeleteNode) { Preconditions.checkState(childFragments.size() == 2); result = createIcebergDeleteFragment((IcebergDeleteNode) root, - childFragments.get(1), childFragments.get(0), fragments); + childFragments.get(0), childFragments.get(1)); } else { throw new InternalException("Cannot create plan fragment for this node type: " + root.getExplainString(ctx_.getQueryOptions())); @@ -674,135 +674,31 @@ public class DistributedPlanner { } /** - * Helper function to produce an iceberg delete fragment - */ - private PlanFragment createPartitionedIcebergDeleteFragment(IcebergDeleteNode node, - PlanFragment leftChildFragment, PlanFragment rightChildFragment, - List<Expr> lhsJoinExprs, List<Expr> rhsJoinExprs) { - Preconditions.checkState(node.getDistributionMode() == DistributionMode.PARTITIONED); - - DataPartition rhsJoinPartition = - DataPartition.hashPartitioned(Expr.cloneList(rhsJoinExprs)); - DataPartition lhsJoinPartition = - DataPartition.hashPartitioned(Expr.cloneList(lhsJoinExprs)); - - // Create a new parent fragment containing a HashJoin node with two - // ExchangeNodes as inputs; the latter are the destinations of the - // left- and rightChildFragments, which now partition their output - // on their respective join exprs. - // The new fragment is hash-partitioned on the lhs input join exprs. 
- ExchangeNode lhsExchange = - new ExchangeNode(ctx_.getNextNodeId(), leftChildFragment.getPlanRoot()); - lhsExchange.computeStats(ctx_.getRootAnalyzer()); - node.setChild(0, lhsExchange); - ExchangeNode rhsExchange = - new ExchangeNode(ctx_.getNextNodeId(), rightChildFragment.getPlanRoot()); - rhsExchange.computeStats(ctx_.getRootAnalyzer()); - node.setChild(1, rhsExchange); - - // Connect the child fragments in a new fragment, and set the data partition - // of the new fragment and its child fragments. - DataPartition outputPartition = lhsJoinPartition; - - PlanFragment joinFragment = - new PlanFragment(ctx_.getNextFragmentId(), node, outputPartition); - leftChildFragment.setDestination(lhsExchange); - leftChildFragment.setOutputPartition(lhsJoinPartition); - rightChildFragment.setDestination(rhsExchange); - rightChildFragment.setOutputPartition(rhsJoinPartition); - return joinFragment; - } - - /** - * Creates either a broadcast join or a repartitioning join depending on the expected - * cost and various constraints. See computeDistributionMode() for more details. + * Creates a fragment for an IcebergDeleteNode with DIRECTED distribution mode. + * Similarly to a BROADCAST join, the left child of the join is in the same fragment + * with the join itself, while the right child is in a separate fragment. */ private PlanFragment createIcebergDeleteFragment(IcebergDeleteNode node, - PlanFragment rightChildFragment, PlanFragment leftChildFragment, - List<PlanFragment> fragments) throws ImpalaException { - // For both join types, the total cost is calculated as the amount of data - // sent over the network, the hash tables build cost is roughly the same. - // broadcast: send the rightChildFragment's output to each node executing - // the leftChildFragment. - PlanNode rhsTree = rightChildFragment.getPlanRoot(); - long rhsDataSize = -1; - long broadcastCost = -1; - int mt_dop = ctx_.getQueryOptions().mt_dop; - int leftChildNodes = leftChildFragment.getNumNodes(); - if (rhsTree.getCardinality() != -1) { - rhsDataSize = Math.round( - rhsTree.getCardinality() * ExchangeNode.getAvgSerializedRowSize(rhsTree)); - - Preconditions.checkState(leftChildNodes != -1); - broadcastCost = rhsDataSize * leftChildNodes; - } - if (LOG.isTraceEnabled()) { - LOG.trace("broadcast: cost=" + Long.toString(broadcastCost)); - LOG.trace("card=" + Long.toString(rhsTree.getCardinality()) - + " row_size=" + Float.toString(rhsTree.getAvgRowSize()) - + " #nodes=" + Integer.toString(leftChildNodes)); - } - - // repartition: both left- and rightChildFragment are partitioned on the - // file path, and a hash table is built with the rightChildFragment's output. 
- PlanNode lhsTree = leftChildFragment.getPlanRoot(); - List<Expr> lhsJoinExprs = new ArrayList<>(); - List<Expr> rhsJoinExprs = new ArrayList<>(); - + PlanFragment leftChildFragment, PlanFragment rightChildFragment) + throws ImpalaException { Preconditions.checkState(node.getEqJoinConjuncts().size() == 2); BinaryPredicate filePathEq = node.getEqJoinConjuncts().get(1); - // Verify that the partitioning is based in file path + // Verify that the partitioning is based on file path Preconditions.checkState( ((SlotRef) filePathEq.getChild(0)).getDesc().getVirtualColumnType() == TVirtualColumnType.INPUT_FILE_NAME); - lhsJoinExprs.add(filePathEq.getChild(0).clone()); - rhsJoinExprs.add(filePathEq.getChild(1).clone()); - - long partitionCost = -1; - if (lhsTree.getCardinality() != -1 && rhsTree.getCardinality() != -1) { - Preconditions.checkState(rhsDataSize != -1); - double lhsNetworkCost = Math.round( - lhsTree.getCardinality() * ExchangeNode.getAvgSerializedRowSize(lhsTree)); - double rhsNetworkCost = rhsDataSize; - partitionCost = Math.round(lhsNetworkCost + rhsNetworkCost); - } - if (LOG.isTraceEnabled()) { - LOG.trace("partition: cost=" + Long.toString(partitionCost)); - LOG.trace("lhs card=" + Long.toString(lhsTree.getCardinality()) - + " row_size=" + Float.toString(lhsTree.getAvgRowSize())); - LOG.trace("rhs card=" + Long.toString(rhsTree.getCardinality()) - + " row_size=" + Float.toString(rhsTree.getAvgRowSize())); - LOG.trace(rhsTree.getExplainString(ctx_.getQueryOptions())); - } - - DistributionMode distrMode = DistributionMode.fromThrift( - ctx_.getQueryOptions().getDefault_join_distribution_mode()); - - // Broadcast mode has better fast path checks, it could be slightly more favoured, - // but network costs dominate probing costs, so it does not matter much. 
- if (broadcastCost != -1 && partitionCost != -1) { - if (broadcastCost < partitionCost) distrMode = DistributionMode.BROADCAST; - if (partitionCost < broadcastCost) distrMode = DistributionMode.PARTITIONED; - } - - node.setDistributionMode(distrMode); + node.setDistributionMode(DistributionMode.DIRECTED); - PlanFragment hjFragment = null; - if (distrMode == DistributionMode.BROADCAST) { - // Doesn't create a new fragment, but modifies leftChildFragment to execute - // the join; the build input is provided by an ExchangeNode, which is the - // destination of the rightChildFragment's output - node.setChild(0, leftChildFragment.getPlanRoot()); - connectChildFragment(node, 1, leftChildFragment, rightChildFragment); - leftChildFragment.setPlanRoot(node); - hjFragment = leftChildFragment; - } else { - hjFragment = createPartitionedIcebergDeleteFragment( - node, leftChildFragment, rightChildFragment, lhsJoinExprs, rhsJoinExprs); - } - return hjFragment; + // Doesn't create a new fragment, but modifies leftChildFragment to execute + // the join; the build input is provided by an ExchangeNode, which is the + // destination of the rightChildFragment's output + node.setChild(0, leftChildFragment.getPlanRoot()); + rightChildFragment.setOutputPartition(DataPartition.DIRECTED); + connectChildFragment(node, 1, leftChildFragment, rightChildFragment); + leftChildFragment.setPlanRoot(node); + return leftChildFragment; } /** diff --git a/fe/src/main/java/org/apache/impala/planner/ExchangeNode.java b/fe/src/main/java/org/apache/impala/planner/ExchangeNode.java index e92fa906f..6a7600906 100644 --- a/fe/src/main/java/org/apache/impala/planner/ExchangeNode.java +++ b/fe/src/main/java/org/apache/impala/planner/ExchangeNode.java @@ -23,6 +23,7 @@ import org.apache.impala.analysis.SortInfo; import org.apache.impala.analysis.TupleId; import org.apache.impala.common.ImpalaException; import org.apache.impala.service.BackendConfig; +import org.apache.impala.thrift.TDataSinkType; import org.apache.impala.thrift.TExchangeNode; import org.apache.impala.thrift.TExplainLevel; import org.apache.impala.thrift.TPlanNode; @@ -71,6 +72,9 @@ public class ExchangeNode extends PlanNode { // If the output of the sink is not partitioned but the target fragment is // partitioned, then the data exchange is broadcast. Preconditions.checkState(!children_.isEmpty()); + // Has to examine isDirectedExchange() too because for a DIRECTED exchange the below + // code would also return true. + if (isDirectedExchange()) return false; DataSink sink = getChild(0).getFragment().getSink(); if (sink == null) return false; Preconditions.checkState(sink instanceof DataStreamSink); @@ -78,6 +82,27 @@ public class ExchangeNode extends PlanNode { return !streamSink.getOutputPartition().isPartitioned() && fragment_.isPartitioned(); } + protected boolean isDirectedExchange() { + if (fragment_.getSink().getSinkType() == TDataSinkType.ICEBERG_DELETE_BUILDER) { + // If this EXCHANGE is using a JoinBuildSink to send to an IcebergDeleteNode in a + // separate fragment. + return true; + } + // If this EXCHANGE is right below an IcebergDeleteNode in the same fragment. 
+ return isChildOfIcebergDeleteNode(fragment_.getPlanRoot()); + } + + protected boolean isChildOfIcebergDeleteNode(PlanNode currNode) { + if (currNode instanceof IcebergDeleteNode) { + Preconditions.checkState(currNode.getChildCount() == 2); + if (currNode.getChild(1) == this) return true; + } + for (PlanNode child : currNode.getChildren()) { + if (isChildOfIcebergDeleteNode(child)) return true; + } + return false; + } + public ExchangeNode(PlanNodeId id, PlanNode input) { super(id, "EXCHANGE"); offset_ = 0; @@ -162,7 +187,9 @@ public class ExchangeNode extends PlanNode { Preconditions.checkState(!children_.isEmpty()); DataSink sink = getChild(0).getFragment().getSink(); if (sink == null) return ""; - if (isBroadcastExchange()) { + if (isDirectedExchange()) { + return "DIRECTED"; + } else if (isBroadcastExchange()) { return "BROADCAST"; } else { Preconditions.checkState(sink instanceof DataStreamSink); diff --git a/fe/src/main/java/org/apache/impala/planner/IcebergDeleteNode.java b/fe/src/main/java/org/apache/impala/planner/IcebergDeleteNode.java index 11594d997..e4c131bdf 100644 --- a/fe/src/main/java/org/apache/impala/planner/IcebergDeleteNode.java +++ b/fe/src/main/java/org/apache/impala/planner/IcebergDeleteNode.java @@ -191,18 +191,11 @@ public class IcebergDeleteNode extends JoinNode { rhsNdv = PlanNode.checkedMultiply(rhsNdv, rhsPdNdv); } } + // The memory of the data stored in hash table is the file path of the data files // which have delete files and 8 byte for every deleted row position - int numberOfDataFilesWithDelete = 0; - if (distrMode_ == DistributionMode.PARTITIONED) { - numberOfDataFilesWithDelete = ((IcebergScanNode) getChild(0).getChild(0)) - .getFileDescriptorsWithLimit(null, false, -1) - .size(); - } else { - numberOfDataFilesWithDelete = ((IcebergScanNode) getChild(0)) - .getFileDescriptorsWithLimit(null, false, -1) - .size(); - } + int numberOfDataFilesWithDelete = ((IcebergScanNode) getChild(0)) + .getFileDescriptorsWithLimit(null, false, -1).size(); perBuildInstanceDataBytes = (long) Math.ceil( numberOfDataFilesWithDelete * getChild(1).getAvgRowSize() + 8 * rhsCard); diff --git a/fe/src/main/java/org/apache/impala/planner/IcebergScanNode.java b/fe/src/main/java/org/apache/impala/planner/IcebergScanNode.java index c798c4160..10b5a5390 100644 --- a/fe/src/main/java/org/apache/impala/planner/IcebergScanNode.java +++ b/fe/src/main/java/org/apache/impala/planner/IcebergScanNode.java @@ -39,6 +39,7 @@ import org.apache.impala.catalog.Type; import org.apache.impala.common.ImpalaRuntimeException; import org.apache.impala.fb.FbIcebergDataFileFormat; import org.apache.impala.thrift.TExplainLevel; +import org.apache.impala.thrift.TPlanNode; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -63,10 +64,24 @@ public class IcebergScanNode extends HdfsScanNode { // already applied them and they won't filter any further rows. private List<Expr> skippedConjuncts_; + // This member is set when this scan node is the left child of an IcebergDeleteNode or + // in other words when this scan node reads data files that have delete files + // associated. Holds the scan node ID of the right child of the IcebergDeleteNode + // responsible for reading the delete files of the corresponding table. 
+ private final PlanNodeId deleteFileScanNodeId; + public IcebergScanNode(PlanNodeId id, TableRef tblRef, List<Expr> conjuncts, MultiAggregateInfo aggInfo, List<FileDescriptor> fileDescs, List<Expr> nonIdentityConjuncts, List<Expr> skippedConjuncts) throws ImpalaRuntimeException { + this(id, tblRef, conjuncts, aggInfo, fileDescs, nonIdentityConjuncts, + skippedConjuncts, null); + } + + public IcebergScanNode(PlanNodeId id, TableRef tblRef, List<Expr> conjuncts, + MultiAggregateInfo aggInfo, List<FileDescriptor> fileDescs, + List<Expr> nonIdentityConjuncts, List<Expr> skippedConjuncts, PlanNodeId deleteId) + throws ImpalaRuntimeException { super(id, tblRef.getDesc(), conjuncts, getIcebergPartition(((FeIcebergTable)tblRef.getTable()).getFeFsTable()), tblRef, aggInfo, null, false); @@ -96,6 +111,7 @@ public class IcebergScanNode extends HdfsScanNode { if (hasOrc) fileFormats_.add(HdfsFileFormat.ORC); if (hasAvro) fileFormats_.add(HdfsFileFormat.AVRO); this.skippedConjuncts_ = skippedConjuncts; + this.deleteFileScanNodeId = deleteId; } /** @@ -221,6 +237,15 @@ public class IcebergScanNode extends HdfsScanNode { return result; } + @Override + protected void toThrift(TPlanNode msg) { + super.toThrift(msg); + Preconditions.checkNotNull(msg.hdfs_scan_node); + if (deleteFileScanNodeId != null) { + msg.hdfs_scan_node.setDeleteFileScanNodeId(deleteFileScanNodeId.asInt()); + } + } + @Override protected String getDerivedExplainString( String indentPrefix, TExplainLevel detailLevel) { diff --git a/fe/src/main/java/org/apache/impala/planner/IcebergScanPlanner.java b/fe/src/main/java/org/apache/impala/planner/IcebergScanPlanner.java index 4b26a5132..419adf207 100644 --- a/fe/src/main/java/org/apache/impala/planner/IcebergScanPlanner.java +++ b/fe/src/main/java/org/apache/impala/planner/IcebergScanPlanner.java @@ -252,7 +252,7 @@ public class IcebergScanPlanner { addDeletePositionSlots(deleteDeltaRef); IcebergScanNode dataScanNode = new IcebergScanNode( dataScanNodeId, tblRef_, conjuncts_, aggInfo_, dataFilesWithDeletes_, - nonIdentityConjuncts_, getSkippedConjuncts()); + nonIdentityConjuncts_, getSkippedConjuncts(), deleteScanNodeId); dataScanNode.init(analyzer_); IcebergScanNode deleteScanNode = new IcebergScanNode( deleteScanNodeId, diff --git a/fe/src/main/java/org/apache/impala/planner/JoinNode.java b/fe/src/main/java/org/apache/impala/planner/JoinNode.java index 4c5d51db3..ffdd788b6 100644 --- a/fe/src/main/java/org/apache/impala/planner/JoinNode.java +++ b/fe/src/main/java/org/apache/impala/planner/JoinNode.java @@ -107,7 +107,8 @@ public abstract class JoinNode extends PlanNode { public enum DistributionMode { NONE("NONE"), BROADCAST("BROADCAST"), - PARTITIONED("PARTITIONED"); + PARTITIONED("PARTITIONED"), + DIRECTED("DIRECTED"); private final String description_; @@ -117,9 +118,18 @@ public abstract class JoinNode extends PlanNode { @Override public String toString() { return description_; } + public static DistributionMode fromThrift(TJoinDistributionMode distrMode) { - if (distrMode == TJoinDistributionMode.BROADCAST) return BROADCAST; - return PARTITIONED; + switch (distrMode) { + case BROADCAST: + return BROADCAST; + case SHUFFLE: + return PARTITIONED; + case DIRECTED: + return DIRECTED; + default: + throw new RuntimeException("Invalid distribution mode: " + distrMode); + } } } @@ -205,7 +215,8 @@ public abstract class JoinNode extends PlanNode { // Returns true if we can share a join build between multiple consuming fragment // instances. 
public boolean canShareBuild() { - return distrMode_ == JoinNode.DistributionMode.BROADCAST; + return distrMode_ == JoinNode.DistributionMode.BROADCAST || + distrMode_ == DistributionMode.DIRECTED; } public JoinOperator getJoinOp() { return joinOp_; } @@ -991,11 +1002,4 @@ public abstract class JoinNode extends PlanNode { * Does not modify the state of this node. */ public abstract Pair<ProcessingCost, ProcessingCost> computeJoinProcessingCost(); - - /* Helper to return all predicates as a string. */ - public String getAllPredicatesAsString(TExplainLevel level) { - return "Conjuncts=" + Expr.getExplainString(getConjuncts(), level) - + ", EqJoinConjuncts=" + Expr.getExplainString(getEqJoinConjuncts(), level) - + ", EqJoinConjuncts=" + Expr.getExplainString(getOtherJoinConjuncts(), level); - } } diff --git a/testdata/workloads/functional-planner/queries/PlannerTest/iceberg-v2-delete.test b/testdata/workloads/functional-planner/queries/PlannerTest/iceberg-v2-delete.test index c0fcdc0b9..6eac5b341 100644 --- a/testdata/workloads/functional-planner/queries/PlannerTest/iceberg-v2-delete.test +++ b/testdata/workloads/functional-planner/queries/PlannerTest/iceberg-v2-delete.test @@ -48,10 +48,10 @@ DELETE FROM ICEBERG [functional_parquet.iceberg_v2_delete_positional-POSITION-DE | order by: input__file__name ASC NULLS LAST, file__position ASC NULLS LAST | row-size=20B cardinality=1 | -02:DELETE EVENTS ICEBERG DELETE [ICEBERG DELETE JOIN, BROADCAST] +02:DELETE EVENTS ICEBERG DELETE [ICEBERG DELETE JOIN, DIRECTED] | row-size=28B cardinality=1 | -|--03:EXCHANGE [BROADCAST] +|--03:EXCHANGE [DIRECTED] | | | 01:SCAN HDFS [functional_parquet.iceberg_v2_delete_positional-POSITION-DELETE-01 functional_parquet.iceberg_v2_delete_positional-position-delete] | HDFS partitions=1/1 files=1 size=1.54KB @@ -126,10 +126,10 @@ DELETE FROM ICEBERG [functional_parquet.iceberg_v2_delete_positional-POSITION-DE | | output: min(id) | | row-size=8B cardinality=1 | | -| 05:DELETE EVENTS ICEBERG DELETE [ICEBERG DELETE JOIN, BROADCAST] +| 05:DELETE EVENTS ICEBERG DELETE [ICEBERG DELETE JOIN, DIRECTED] | | row-size=28B cardinality=2 | | -| |--09:EXCHANGE [BROADCAST] +| |--09:EXCHANGE [DIRECTED] | | | | | 04:SCAN HDFS [functional_parquet.iceberg_v2_delete_positional-POSITION-DELETE-04 functional_parquet.iceberg_v2_delete_positional-position-delete] | | HDFS partitions=1/1 files=1 size=1.54KB @@ -139,10 +139,10 @@ DELETE FROM ICEBERG [functional_parquet.iceberg_v2_delete_positional-POSITION-DE | HDFS partitions=1/1 files=1 size=662B | row-size=28B cardinality=3 | -02:DELETE EVENTS ICEBERG DELETE [ICEBERG DELETE JOIN, BROADCAST] +02:DELETE EVENTS ICEBERG DELETE [ICEBERG DELETE JOIN, DIRECTED] | row-size=28B cardinality=2 | -|--08:EXCHANGE [BROADCAST] +|--08:EXCHANGE [DIRECTED] | | | 01:SCAN HDFS [functional_parquet.iceberg_v2_delete_positional-POSITION-DELETE-01 functional_parquet.iceberg_v2_delete_positional-position-delete] | HDFS partitions=1/1 files=1 size=1.54KB @@ -179,10 +179,10 @@ DELETE FROM ICEBERG [functional_parquet.iceberg_v2_delete_positional-POSITION-DE | order by: input__file__name ASC NULLS LAST, file__position ASC NULLS LAST | row-size=20B cardinality=1 | -02:DELETE EVENTS ICEBERG DELETE [ICEBERG DELETE JOIN, BROADCAST] +02:DELETE EVENTS ICEBERG DELETE [ICEBERG DELETE JOIN, DIRECTED] | row-size=28B cardinality=1 | -|--03:EXCHANGE [BROADCAST] +|--03:EXCHANGE [DIRECTED] | | | 01:SCAN HDFS [functional_parquet.iceberg_v2_delete_positional-POSITION-DELETE-01 
functional_parquet.iceberg_v2_delete_positional-position-delete] | HDFS partitions=1/1 files=1 size=1.54KB @@ -215,23 +215,21 @@ DELETE FROM ICEBERG [functional_parquet.iceberg_v2_partitioned_position_deletes- ---- DISTRIBUTEDPLAN DELETE FROM ICEBERG [functional_parquet.iceberg_v2_partitioned_position_deletes-POSITION-DELETE] | -06:SORT +05:SORT | order by: partition__spec__id ASC NULLS LAST, iceberg__partition__serialized ASC NULLS LAST, input__file__name ASC NULLS LAST, file__position ASC NULLS LAST | row-size=36B cardinality=1 | -05:EXCHANGE [HASH(functional_parquet.iceberg_v2_partitioned_position_deletes.PARTITION__SPEC__ID,functional_parquet.iceberg_v2_partitioned_position_deletes.ICEBERG__PARTITION__SERIALIZED)] +04:EXCHANGE [HASH(functional_parquet.iceberg_v2_partitioned_position_deletes.PARTITION__SPEC__ID,functional_parquet.iceberg_v2_partitioned_position_deletes.ICEBERG__PARTITION__SERIALIZED)] | -02:DELETE EVENTS ICEBERG DELETE [ICEBERG DELETE JOIN, PARTITIONED] +02:DELETE EVENTS ICEBERG DELETE [ICEBERG DELETE JOIN, DIRECTED] | row-size=40B cardinality=1 | -|--04:EXCHANGE [HASH(functional_parquet.iceberg_v2_partitioned_position_deletes-position-delete.file_path)] +|--03:EXCHANGE [DIRECTED] | | | 01:SCAN HDFS [functional_parquet.iceberg_v2_partitioned_position_deletes-POSITION-DELETE-01 functional_parquet.iceberg_v2_partitioned_position_deletes-position-delete] | HDFS partitions=1/1 files=3 size=9.47KB | row-size=204B cardinality=10 | -03:EXCHANGE [HASH(functional_parquet.iceberg_v2_partitioned_position_deletes.input__file__name)] -| 00:SCAN HDFS [functional_parquet.iceberg_v2_partitioned_position_deletes] HDFS partitions=1/1 files=3 size=3.48KB predicates: id = 20 @@ -263,10 +261,10 @@ DELETE FROM ICEBERG [functional_parquet.iceberg_v2_partitioned_position_deletes- | order by: partition__spec__id ASC NULLS LAST, iceberg__partition__serialized ASC NULLS LAST, input__file__name ASC NULLS LAST, file__position ASC NULLS LAST | row-size=36B cardinality=3 | -02:DELETE EVENTS ICEBERG DELETE [ICEBERG DELETE JOIN, BROADCAST] +02:DELETE EVENTS ICEBERG DELETE [ICEBERG DELETE JOIN, DIRECTED] | row-size=36B cardinality=3 | -|--03:EXCHANGE [BROADCAST] +|--03:EXCHANGE [DIRECTED] | | | 01:SCAN HDFS [functional_parquet.iceberg_v2_partitioned_position_deletes-POSITION-DELETE-01 functional_parquet.iceberg_v2_partitioned_position_deletes-position-delete] | HDFS partitions=1/1 files=1 size=3.15KB @@ -299,23 +297,21 @@ DELETE FROM ICEBERG [functional_parquet.iceberg_v2_partitioned_position_deletes- ---- DISTRIBUTEDPLAN DELETE FROM ICEBERG [functional_parquet.iceberg_v2_partitioned_position_deletes-POSITION-DELETE] | -06:SORT +05:SORT | order by: partition__spec__id ASC NULLS LAST, iceberg__partition__serialized ASC NULLS LAST, input__file__name ASC NULLS LAST, file__position ASC NULLS LAST | row-size=36B cardinality=1 | -05:EXCHANGE [HASH(functional_parquet.iceberg_v2_partitioned_position_deletes.PARTITION__SPEC__ID,functional_parquet.iceberg_v2_partitioned_position_deletes.ICEBERG__PARTITION__SERIALIZED)] +04:EXCHANGE [HASH(functional_parquet.iceberg_v2_partitioned_position_deletes.PARTITION__SPEC__ID,functional_parquet.iceberg_v2_partitioned_position_deletes.ICEBERG__PARTITION__SERIALIZED)] | -02:DELETE EVENTS ICEBERG DELETE [ICEBERG DELETE JOIN, PARTITIONED] +02:DELETE EVENTS ICEBERG DELETE [ICEBERG DELETE JOIN, DIRECTED] | row-size=48B cardinality=1 | -|--04:EXCHANGE [HASH(functional_parquet.iceberg_v2_partitioned_position_deletes-position-delete.file_path)] +|--03:EXCHANGE [DIRECTED] | | | 
01:SCAN HDFS [functional_parquet.iceberg_v2_partitioned_position_deletes-POSITION-DELETE-01 functional_parquet.iceberg_v2_partitioned_position_deletes-position-delete] | HDFS partitions=1/1 files=3 size=9.47KB | row-size=204B cardinality=10 | -03:EXCHANGE [HASH(functional_parquet.iceberg_v2_partitioned_position_deletes.input__file__name)] -| 00:SCAN HDFS [functional_parquet.iceberg_v2_partitioned_position_deletes] HDFS partitions=1/1 files=3 size=3.48KB predicates: `user` LIKE 'A%' @@ -364,33 +360,33 @@ DELETE FROM ICEBERG [functional_parquet.iceberg_v2_partitioned_position_deletes- ---- DISTRIBUTEDPLAN DELETE FROM ICEBERG [functional_parquet.iceberg_v2_partitioned_position_deletes-POSITION-DELETE] | -15:SORT +14:SORT | order by: partition__spec__id ASC NULLS LAST, iceberg__partition__serialized ASC NULLS LAST, input__file__name ASC NULLS LAST, file__position ASC NULLS LAST | row-size=36B cardinality=10 | -14:EXCHANGE [HASH(functional_parquet.iceberg_v2_partitioned_position_deletes.PARTITION__SPEC__ID,functional_parquet.iceberg_v2_partitioned_position_deletes.ICEBERG__PARTITION__SERIALIZED)] +13:EXCHANGE [HASH(functional_parquet.iceberg_v2_partitioned_position_deletes.PARTITION__SPEC__ID,functional_parquet.iceberg_v2_partitioned_position_deletes.ICEBERG__PARTITION__SERIALIZED)] | 07:HASH JOIN [LEFT SEMI JOIN, BROADCAST] | hash predicates: id = max(id) | runtime filters: RF000 <- max(id) | row-size=40B cardinality=10 | -|--13:EXCHANGE [BROADCAST] +|--12:EXCHANGE [BROADCAST] | | -| 12:AGGREGATE [FINALIZE] +| 11:AGGREGATE [FINALIZE] | | output: max:merge(id) | | row-size=8B cardinality=1 | | -| 11:EXCHANGE [UNPARTITIONED] +| 10:EXCHANGE [UNPARTITIONED] | | | 06:AGGREGATE | | output: max(id) | | row-size=8B cardinality=1 | | -| 05:DELETE EVENTS ICEBERG DELETE [ICEBERG DELETE JOIN, BROADCAST] +| 05:DELETE EVENTS ICEBERG DELETE [ICEBERG DELETE JOIN, DIRECTED] | | row-size=28B cardinality=2 | | -| |--10:EXCHANGE [BROADCAST] +| |--09:EXCHANGE [DIRECTED] | | | | | 04:SCAN HDFS [functional_parquet.iceberg_v2_delete_positional-POSITION-DELETE-04 functional_parquet.iceberg_v2_delete_positional-position-delete] | | HDFS partitions=1/1 files=1 size=1.54KB @@ -400,17 +396,15 @@ DELETE FROM ICEBERG [functional_parquet.iceberg_v2_partitioned_position_deletes- | HDFS partitions=1/1 files=1 size=662B | row-size=28B cardinality=3 | -02:DELETE EVENTS ICEBERG DELETE [ICEBERG DELETE JOIN, PARTITIONED] +02:DELETE EVENTS ICEBERG DELETE [ICEBERG DELETE JOIN, DIRECTED] | row-size=40B cardinality=10 | -|--09:EXCHANGE [HASH(functional_parquet.iceberg_v2_partitioned_position_deletes-position-delete.file_path)] +|--08:EXCHANGE [DIRECTED] | | | 01:SCAN HDFS [functional_parquet.iceberg_v2_partitioned_position_deletes-POSITION-DELETE-01 functional_parquet.iceberg_v2_partitioned_position_deletes-position-delete] | HDFS partitions=1/1 files=3 size=9.47KB | row-size=204B cardinality=10 | -08:EXCHANGE [HASH(functional_parquet.iceberg_v2_partitioned_position_deletes.input__file__name)] -| 00:SCAN HDFS [functional_parquet.iceberg_v2_partitioned_position_deletes] HDFS partitions=1/1 files=3 size=3.48KB runtime filters: RF000 -> id @@ -438,23 +432,21 @@ DELETE FROM ICEBERG [functional_parquet.iceberg_v2_partitioned_position_deletes- ---- DISTRIBUTEDPLAN DELETE FROM ICEBERG [functional_parquet.iceberg_v2_partitioned_position_deletes-POSITION-DELETE] | -06:SORT +05:SORT | order by: partition__spec__id ASC NULLS LAST, iceberg__partition__serialized ASC NULLS LAST, input__file__name ASC NULLS LAST, file__position ASC NULLS LAST | 
row-size=36B cardinality=1 | -05:EXCHANGE [HASH(functional_parquet.iceberg_v2_partitioned_position_deletes.PARTITION__SPEC__ID,functional_parquet.iceberg_v2_partitioned_position_deletes.ICEBERG__PARTITION__SERIALIZED)] +04:EXCHANGE [HASH(functional_parquet.iceberg_v2_partitioned_position_deletes.PARTITION__SPEC__ID,functional_parquet.iceberg_v2_partitioned_position_deletes.ICEBERG__PARTITION__SERIALIZED)] | -02:DELETE EVENTS ICEBERG DELETE [ICEBERG DELETE JOIN, PARTITIONED] +02:DELETE EVENTS ICEBERG DELETE [ICEBERG DELETE JOIN, DIRECTED] | row-size=40B cardinality=1 | -|--04:EXCHANGE [HASH(functional_parquet.iceberg_v2_partitioned_position_deletes-position-delete.file_path)] +|--03:EXCHANGE [DIRECTED] | | | 01:SCAN HDFS [functional_parquet.iceberg_v2_partitioned_position_deletes-POSITION-DELETE-01 functional_parquet.iceberg_v2_partitioned_position_deletes-position-delete] | HDFS partitions=1/1 files=3 size=9.47KB | row-size=204B cardinality=10 | -03:EXCHANGE [HASH(functional_parquet.iceberg_v2_partitioned_position_deletes.input__file__name)] -| 00:SCAN HDFS [functional_parquet.iceberg_v2_partitioned_position_deletes] HDFS partitions=1/1 files=3 size=3.48KB predicates: FILE__POSITION = id diff --git a/testdata/workloads/functional-planner/queries/PlannerTest/iceberg-v2-tables.test b/testdata/workloads/functional-planner/queries/PlannerTest/iceberg-v2-tables.test index 00360ecdb..f5a3c9d87 100644 --- a/testdata/workloads/functional-planner/queries/PlannerTest/iceberg-v2-tables.test +++ b/testdata/workloads/functional-planner/queries/PlannerTest/iceberg-v2-tables.test @@ -73,10 +73,10 @@ PLAN-ROOT SINK | output: count(*) | row-size=8B cardinality=1 | -02:DELETE EVENTS ICEBERG DELETE [ICEBERG DELETE JOIN, BROADCAST] +02:DELETE EVENTS ICEBERG DELETE [ICEBERG DELETE JOIN, DIRECTED] | row-size=20B cardinality=2 | -|--04:EXCHANGE [BROADCAST] +|--04:EXCHANGE [DIRECTED] | | | 01:SCAN HDFS [functional_parquet.iceberg_v2_delete_positional-POSITION-DELETE-01 functional_parquet.iceberg_v2_delete_positional-position-delete] | HDFS partitions=1/1 files=1 size=1.54KB @@ -105,10 +105,10 @@ PLAN-ROOT SINK | 04:EXCHANGE [UNPARTITIONED] | -02:DELETE EVENTS ICEBERG DELETE [ICEBERG DELETE JOIN, BROADCAST] +02:DELETE EVENTS ICEBERG DELETE [ICEBERG DELETE JOIN, DIRECTED] | row-size=40B cardinality=2 | -|--03:EXCHANGE [BROADCAST] +|--03:EXCHANGE [DIRECTED] | | | 01:SCAN HDFS [functional_parquet.iceberg_v2_delete_positional-POSITION-DELETE-01 functional_parquet.iceberg_v2_delete_positional-position-delete] | HDFS partitions=1/1 files=1 size=1.54KB @@ -137,10 +137,10 @@ PLAN-ROOT SINK | 04:EXCHANGE [UNPARTITIONED] | -02:DELETE EVENTS ICEBERG DELETE [ICEBERG DELETE JOIN, BROADCAST] +02:DELETE EVENTS ICEBERG DELETE [ICEBERG DELETE JOIN, DIRECTED] | row-size=36B cardinality=1 | -|--03:EXCHANGE [BROADCAST] +|--03:EXCHANGE [DIRECTED] | | | 01:SCAN HDFS [functional_parquet.iceberg_v2_positional_delete_all_rows-POSITION-DELETE-01 functional_parquet.iceberg_v2_positional_delete_all_rows-position-delete] | HDFS partitions=1/1 files=1 size=2.60KB @@ -190,11 +190,11 @@ PLAN-ROOT SINK 04:EXCHANGE [UNPARTITIONED] | limit: 1 | -02:DELETE EVENTS ICEBERG DELETE [ICEBERG DELETE JOIN, BROADCAST] +02:DELETE EVENTS ICEBERG DELETE [ICEBERG DELETE JOIN, DIRECTED] | limit: 1 | row-size=36B cardinality=1 | -|--03:EXCHANGE [BROADCAST] +|--03:EXCHANGE [DIRECTED] | | | 01:SCAN HDFS [functional_parquet.iceberg_v2_positional_delete_all_rows-POSITION-DELETE-01 functional_parquet.iceberg_v2_positional_delete_all_rows-position-delete] | HDFS 
partitions=1/1 files=1 size=2.60KB @@ -230,7 +230,7 @@ PLAN-ROOT SINK ---- DISTRIBUTEDPLAN PLAN-ROOT SINK | -07:EXCHANGE [UNPARTITIONED] +06:EXCHANGE [UNPARTITIONED] | limit: 1 | 04:UNION @@ -238,17 +238,15 @@ PLAN-ROOT SINK | limit: 1 | row-size=36B cardinality=1 | -|--02:DELETE EVENTS ICEBERG DELETE [ICEBERG DELETE JOIN, PARTITIONED] +|--02:DELETE EVENTS ICEBERG DELETE [ICEBERG DELETE JOIN, DIRECTED] | | row-size=36B cardinality=2 | | -| |--06:EXCHANGE [HASH(functional_parquet.iceberg_v2_positional_not_all_data_files_have_delete_files-position-delete.file_path)] +| |--05:EXCHANGE [DIRECTED] | | | | | 01:SCAN HDFS [functional_parquet.iceberg_v2_positional_not_all_data_files_have_delete_files-POSITION-DELETE-01 functional_parquet.iceberg_v2_positional_not_all_data_files_have_delete_files-position-delete] | | HDFS partitions=1/1 files=2 size=5.33KB | | row-size=267B cardinality=4 | | -| 05:EXCHANGE [HASH(functional_parquet.iceberg_v2_positional_not_all_data_files_have_delete_files.input__file__name)] -| | | 00:SCAN HDFS [functional_parquet.iceberg_v2_positional_not_all_data_files_have_delete_files] | HDFS partitions=1/1 files=2 size=1.22KB | row-size=36B cardinality=6 @@ -282,23 +280,21 @@ PLAN-ROOT SINK ---- DISTRIBUTEDPLAN PLAN-ROOT SINK | -07:EXCHANGE [UNPARTITIONED] +06:EXCHANGE [UNPARTITIONED] | 04:UNION | pass-through-operands: all | row-size=36B cardinality=6 | -|--02:DELETE EVENTS ICEBERG DELETE [ICEBERG DELETE JOIN, PARTITIONED] +|--02:DELETE EVENTS ICEBERG DELETE [ICEBERG DELETE JOIN, DIRECTED] | | row-size=36B cardinality=2 | | -| |--06:EXCHANGE [HASH(functional_parquet.iceberg_v2_positional_not_all_data_files_have_delete_files-position-delete.file_path)] +| |--05:EXCHANGE [DIRECTED] | | | | | 01:SCAN HDFS [functional_parquet.iceberg_v2_positional_not_all_data_files_have_delete_files-POSITION-DELETE-01 functional_parquet.iceberg_v2_positional_not_all_data_files_have_delete_files-position-delete] | | HDFS partitions=1/1 files=2 size=5.33KB | | row-size=267B cardinality=4 | | -| 05:EXCHANGE [HASH(functional_parquet.iceberg_v2_positional_not_all_data_files_have_delete_files.input__file__name)] -| | | 00:SCAN HDFS [functional_parquet.iceberg_v2_positional_not_all_data_files_have_delete_files] | HDFS partitions=1/1 files=2 size=1.22KB | row-size=36B cardinality=6 @@ -352,10 +348,10 @@ PLAN-ROOT SINK | output: count(*) | row-size=8B cardinality=1 | -02:DELETE EVENTS ICEBERG DELETE [ICEBERG DELETE JOIN, BROADCAST] +02:DELETE EVENTS ICEBERG DELETE [ICEBERG DELETE JOIN, DIRECTED] | row-size=20B cardinality=2 | -|--04:EXCHANGE [BROADCAST] +|--04:EXCHANGE [DIRECTED] | | | 01:SCAN HDFS [functional_parquet.iceberg_v2_positional_not_all_data_files_have_delete_files-POSITION-DELETE-01 functional_parquet.iceberg_v2_positional_not_all_data_files_have_delete_files-position-delete] | HDFS partitions=1/1 files=1 size=2.63KB @@ -386,27 +382,25 @@ PLAN-ROOT SINK ---- DISTRIBUTEDPLAN PLAN-ROOT SINK | -07:AGGREGATE [FINALIZE] +06:AGGREGATE [FINALIZE] | output: count:merge(*) | row-size=8B cardinality=1 | -06:EXCHANGE [UNPARTITIONED] +05:EXCHANGE [UNPARTITIONED] | 03:AGGREGATE | output: count(*) | row-size=8B cardinality=1 | -02:DELETE EVENTS ICEBERG DELETE [ICEBERG DELETE JOIN, PARTITIONED] +02:DELETE EVENTS ICEBERG DELETE [ICEBERG DELETE JOIN, DIRECTED] | row-size=20B cardinality=2 | -|--05:EXCHANGE [HASH(functional_parquet.iceberg_v2_positional_not_all_data_files_have_delete_files-position-delete.file_path)] +|--04:EXCHANGE [DIRECTED] | | | 01:SCAN HDFS 
[functional_parquet.iceberg_v2_positional_not_all_data_files_have_delete_files-POSITION-DELETE-01 functional_parquet.iceberg_v2_positional_not_all_data_files_have_delete_files-position-delete] | HDFS partitions=1/1 files=2 size=5.33KB | row-size=267B cardinality=4 | -04:EXCHANGE [HASH(functional_parquet.iceberg_v2_positional_not_all_data_files_have_delete_files.input__file__name)] -| 00:SCAN HDFS [functional_parquet.iceberg_v2_positional_not_all_data_files_have_delete_files] HDFS partitions=1/1 files=2 size=1.22KB row-size=20B cardinality=6 @@ -442,10 +436,10 @@ PLAN-ROOT SINK | pass-through-operands: all | row-size=36B cardinality=4 | -|--02:DELETE EVENTS ICEBERG DELETE [ICEBERG DELETE JOIN, BROADCAST] +|--02:DELETE EVENTS ICEBERG DELETE [ICEBERG DELETE JOIN, DIRECTED] | | row-size=36B cardinality=1 | | -| |--05:EXCHANGE [BROADCAST] +| |--05:EXCHANGE [DIRECTED] | | | | | 01:SCAN HDFS [functional_parquet.iceberg_v2_positional_update_all_rows-POSITION-DELETE-01 functional_parquet.iceberg_v2_positional_update_all_rows-position-delete] | | HDFS partitions=1/1 files=1 size=2.60KB @@ -476,19 +470,17 @@ PLAN-ROOT SINK ---- DISTRIBUTEDPLAN PLAN-ROOT SINK | -05:EXCHANGE [UNPARTITIONED] +04:EXCHANGE [UNPARTITIONED] | -02:DELETE EVENTS ICEBERG DELETE [ICEBERG DELETE JOIN, PARTITIONED] +02:DELETE EVENTS ICEBERG DELETE [ICEBERG DELETE JOIN, DIRECTED] | row-size=64B cardinality=10 | -|--04:EXCHANGE [HASH(functional_parquet.iceberg_v2_partitioned_position_deletes-position-delete.file_path)] +|--03:EXCHANGE [DIRECTED] | | | 01:SCAN HDFS [functional_parquet.iceberg_v2_partitioned_position_deletes-POSITION-DELETE-01 functional_parquet.iceberg_v2_partitioned_position_deletes-position-delete] | HDFS partitions=1/1 files=3 size=9.47KB | row-size=204B cardinality=10 | -03:EXCHANGE [HASH(functional_parquet.iceberg_v2_partitioned_position_deletes.input__file__name)] -| 00:SCAN HDFS [functional_parquet.iceberg_v2_partitioned_position_deletes] HDFS partitions=1/1 files=3 size=3.48KB row-size=64B cardinality=20 @@ -521,23 +513,21 @@ PLAN-ROOT SINK ---- DISTRIBUTEDPLAN PLAN-ROOT SINK | -07:EXCHANGE [UNPARTITIONED] +06:EXCHANGE [UNPARTITIONED] | 04:UNION | pass-through-operands: all | row-size=36B cardinality=2 | -|--02:DELETE EVENTS ICEBERG DELETE [ICEBERG DELETE JOIN, PARTITIONED] +|--02:DELETE EVENTS ICEBERG DELETE [ICEBERG DELETE JOIN, DIRECTED] | | row-size=36B cardinality=1 | | -| |--06:EXCHANGE [HASH(functional_parquet.iceberg_v2_positional_not_all_data_files_have_delete_files-position-delete.file_path)] +| |--05:EXCHANGE [DIRECTED] | | | | | 01:SCAN HDFS [functional_parquet.iceberg_v2_positional_not_all_data_files_have_delete_files-POSITION-DELETE-01 functional_parquet.iceberg_v2_positional_not_all_data_files_have_delete_files-position-delete] | | HDFS partitions=1/1 files=2 size=5.33KB | | row-size=267B cardinality=4 | | -| 05:EXCHANGE [HASH(functional_parquet.iceberg_v2_positional_not_all_data_files_have_delete_files.input__file__name)] -| | | 00:SCAN HDFS [functional_parquet.iceberg_v2_positional_not_all_data_files_have_delete_files] | HDFS partitions=1/1 files=2 size=1.22KB | predicates: i > 2 @@ -587,23 +577,23 @@ PLAN-ROOT SINK ---- DISTRIBUTEDPLAN PLAN-ROOT SINK | -13:EXCHANGE [UNPARTITIONED] +12:EXCHANGE [UNPARTITIONED] | 07:HASH JOIN [LEFT ANTI JOIN, PARTITIONED] | hash predicates: functional_parquet.iceberg_v2_positional_not_all_data_files_have_delete_files.i IS NOT DISTINCT FROM functional_parquet.iceberg_v2_positional_not_all_data_files_have_delete_files.i, 
functional_parquet.iceberg_v2_positional_not_all_data_files_have_delete_files.s IS NOT DISTINCT FROM functional_parquet.iceberg_v2_positional_not_all_data_files_have_delete_files.s | row-size=16B cardinality=6 | -|--12:EXCHANGE [HASH(functional_parquet.iceberg_v2_positional_not_all_data_files_have_delete_files.i,functional_parquet.iceberg_v2_positional_not_all_data_files_have_delete_files.s)] +|--11:EXCHANGE [HASH(functional_parquet.iceberg_v2_positional_not_all_data_files_have_delete_files.i,functional_parquet.iceberg_v2_positional_not_all_data_files_have_delete_files.s)] | | | 06:SCAN HDFS [functional_parquet.iceberg_v2_positional_not_all_data_files_have_delete_files] | HDFS partitions=1/1 files=1 size=625B | row-size=16B cardinality=3 | -11:AGGREGATE [FINALIZE] +10:AGGREGATE [FINALIZE] | group by: functional_parquet.iceberg_v2_positional_not_all_data_files_have_delete_files.i, functional_parquet.iceberg_v2_positional_not_all_data_files_have_delete_files.s | row-size=16B cardinality=6 | -10:EXCHANGE [HASH(functional_parquet.iceberg_v2_positional_not_all_data_files_have_delete_files.i,functional_parquet.iceberg_v2_positional_not_all_data_files_have_delete_files.s)] +09:EXCHANGE [HASH(functional_parquet.iceberg_v2_positional_not_all_data_files_have_delete_files.i,functional_parquet.iceberg_v2_positional_not_all_data_files_have_delete_files.s)] | 05:AGGREGATE [STREAMING] | group by: functional_parquet.iceberg_v2_positional_not_all_data_files_have_delete_files.i, functional_parquet.iceberg_v2_positional_not_all_data_files_have_delete_files.s @@ -613,17 +603,15 @@ PLAN-ROOT SINK | pass-through-operands: all | row-size=36B cardinality=6 | -|--02:DELETE EVENTS ICEBERG DELETE [ICEBERG DELETE JOIN, PARTITIONED] +|--02:DELETE EVENTS ICEBERG DELETE [ICEBERG DELETE JOIN, DIRECTED] | | row-size=36B cardinality=2 | | -| |--09:EXCHANGE [HASH(functional_parquet.iceberg_v2_positional_not_all_data_files_have_delete_files-position-delete.file_path)] +| |--08:EXCHANGE [DIRECTED] | | | | | 01:SCAN HDFS [functional_parquet.iceberg_v2_positional_not_all_data_files_have_delete_files-POSITION-DELETE-01 functional_parquet.iceberg_v2_positional_not_all_data_files_have_delete_files-position-delete] | | HDFS partitions=1/1 files=2 size=5.33KB | | row-size=267B cardinality=4 | | -| 08:EXCHANGE [HASH(functional_parquet.iceberg_v2_positional_not_all_data_files_have_delete_files.input__file__name)] -| | | 00:SCAN HDFS [functional_parquet.iceberg_v2_positional_not_all_data_files_have_delete_files] | HDFS partitions=1/1 files=2 size=1.22KB | row-size=36B cardinality=6 @@ -660,23 +648,21 @@ PLAN-ROOT SINK ---- DISTRIBUTEDPLAN PLAN-ROOT SINK | -07:EXCHANGE [UNPARTITIONED] +06:EXCHANGE [UNPARTITIONED] | 04:UNION | pass-through-operands: all | row-size=36B cardinality=2 | -|--02:DELETE EVENTS ICEBERG DELETE [ICEBERG DELETE JOIN, PARTITIONED] +|--02:DELETE EVENTS ICEBERG DELETE [ICEBERG DELETE JOIN, DIRECTED] | | row-size=36B cardinality=1 | | -| |--06:EXCHANGE [HASH(functional_parquet.iceberg_v2_positional_not_all_data_files_have_delete_files-position-delete.file_path)] +| |--05:EXCHANGE [DIRECTED] | | | | | 01:SCAN HDFS [functional_parquet.iceberg_v2_positional_not_all_data_files_have_delete_files-POSITION-DELETE-01 functional_parquet.iceberg_v2_positional_not_all_data_files_have_delete_files-position-delete] | | HDFS partitions=1/1 files=2 size=5.33KB | | row-size=267B cardinality=4 | | -| 05:EXCHANGE [HASH(functional_parquet.iceberg_v2_positional_not_all_data_files_have_delete_files.input__file__name)] -| | | 00:SCAN HDFS 
[functional_parquet.iceberg_v2_positional_not_all_data_files_have_delete_files] | HDFS partitions=1/1 files=2 size=1.22KB | predicates: i + 1000 > 1003 @@ -744,20 +730,20 @@ PLAN-ROOT SINK ---- DISTRIBUTEDPLAN PLAN-ROOT SINK | -18:EXCHANGE [UNPARTITIONED] +17:EXCHANGE [UNPARTITIONED] | 11:HASH JOIN [LEFT SEMI JOIN, BROADCAST] | hash predicates: i = max(i) | runtime filters: RF000 <- max(i) | row-size=36B cardinality=6 | -|--17:EXCHANGE [BROADCAST] +|--16:EXCHANGE [BROADCAST] | | -| 16:AGGREGATE [FINALIZE] +| 15:AGGREGATE [FINALIZE] | | output: max:merge(i) | | row-size=4B cardinality=1 | | -| 15:EXCHANGE [UNPARTITIONED] +| 14:EXCHANGE [UNPARTITIONED] | | | 10:AGGREGATE | | output: max(i) @@ -767,10 +753,10 @@ PLAN-ROOT SINK | | pass-through-operands: all | | row-size=24B cardinality=4 | | -| |--07:DELETE EVENTS ICEBERG DELETE [ICEBERG DELETE JOIN, BROADCAST] +| |--07:DELETE EVENTS ICEBERG DELETE [ICEBERG DELETE JOIN, DIRECTED] | | | row-size=24B cardinality=1 | | | -| | |--14:EXCHANGE [BROADCAST] +| | |--13:EXCHANGE [DIRECTED] | | | | | | | 06:SCAN HDFS [functional_parquet.iceberg_v2_positional_update_all_rows-POSITION-DELETE-06 functional_parquet.iceberg_v2_positional_update_all_rows-position-delete] | | | HDFS partitions=1/1 files=1 size=2.60KB @@ -788,17 +774,15 @@ PLAN-ROOT SINK | pass-through-operands: all | row-size=36B cardinality=6 | -|--02:DELETE EVENTS ICEBERG DELETE [ICEBERG DELETE JOIN, PARTITIONED] +|--02:DELETE EVENTS ICEBERG DELETE [ICEBERG DELETE JOIN, DIRECTED] | | row-size=36B cardinality=2 | | -| |--13:EXCHANGE [HASH(functional_parquet.iceberg_v2_positional_not_all_data_files_have_delete_files-position-delete.file_path)] +| |--12:EXCHANGE [DIRECTED] | | | | | 01:SCAN HDFS [functional_parquet.iceberg_v2_positional_not_all_data_files_have_delete_files-POSITION-DELETE-01 functional_parquet.iceberg_v2_positional_not_all_data_files_have_delete_files-position-delete] | | HDFS partitions=1/1 files=2 size=5.33KB | | row-size=267B cardinality=4 | | -| 12:EXCHANGE [HASH(functional_parquet.iceberg_v2_positional_not_all_data_files_have_delete_files.input__file__name)] -| | | 00:SCAN HDFS [functional_parquet.iceberg_v2_positional_not_all_data_files_have_delete_files] | HDFS partitions=1/1 files=2 size=1.22KB | runtime filters: RF000 -> i @@ -830,10 +814,10 @@ PLAN-ROOT SINK | 04:EXCHANGE [UNPARTITIONED] | -02:DELETE EVENTS ICEBERG DELETE [ICEBERG DELETE JOIN, BROADCAST] +02:DELETE EVENTS ICEBERG DELETE [ICEBERG DELETE JOIN, DIRECTED] | row-size=64B cardinality=1 | -|--03:EXCHANGE [BROADCAST] +|--03:EXCHANGE [DIRECTED] | | | 01:SCAN HDFS [functional_parquet.iceberg_v2_partitioned_position_deletes-POSITION-DELETE-01 functional_parquet.iceberg_v2_partitioned_position_deletes-position-delete] | HDFS partitions=1/1 files=1 size=3.18KB @@ -867,10 +851,10 @@ PLAN-ROOT SINK | 04:EXCHANGE [UNPARTITIONED] | -02:DELETE EVENTS ICEBERG DELETE [ICEBERG DELETE JOIN, BROADCAST] +02:DELETE EVENTS ICEBERG DELETE [ICEBERG DELETE JOIN, DIRECTED] | row-size=64B cardinality=1 | -|--03:EXCHANGE [BROADCAST] +|--03:EXCHANGE [DIRECTED] | | | 01:SCAN HDFS [functional_parquet.iceberg_v2_partitioned_position_deletes-POSITION-DELETE-01 functional_parquet.iceberg_v2_partitioned_position_deletes-position-delete] | HDFS partitions=1/1 files=1 size=3.18KB @@ -1042,10 +1026,10 @@ PLAN-ROOT SINK | 04:EXCHANGE [UNPARTITIONED] | -02:DELETE EVENTS ICEBERG DELETE [ICEBERG DELETE JOIN, BROADCAST] +02:DELETE EVENTS ICEBERG DELETE [ICEBERG DELETE JOIN, DIRECTED] | row-size=64B cardinality=4 | -|--03:EXCHANGE [BROADCAST] 
+|--03:EXCHANGE [DIRECTED] | | | 01:SCAN HDFS [functional_parquet.iceberg_v2_partitioned_position_deletes-POSITION-DELETE-01 functional_parquet.iceberg_v2_partitioned_position_deletes-position-delete] | HDFS partitions=1/1 files=1 size=3.18KB @@ -1107,29 +1091,27 @@ PLAN-ROOT SINK 10:NESTED LOOP JOIN [CROSS JOIN, BROADCAST] | row-size=17B cardinality=1 | -|--20:EXCHANGE [UNPARTITIONED] +|--18:EXCHANGE [UNPARTITIONED] | | -| 19:AGGREGATE [FINALIZE] +| 17:AGGREGATE [FINALIZE] | | output: count:merge(*) | | row-size=8B cardinality=1 | | -| 18:EXCHANGE [UNPARTITIONED] +| 16:EXCHANGE [UNPARTITIONED] | | | 08:AGGREGATE | | output: count(*) | | row-size=8B cardinality=1 | | -| 07:DELETE EVENTS ICEBERG DELETE [ICEBERG DELETE JOIN, PARTITIONED] +| 07:DELETE EVENTS ICEBERG DELETE [ICEBERG DELETE JOIN, DIRECTED] | | row-size=20B cardinality=2 | | -| |--17:EXCHANGE [HASH(functional_parquet.iceberg_v2_positional_not_all_data_files_have_delete_files-position-delete.file_path)] +| |--15:EXCHANGE [DIRECTED] | | | | | 06:SCAN HDFS [functional_parquet.iceberg_v2_positional_not_all_data_files_have_delete_files-POSITION-DELETE-06 functional_parquet.iceberg_v2_positional_not_all_data_files_have_delete_files-position-delete] | | HDFS partitions=1/1 files=2 size=5.33KB | | row-size=267B cardinality=4 | | -| 16:EXCHANGE [HASH(functional_parquet.iceberg_v2_positional_not_all_data_files_have_delete_files.input__file__name)] -| | | 05:SCAN HDFS [functional_parquet.iceberg_v2_positional_not_all_data_files_have_delete_files] | HDFS partitions=1/1 files=2 size=1.22KB | row-size=20B cardinality=6 @@ -1137,33 +1119,31 @@ PLAN-ROOT SINK 09:NESTED LOOP JOIN [CROSS JOIN, BROADCAST] | row-size=9B cardinality=1 | -|--15:EXCHANGE [UNPARTITIONED] +|--14:EXCHANGE [UNPARTITIONED] | | | 04:UNION | constant-operands=1 | row-size=1B cardinality=1 | -14:AGGREGATE [FINALIZE] +13:AGGREGATE [FINALIZE] | output: count:merge(*) | row-size=8B cardinality=1 | -13:EXCHANGE [UNPARTITIONED] +12:EXCHANGE [UNPARTITIONED] | 03:AGGREGATE | output: count(*) | row-size=8B cardinality=1 | -02:DELETE EVENTS ICEBERG DELETE [ICEBERG DELETE JOIN, PARTITIONED] +02:DELETE EVENTS ICEBERG DELETE [ICEBERG DELETE JOIN, DIRECTED] | row-size=20B cardinality=2 | -|--12:EXCHANGE [HASH(functional_parquet.iceberg_v2_positional_not_all_data_files_have_delete_files-position-delete.file_path)] +|--11:EXCHANGE [DIRECTED] | | | 01:SCAN HDFS [functional_parquet.iceberg_v2_positional_not_all_data_files_have_delete_files-POSITION-DELETE-01 functional_parquet.iceberg_v2_positional_not_all_data_files_have_delete_files-position-delete] | HDFS partitions=1/1 files=2 size=5.33KB | row-size=267B cardinality=4 | -11:EXCHANGE [HASH(functional_parquet.iceberg_v2_positional_not_all_data_files_have_delete_files.input__file__name)] -| 00:SCAN HDFS [functional_parquet.iceberg_v2_positional_not_all_data_files_have_delete_files] HDFS partitions=1/1 files=2 size=1.22KB row-size=20B cardinality=6 diff --git a/testdata/workloads/functional-query/queries/QueryTest/iceberg-v2-directed-mode.test b/testdata/workloads/functional-query/queries/QueryTest/iceberg-v2-directed-mode.test new file mode 100644 index 000000000..67f0a2c1f --- /dev/null +++ b/testdata/workloads/functional-query/queries/QueryTest/iceberg-v2-directed-mode.test @@ -0,0 +1,28 @@ +==== +---- QUERY +# Files are going to be split up to multiple scan ranges, 10 files into 30 ranges. See if +# directed distribution mode still gives a good result if the Exchange node sends out one +# row to multiple hosts. 
No separate JOIN BUILD fragment is used. +set MAX_SCAN_RANGE_LENGTH=512b; +select count(1) from iceberg_v2_partitioned_position_deletes; +---- RESULTS +10 +---- TYPES +bigint +---- RUNTIME_PROFILE +aggregation(SUM, BuildRows): 30 +row_regex: (?!.*F03:JOIN BUILD.*) +==== +---- QUERY +# Same as above but here there is a separate fragment for the join build. +set MT_DOP=4; +set MAX_SCAN_RANGE_LENGTH=512b; +select count(1) from iceberg_v2_partitioned_position_deletes; +---- RESULTS +10 +---- TYPES +bigint +---- RUNTIME_PROFILE +aggregation(SUM, BuildRows): 30 +row_regex: .*F03:JOIN BUILD.* +==== diff --git a/testdata/workloads/functional-query/queries/QueryTest/iceberg-v2-read-position-deletes.test b/testdata/workloads/functional-query/queries/QueryTest/iceberg-v2-read-position-deletes.test index 6af973fdb..6a7cb599b 100644 --- a/testdata/workloads/functional-query/queries/QueryTest/iceberg-v2-read-position-deletes.test +++ b/testdata/workloads/functional-query/queries/QueryTest/iceberg-v2-read-position-deletes.test @@ -323,12 +323,16 @@ aggregation(SUM, NumRowGroups): 0 aggregation(SUM, NumFileMetadataRead): 0 ==== ---- QUERY +# Checks that each delete row is only sent once to the build side of the join because +# it's either a directed Iceberg join (when V2 opt is ON) or because it's a partitioned +# join (when V2 opt is OFF). SELECT count(*) from iceberg_v2_partitioned_position_deletes ---- RESULTS 10 ---- TYPES bigint ---- RUNTIME_PROFILE +aggregation(SUM, BuildRows): 10 aggregation(SUM, NumRowGroups): 6 aggregation(SUM, NumFileMetadataRead): 0 ==== diff --git a/testdata/workloads/functional-query/queries/QueryTest/set.test b/testdata/workloads/functional-query/queries/QueryTest/set.test index a555a6b04..c0e0ad67c 100644 --- a/testdata/workloads/functional-query/queries/QueryTest/set.test +++ b/testdata/workloads/functional-query/queries/QueryTest/set.test @@ -135,6 +135,11 @@ set default_join_distribution_mode=bar Invalid default join distribution mode: 'bar'. Valid values are BROADCAST(0), SHUFFLE(1). ==== ---- QUERY +set default_join_distribution_mode=directed +---- CATCH +Invalid default join distribution mode: 'directed'. Valid values are BROADCAST(0), SHUFFLE(1). +==== +---- QUERY set kudu_read_mode=bar ---- CATCH Invalid Kudu read mode: 'bar'. Valid values are DEFAULT(0), READ_LATEST(1), READ_AT_SNAPSHOT(2). diff --git a/tests/query_test/test_iceberg.py b/tests/query_test/test_iceberg.py index 1ac27294b..58b3ad14c 100644 --- a/tests/query_test/test_iceberg.py +++ b/tests/query_test/test_iceberg.py @@ -1343,3 +1343,25 @@ class TestIcebergV2Table(IcebergTestSuite): assert result_after_opt.data.sort() == result_time_travel.data.sort() self.run_test_case('QueryTest/iceberg-optimize', vector, unique_database) + + +# Tests to exercise the DIRECTED distribution mode for V2 Iceberg tables. Note that most +# of the test coverage is in TestIcebergV2Table.test_read_position_deletes, but since it +# also runs with the V2 optimizations setting turned off, some tests were moved here.
+class TestIcebergDirectedMode(IcebergTestSuite): + """Tests related to Iceberg DIRECTED distribution mode.""" + + @classmethod + def get_workload(cls): + return 'functional-query' + + @classmethod + def add_test_dimensions(cls): + super(TestIcebergDirectedMode, cls).add_test_dimensions() + cls.ImpalaTestMatrix.add_constraint( + lambda v: v.get_value('table_format').file_format == 'parquet') + + @SkipIfDockerizedCluster.internal_hostname + @SkipIf.hardcoded_uris + def test_directed_mode(self, vector): + self.run_test_case('QueryTest/iceberg-v2-directed-mode', vector)
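
For reference, the routing idea that the DIRECTED exchange above implements, and that the new tests exercise, is a simple lookup: the scheduler records which host(s) each data file's scan ranges were assigned to, and the sender forwards a position-delete row only to the host(s) reading the data file named in that row, one copy per host when a file is split across hosts. The Python sketch below is illustrative only and is not part of the patch; all function and variable names are invented for the example.

# Illustrative sketch only; not Impala code. All names below are hypothetical.
from collections import defaultdict

def build_filepath_to_hosts(scan_range_assignments):
    """scan_range_assignments: (data_file_path, host) pairs as decided by the
    scheduler. Returns the per-file host set a DIRECTED exchange would consult."""
    filepath_to_hosts = defaultdict(set)
    for data_file_path, host in scan_range_assignments:
        filepath_to_hosts[data_file_path].add(host)
    return filepath_to_hosts

def route_delete_rows(delete_rows, filepath_to_hosts):
    """delete_rows: (data_file_path, file_position) position-delete rows.
    Yields (host, row) pairs: each row goes only to the host(s) scanning its
    target data file, instead of to every host (broadcast) or to a host picked
    by hashing the file path (partitioned)."""
    for data_file_path, file_position in delete_rows:
        for host in filepath_to_hosts.get(data_file_path, ()):
            yield host, (data_file_path, file_position)

if __name__ == "__main__":
    # data_a is split into two scan ranges on two hosts, so its delete row is
    # sent twice; data_b's delete row is sent exactly once.
    assignments = [("data_a.parq", "host1"), ("data_a.parq", "host2"),
                   ("data_b.parq", "host3")]
    deletes = [("data_a.parq", 0), ("data_b.parq", 5)]
    for host, row in route_delete_rows(deletes, build_filepath_to_hosts(assignments)):
        print(host, row)

Under these assumptions, the per-host fan-out is what the BuildRows counters in iceberg-v2-directed-mode.test observe: when the 10 delete rows each target a data file whose scan ranges land on multiple hosts, more than 10 build rows arrive in total, while an unsplit file contributes exactly one copy.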
