http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/507bd8be/be/src/exec/old-hash-table.h ---------------------------------------------------------------------- diff --git a/be/src/exec/old-hash-table.h b/be/src/exec/old-hash-table.h deleted file mode 100644 index 406f360..0000000 --- a/be/src/exec/old-hash-table.h +++ /dev/null @@ -1,548 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - - -#ifndef IMPALA_EXEC_OLD_HASH_TABLE_H -#define IMPALA_EXEC_OLD_HASH_TABLE_H - -#include <vector> -#include <boost/cstdint.hpp> -#include <boost/scoped_ptr.hpp> -#include "codegen/impala-ir.h" -#include "common/logging.h" -#include "runtime/mem-pool.h" -#include "util/hash-util.h" -#include "util/runtime-profile.h" - -namespace llvm { - class Function; -} - -namespace impala { - -class LlvmCodeGen; -class MemTracker; -class RuntimeFilter; -class RowDescriptor; -class RuntimeState; -class ScalarExpr; -class ScalarExprEvaluator; -class Tuple; -class TupleRow; - -/// TODO: Temporarily moving the old HashTable implementation to make it work with the -/// non-partitioned versions of HJ and AGG. It should be removed once we remove those -/// non-partitioned versions. 
- -/// Hash table implementation designed for hash aggregation and hash joins. This is not -/// templatized and is tailored to the usage pattern for aggregation and joins. The -/// hash table store TupleRows and allows for different exprs for insertions and finds. -/// This is the pattern we use for joins and aggregation where the input/build tuple -/// row descriptor is different from the find/probe descriptor. -/// The table is optimized for the query engine's use case as much as possible and is not -/// intended to be a generic hash table implementation. The API loosely mimics the -/// std::hashset API. -// -/// The hash table stores evaluated expr results for the current row being processed -/// when possible into a contiguous memory buffer. This allows for very efficient -/// computation for hashing. The implementation is also designed to allow codegen -/// for some paths. -// -/// The hash table does not support removes. The hash table is not thread safe. -// -/// The implementation is based on the boost multiset. The hashtable is implemented by -/// two data structures: a vector of buckets and a vector of nodes. Inserted values -/// are stored as nodes (in the order they are inserted). The buckets (indexed by the -/// mod of the hash) contain pointers to the node vector. Nodes that fall in the same -/// bucket are linked together (the bucket pointer gets you the head of that linked list). -/// When growing the hash table, the number of buckets is doubled, and nodes from a -/// particular bucket either stay in place or move to an analogous bucket in the second -/// half of buckets. This behavior allows us to avoid moving about half the nodes each -/// time, and maintains good cache properties by only accessing 2 buckets at a time. -/// The node vector is modified in place. -/// Due to the doubling nature of the buckets, we require that the number of buckets is a -/// power of 2. 
This allows us to determine if a node needs to move by simply checking a -/// single bit, and further allows us to initially hash nodes using a bitmask. -// -/// TODO: this is not a fancy hash table in terms of memory access patterns (cuckoo-hashing -/// or something that spills to disk). We will likely want to invest more time into this. -/// TODO: hash-join and aggregation have very different access patterns. Joins insert -/// all the rows and then calls scan to find them. Aggregation interleaves Find() and -/// Inserts(). We can want to optimize joins more heavily for Inserts() (in particular -/// growing). -/// TODO: batched interface for inserts and finds. -class OldHashTable { - private: - struct Node; - - public: - class Iterator; - - /// Create a hash table. - /// - build_exprs are the exprs that should be used to evaluate rows during Insert(). - /// - probe_exprs are used during Find() - /// - filter_exprs are used to build runtime filters. - /// - num_build_tuples: number of Tuples in the build tuple row - /// - stores_nulls: if false, TupleRows with nulls are ignored during Insert - /// - finds_nulls: if finds_nulls[i] is false, Find() returns End() for TupleRows with - /// nulls in position i even if stores_nulls is true. - /// - num_buckets: number of buckets that the hash table should be initialized to - /// - mem_tracker: if non-empty, all memory allocations for nodes and for buckets are - /// tracked; the tracker must be valid until the d'tor is called - /// - initial_seed: Initial seed value to use when computing hashes for rows - /// - stores_tuples: If true, the hash table stores tuples, otherwise it stores tuple - /// rows. - /// TODO: stores_nulls is too coarse: for a hash table in which some columns are joined - /// with '<=>' and others with '=', stores_nulls could distinguish between columns - /// in which nulls are stored and columns in which they are not, which could save - /// space by not storing some rows we know will never match. 
- static Status Create(ObjectPool* pool, RuntimeState* state, - const std::vector<ScalarExpr*>& build_exprs, - const std::vector<ScalarExpr*>& probe_exprs, - const std::vector<ScalarExpr*>& filter_exprs, - int num_build_tuples, bool stores_nulls, - const std::vector<bool>& finds_nulls, int32_t initial_seed, - MemTracker* mem_tracker, const std::vector<RuntimeFilter*>& runtime_filters, - boost::scoped_ptr<OldHashTable>* hash_tbl_, bool stores_tuples = false, - int64_t num_buckets = 1024); - - /// Initializes the evaluators for build, probe and filter expressions. - Status Open(RuntimeState* state); - - /// Call to cleanup any resources. Must be called once. - void Close(RuntimeState* state); - - /// Frees local allocations made by expression evaluators. - void FreeLocalAllocations(); - - /// Insert row into the hash table. Row will be evaluated over build exprs. - /// This will grow the hash table if necessary. - /// If the hash table has or needs to go over the mem limit, the Insert will - /// be ignored. The caller is assumed to periodically (e.g. per row batch) check - /// the limits to identify this case. - /// The 'row' is not copied by the hash table and the caller must guarantee it - /// stays in memory. - /// Returns false if there was not enough memory to insert the row. - bool IR_ALWAYS_INLINE Insert(TupleRow* row) { - if (UNLIKELY(mem_limit_exceeded_)) return false; - bool has_null = EvalBuildRow(row); - if (!stores_nulls_ && has_null) return true; - - if (UNLIKELY(num_filled_buckets_ > num_buckets_till_resize_)) { - /// TODO: next prime instead of double? 
- ResizeBuckets(num_buckets_ * 2); - if (UNLIKELY(mem_limit_exceeded_)) return false; - } - return InsertImpl(row); - } - - bool IR_ALWAYS_INLINE Insert(Tuple* tuple) { - if (UNLIKELY(mem_limit_exceeded_)) return false; - bool has_null = EvalBuildRow(reinterpret_cast<TupleRow*>(&tuple)); - if (!stores_nulls_ && has_null) return true; - - if (UNLIKELY(num_filled_buckets_ > num_buckets_till_resize_)) { - /// TODO: next prime instead of double? - ResizeBuckets(num_buckets_ * 2); - if (UNLIKELY(mem_limit_exceeded_)) return false; - } - return InsertImpl(tuple); - } - - /// Evaluate and hash the build/probe row, returning in *hash. Returns false if this - /// row should be rejected (doesn't need to be processed further) because it - /// contains NULL. - /// These need to be inlined in the IR module so we can find and replace the calls to - /// EvalBuildRow()/EvalProbeRow(). - bool IR_ALWAYS_INLINE EvalAndHashBuild(TupleRow* row, uint32_t* hash); - bool IR_ALWAYS_INLINE EvalAndHashProbe(TupleRow* row, uint32_t* hash); - - /// Returns the start iterator for all rows that match 'probe_row'. 'probe_row' is - /// evaluated with probe exprs. The iterator can be iterated until OldHashTable::End() - /// to find all the matching rows. - /// Only one scan can be in progress at any time (i.e. it is not legal to call - /// Find(), begin iterating through all the matches, call another Find(), - /// and continuing iterator from the first scan iterator). - /// Advancing the returned iterator will go to the next matching row. The matching - /// rows are evaluated lazily (i.e. computed as the Iterator is moved). - /// Returns OldHashTable::End() if there is no match. 
- Iterator IR_ALWAYS_INLINE Find(TupleRow* probe_row); - - /// Returns number of elements in the hash table - int64_t size() const { return num_nodes_; } - - /// Returns the number of buckets - int64_t num_buckets() const { return buckets_.size(); } - - /// Returns the load factor (the number of non-empty buckets) - float load_factor() const { - return num_filled_buckets_ / static_cast<float>(buckets_.size()); - } - - /// Returns an estimate of the number of bytes needed to build the hash table - /// structure for 'num_rows'. - static int64_t EstimateSize(int64_t num_rows) { - // Assume 50% fill factor. - int64_t num_buckets = num_rows * 2; - return num_buckets * sizeof(Bucket) + num_rows * sizeof(Node); - } - - /// Returns the number of bytes allocated to the hash table - int64_t byte_size() const { - int64_t nodes_mem = (num_nodes_ + node_remaining_current_page_) * sizeof(Node); - return nodes_mem + sizeof(Bucket) * buckets_.capacity(); - } - - bool mem_limit_exceeded() const { return mem_limit_exceeded_; } - - /// Returns the results of the exprs at 'expr_idx' evaluated over the last row - /// processed by the OldHashTable. - /// This value is invalid if the expr evaluated to NULL. - /// TODO: this is an awkward abstraction but aggregation node can take advantage of - /// it and save some expr evaluation calls. - void* last_expr_value(int expr_idx) const { - return expr_values_buffer_ + expr_values_buffer_offsets_[expr_idx]; - } - - /// Returns if the expr at 'expr_idx' evaluated to NULL for the last row. - bool last_expr_value_null(int expr_idx) const { - return expr_value_null_bits_[expr_idx]; - } - - /// Can be called after all insert calls to generate runtime filters, which are then - /// published to the local runtime state's RuntimeFilterBank. Returns the number of - /// filters that are enabled. - int AddBloomFilters(); - - /// Returns an iterator at the beginning of the hash table. Advancing this iterator - /// will traverse all elements. 
- Iterator Begin(); - - /// Return an iterator pointing to the first element in the hash table that does not - /// have its matched flag set. Used in right-outer and full-outer joins. - Iterator FirstUnmatched(); - - /// Returns end marker - Iterator End() { return Iterator(); } - - /// Codegen for evaluating a tuple row. Codegen'd function matches the signature - /// for EvalBuildRow and EvalTupleRow. - /// if build_row is true, the codegen uses the build_exprs, otherwise the probe_exprs - llvm::Function* CodegenEvalTupleRow(LlvmCodeGen* codegen, bool build_row); - - /// Codegen for hashing the expr values in 'expr_values_buffer_'. Function - /// prototype matches HashCurrentRow identically. - llvm::Function* CodegenHashCurrentRow(LlvmCodeGen* codegen); - - /// Codegen for evaluating a TupleRow and comparing equality against - /// 'expr_values_buffer_'. Function signature matches OldHashTable::Equals() - llvm::Function* CodegenEquals(LlvmCodeGen* codegen); - - static const char* LLVM_CLASS_NAME; - - /// Dump out the entire hash table to string. If skip_empty, empty buckets are - /// skipped. If show_match, it also prints the matched flag of each node. If build_desc - /// is non-null, the build rows will be output. Otherwise just the build row addresses. - std::string DebugString(bool skip_empty, bool show_match, - const RowDescriptor* build_desc); - - /// stl-like iterator interface. - class Iterator { - public: - Iterator() : table_(NULL), bucket_idx_(-1), node_(NULL) { - } - - /// Iterates to the next element. In the case where the iterator was - /// from a Find, this will lazily evaluate that bucket, only returning - /// TupleRows that match the current scan row. No-op if the iterator is at the end. - template<bool check_match> - void IR_ALWAYS_INLINE Next(); - - /// Iterates to the next element that does not have its matched flag set. Returns false - /// if it reaches the end of the table without finding an unmatched element. 
Used in - /// right-outer and full-outer joins. - bool NextUnmatched(); - - /// Returns the current row. Callers must check the iterator is not AtEnd() before - /// calling GetRow(). - TupleRow* GetRow() { - DCHECK(!AtEnd()); - DCHECK(!table_->stores_tuples_); - return reinterpret_cast<TupleRow*>(node_->data); - } - - Tuple* GetTuple() { - DCHECK(!AtEnd()); - DCHECK(table_->stores_tuples_); - return reinterpret_cast<Tuple*>(node_->data); - } - - void set_matched(bool v) { - DCHECK(!AtEnd()); - node_->matched = v; - } - - bool matched() const { - DCHECK(!AtEnd()); - return node_->matched; - } - - void reset() { - bucket_idx_ = -1; - node_ = NULL; - } - - /// Returns true if this iterator is at the end, i.e. GetRow() cannot be called. - bool AtEnd() const { return node_ == NULL; } - bool operator!=(const Iterator& rhs) { return !(*this == rhs); } - - bool operator==(const Iterator& rhs) { - return bucket_idx_ == rhs.bucket_idx_ && node_ == rhs.node_; - } - - private: - friend class OldHashTable; - - Iterator(OldHashTable* table, int bucket_idx, Node* node, uint32_t hash) : - table_(table), - bucket_idx_(bucket_idx), - node_(node), - scan_hash_(hash) { - } - - OldHashTable* table_; - - /// Current bucket idx - int64_t bucket_idx_; - - /// Current node idx (within current bucket) - Node* node_; - - /// Cached hash value for the row passed to Find() - uint32_t scan_hash_; - }; - - private: - friend class Iterator; - friend class OldHashTableTest; - - /// TODO: bit pack this struct. The default alignment makes this struct have a lot - /// of wasted bytes. - struct Node { - /// Only used for full/right outer hash join to indicate if this row has been - /// matched. - /// From an abstraction point of view, this is an awkward place to store this - /// information but it is very efficient here. This space is otherwise unused - /// (and we can bitpack this more in the future). 
- bool matched; - - uint32_t hash; // Cache of the hash for data_ - Node* next; // Chain to next node for collisions - void* data; // Either the Tuple* or TupleRow* - }; - - struct Bucket { - Node* node; - Bucket() : node(NULL) { } - }; - - /// Use Create() instead. - OldHashTable(RuntimeState* state, const std::vector<ScalarExpr*>& build_exprs, - const std::vector<ScalarExpr*>& probe_exprs, - const std::vector<ScalarExpr*>& filter_exprs, int num_build_tuples, - bool stores_nulls, const std::vector<bool>& finds_nulls, int32_t initial_seed, - MemTracker* mem_tracker, const std::vector<RuntimeFilter*>& filters, - bool stores_tuples, int64_t num_buckets); - - Status Init(ObjectPool* pool, RuntimeState* state); - - /// Simple wrappers to return various fields in this class. These functions are - /// cross-compiled and they exist to avoid the need to make assumption about the - /// order of declaration of these fields when generating the handcrafted IR. - uint8_t* IR_ALWAYS_INLINE expr_values_buffer() const; - uint8_t* IR_ALWAYS_INLINE expr_value_null_bits() const; - ScalarExprEvaluator* const* IR_ALWAYS_INLINE build_expr_evals() const; - ScalarExprEvaluator* const* IR_ALWAYS_INLINE probe_expr_evals() const; - - /// Returns the next non-empty bucket and updates idx to be the index of that bucket. - /// If there are no more buckets, returns NULL and sets idx to -1 - Bucket* NextBucket(int64_t* bucket_idx); - - /// Resize the hash table to 'num_buckets' - void ResizeBuckets(int64_t num_buckets); - - /// Insert row into the hash table - bool IR_ALWAYS_INLINE InsertImpl(void* data); - - /// Chains the node at 'node_idx' to 'bucket'. Nodes in a bucket are chained - /// as a linked list; this places the new node at the beginning of the list. - void AddToBucket(Bucket* bucket, Node* node); - - /// Moves a node from one bucket to another. 'previous_node' refers to the - /// node (if any) that's chained before this node in from_bucket's linked list. 
- void MoveNode(Bucket* from_bucket, Bucket* to_bucket, Node* node, - Node* previous_node); - - /// Evaluate the exprs over row and cache the results in 'expr_values_buffer_'. - /// Returns whether any expr evaluated to NULL - /// This will be replaced by codegen - bool EvalRow(TupleRow* row, const std::vector<ScalarExprEvaluator*>& evals); - - /// Evaluate 'row' over build exprs caching the results in 'expr_values_buffer_' This - /// will be replaced by codegen. We do not want this function inlined when cross - /// compiled because we need to be able to differentiate between EvalBuildRow and - /// EvalProbeRow by name and the build/probe exprs are baked into the codegen'd function. - bool IR_NO_INLINE EvalBuildRow(TupleRow* row) { - return EvalRow(row, build_expr_evals_); - } - - /// Evaluate 'row' over probe exprs caching the results in 'expr_values_buffer_' - /// This will be replaced by codegen. - bool IR_NO_INLINE EvalProbeRow(TupleRow* row) { - return EvalRow(row, probe_expr_evals_); - } - - /// Compute the hash of the values in expr_values_buffer_. - /// This will be replaced by codegen. We don't want this inlined for replacing - /// with codegen'd functions so the function name does not change. - uint32_t IR_NO_INLINE HashCurrentRow() { - if (var_result_begin_ == -1) { - /// This handles NULLs implicitly since a constant seed value was put - /// into results buffer for nulls. - return HashUtil::Hash(expr_values_buffer_, results_buffer_size_, initial_seed_); - } else { - return HashVariableLenRow(); - } - } - - TupleRow* GetRow(Node* node) const { - if (stores_tuples_) { - return reinterpret_cast<TupleRow*>(&node->data); - } else { - return reinterpret_cast<TupleRow*>(node->data); - } - } - - /// Compute the hash of the values in expr_values_buffer_ for rows with variable length - /// fields (e.g. 
strings) - uint32_t HashVariableLenRow(); - - /// Returns true if the values of build_exprs evaluated over 'build_row' equal - /// the values cached in expr_values_buffer_ - /// This will be replaced by codegen. - bool Equals(TupleRow* build_row); - - /// Grow the node array. - void GrowNodeArray(); - - /// Sets mem_limit_exceeded_ to true and MEM_LIMIT_EXCEEDED for the query. - /// allocation_size is the attempted size of the allocation that would have - /// brought us over the mem limit. - void MemLimitExceeded(int64_t allocation_size); - - /// Load factor that will trigger growing the hash table on insert. This is - /// defined as the number of non-empty buckets / total_buckets - static const float MAX_BUCKET_OCCUPANCY_FRACTION; - - RuntimeState* state_; - - /// References to the build expressions evaluated on each build-row during insertion - /// and lookup, the probe expressions used during lookup and the filter expression, - /// one per filter in filters_, evaluated on per-build row to produce the value with - /// which to update the corresponding filter. - const std::vector<ScalarExpr*>& build_exprs_; - const std::vector<ScalarExpr*>& probe_exprs_; - const std::vector<ScalarExpr*>& filter_exprs_; - - /// Evaluators for the expressions above. - std::vector<ScalarExprEvaluator*> build_expr_evals_; - std::vector<ScalarExprEvaluator*> probe_expr_evals_; - std::vector<ScalarExprEvaluator*> filter_expr_evals_; - - /// List of filters to build during build phase. - std::vector<RuntimeFilter*> filters_; - - /// Number of Tuple* in the build tuple row - const int num_build_tuples_; - - /// Constants on how the hash table should behave. Joins and aggs have slightly - /// different behavior. - /// TODO: these constants are an ideal candidate to be removed with codegen. - const bool stores_nulls_; - const std::vector<bool> finds_nulls_; - - /// finds_some_nulls_ is just the logical OR of finds_nulls_. 
- const bool finds_some_nulls_; - - const bool stores_tuples_; - - const int32_t initial_seed_; - - /// Number of non-empty buckets. Used to determine when to grow and rehash - int64_t num_filled_buckets_; - - /// number of nodes stored (i.e. size of hash table) - int64_t num_nodes_; - - /// MemPool used to allocate data pages. - boost::scoped_ptr<MemPool> mem_pool_; - - /// Number of data pages for nodes. - int num_data_pages_; - - /// Next node to insert. - Node* next_node_; - - /// Number of nodes left in the current page. - int node_remaining_current_page_; - - MemTracker* mem_tracker_; - - /// Set to true if the hash table exceeds the memory limit. If this is set, - /// subsequent calls to Insert() will be ignored. - bool mem_limit_exceeded_; - - std::vector<Bucket> buckets_; - - /// equal to buckets_.size() but more efficient than the size function - int64_t num_buckets_; - - /// The number of filled buckets to trigger a resize. This is cached for efficiency - int64_t num_buckets_till_resize_; - - /// Cache of exprs values for the current row being evaluated. This can either - /// be a build row (during Insert()) or probe row (during Find()). - std::vector<int> expr_values_buffer_offsets_; - - /// byte offset into expr_values_buffer_ that begins the variable length results - int var_result_begin_; - - /// byte size of 'expr_values_buffer_' - int results_buffer_size_; - - /// buffer to store evaluated expr results. This address must not change once - /// allocated since the address is baked into the codegen - uint8_t* expr_values_buffer_; - - /// Use bytes instead of bools to be compatible with llvm. This address must - /// not change once allocated. - uint8_t* expr_value_null_bits_; -}; - -} - -#endif
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/507bd8be/be/src/exec/old-hash-table.inline.h ---------------------------------------------------------------------- diff --git a/be/src/exec/old-hash-table.inline.h b/be/src/exec/old-hash-table.inline.h deleted file mode 100644 index e2bbe25..0000000 --- a/be/src/exec/old-hash-table.inline.h +++ /dev/null @@ -1,189 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. 
- - -#ifndef IMPALA_EXEC_OLD_HASH_TABLE_INLINE_H -#define IMPALA_EXEC_OLD_HASH_TABLE_INLINE_H - -#include "exec/old-hash-table.h" - -namespace impala { - -inline OldHashTable::Iterator OldHashTable::Find(TupleRow* probe_row) { - uint32_t hash; - if (!EvalAndHashProbe(probe_row, &hash)) return End(); - int64_t bucket_idx = hash & (num_buckets_ - 1); - Bucket* bucket = &buckets_[bucket_idx]; - Node* node = bucket->node; - while (node != NULL) { - if (node->hash == hash && Equals(GetRow(node))) { - return Iterator(this, bucket_idx, node, hash); - } - node = node->next; - } - return End(); -} - -inline OldHashTable::Iterator OldHashTable::Begin() { - int64_t bucket_idx = -1; - Bucket* bucket = NextBucket(&bucket_idx); - if (bucket != NULL) return Iterator(this, bucket_idx, bucket->node, 0); - return End(); -} - -inline OldHashTable::Iterator OldHashTable::FirstUnmatched() { - int64_t bucket_idx = -1; - Bucket* bucket = NextBucket(&bucket_idx); - while (bucket != NULL) { - Node* node = bucket->node; - while (node != NULL && node->matched) { - node = node->next; - } - if (node == NULL) { - bucket = NextBucket(&bucket_idx); - } else { - DCHECK(!node->matched); - return Iterator(this, bucket_idx, node, 0); - } - } - return End(); -} - -inline OldHashTable::Bucket* OldHashTable::NextBucket(int64_t* bucket_idx) { - ++*bucket_idx; - for (; *bucket_idx < num_buckets_; ++*bucket_idx) { - if (buckets_[*bucket_idx].node != NULL) return &buckets_[*bucket_idx]; - } - *bucket_idx = -1; - return NULL; -} - -inline bool OldHashTable::InsertImpl(void* data) { - uint32_t hash = HashCurrentRow(); - int64_t bucket_idx = hash & (num_buckets_ - 1); - if (node_remaining_current_page_ == 0) { - GrowNodeArray(); - if (UNLIKELY(mem_limit_exceeded_)) return false; - } - next_node_->hash = hash; - next_node_->data = data; - next_node_->matched = false; - AddToBucket(&buckets_[bucket_idx], next_node_); - DCHECK_GT(node_remaining_current_page_, 0); - --node_remaining_current_page_; - ++next_node_; 
- ++num_nodes_; - return true; -} - -inline void OldHashTable::AddToBucket(Bucket* bucket, Node* node) { - num_filled_buckets_ += (bucket->node == NULL); - node->next = bucket->node; - bucket->node = node; -} - -inline bool OldHashTable::EvalAndHashBuild(TupleRow* row, uint32_t* hash) { - bool has_null = EvalBuildRow(row); - if (!stores_nulls_ && has_null) return false; - *hash = HashCurrentRow(); - return true; -} - -inline bool OldHashTable::EvalAndHashProbe(TupleRow* row, uint32_t* hash) { - bool has_null = EvalProbeRow(row); - if (has_null && !(stores_nulls_ && finds_some_nulls_)) return false; - *hash = HashCurrentRow(); - return true; -} - -inline void OldHashTable::MoveNode(Bucket* from_bucket, Bucket* to_bucket, - Node* node, Node* previous_node) { - if (previous_node != NULL) { - previous_node->next = node->next; - } else { - // Update bucket directly - from_bucket->node = node->next; - num_filled_buckets_ -= (node->next == NULL); - } - AddToBucket(to_bucket, node); -} - -template<bool check_match> -inline void OldHashTable::Iterator::Next() { - if (bucket_idx_ == -1) return; - - // TODO: this should prefetch the next tuplerow - // Iterator is not from a full table scan, evaluate equality now. Only the current - // bucket needs to be scanned. 'expr_values_buffer_' contains the results - // for the current probe row. 
- if (check_match) { - // TODO: this should prefetch the next node - Node* node = node_->next; - while (node != NULL) { - if (node->hash == scan_hash_ && table_->Equals(table_->GetRow(node))) { - node_ = node; - return; - } - node = node->next; - } - *this = table_->End(); - } else { - // Move onto the next chained node - if (node_->next != NULL) { - node_ = node_->next; - return; - } - - // Move onto the next bucket - Bucket* bucket = table_->NextBucket(&bucket_idx_); - if (bucket == NULL) { - bucket_idx_ = -1; - node_ = NULL; - } else { - node_ = bucket->node; - } - } -} - -inline bool OldHashTable::Iterator::NextUnmatched() { - if (bucket_idx_ == -1) return false; - while (true) { - while (node_->next != NULL && node_->next->matched) { - node_ = node_->next; - } - if (node_->next == NULL) { - // Move onto the next bucket. - Bucket* bucket = table_->NextBucket(&bucket_idx_); - if (bucket == NULL) { - bucket_idx_ = -1; - node_ = NULL; - return false; - } else { - node_ = bucket->node; - if (node_ != NULL && !node_->matched) return true; - } - } else { - DCHECK(!node_->next->matched); - node_ = node_->next; - return true; - } - } -} - -} - -#endif http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/507bd8be/be/src/exec/partitioned-hash-join-node.cc ---------------------------------------------------------------------- diff --git a/be/src/exec/partitioned-hash-join-node.cc b/be/src/exec/partitioned-hash-join-node.cc index 0f731d3..a5c9897 100644 --- a/be/src/exec/partitioned-hash-join-node.cc +++ b/be/src/exec/partitioned-hash-join-node.cc @@ -211,11 +211,6 @@ Status PartitionedHashJoinNode::Reset(RuntimeState* state) { return ExecNode::Reset(state); } -Status PartitionedHashJoinNode::ProcessBuildInput(RuntimeState* state) { - DCHECK(false) << "Should not be called, PHJ uses the BuildSink API"; - return Status::OK(); -} - void PartitionedHashJoinNode::CloseAndDeletePartitions() { // Close all the partitions and clean up all references to them. 
for (unique_ptr<ProbePartition>& partition : probe_hash_partitions_) { http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/507bd8be/be/src/exec/partitioned-hash-join-node.h ---------------------------------------------------------------------- diff --git a/be/src/exec/partitioned-hash-join-node.h b/be/src/exec/partitioned-hash-join-node.h index 41493d0..73e0dd5 100644 --- a/be/src/exec/partitioned-hash-join-node.h +++ b/be/src/exec/partitioned-hash-join-node.h @@ -120,7 +120,6 @@ class PartitionedHashJoinNode : public BlockingJoinNode { virtual Status QueryMaintenance(RuntimeState* state) override; virtual void AddToDebugString( int indentation_level, std::stringstream* out) const override; - virtual Status ProcessBuildInput(RuntimeState* state) override; // Safe to close the build side early because we rematerialize the build rows always. virtual bool CanCloseBuildEarly() const override { return true; } http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/507bd8be/be/src/exprs/agg-fn-evaluator.cc ---------------------------------------------------------------------- diff --git a/be/src/exprs/agg-fn-evaluator.cc b/be/src/exprs/agg-fn-evaluator.cc index 860c78d..3a782be 100644 --- a/be/src/exprs/agg-fn-evaluator.cc +++ b/be/src/exprs/agg-fn-evaluator.cc @@ -21,7 +21,6 @@ #include "codegen/llvm-codegen.h" #include "common/logging.h" -#include "exec/aggregation-node.h" #include "exprs/aggregate-functions.h" #include "exprs/anyval-util.h" #include "exprs/scalar-expr.h" http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/507bd8be/be/src/runtime/row-batch.cc ---------------------------------------------------------------------- diff --git a/be/src/runtime/row-batch.cc b/be/src/runtime/row-batch.cc index fbe1b94..11cf363 100644 --- a/be/src/runtime/row-batch.cc +++ b/be/src/runtime/row-batch.cc @@ -34,10 +34,6 @@ #include "common/names.h" -// Used to determine memory ownership of a RowBatch's tuple pointers. 
-DECLARE_bool(enable_partitioned_hash_join); -DECLARE_bool(enable_partitioned_aggregation); - namespace impala { const int RowBatch::AT_CAPACITY_MEM_USAGE; @@ -58,13 +54,9 @@ RowBatch::RowBatch(const RowDescriptor* row_desc, int capacity, MemTracker* mem_ tuple_ptrs_size_ = capacity * num_tuples_per_row_ * sizeof(Tuple*); DCHECK_GT(tuple_ptrs_size_, 0); // TODO: switch to Init() pattern so we can check memory limit and return Status. - if (FLAGS_enable_partitioned_aggregation && FLAGS_enable_partitioned_hash_join) { - mem_tracker_->Consume(tuple_ptrs_size_); - tuple_ptrs_ = reinterpret_cast<Tuple**>(malloc(tuple_ptrs_size_)); - DCHECK(tuple_ptrs_ != NULL); - } else { - tuple_ptrs_ = reinterpret_cast<Tuple**>(tuple_data_pool_.Allocate(tuple_ptrs_size_)); - } + mem_tracker_->Consume(tuple_ptrs_size_); + tuple_ptrs_ = reinterpret_cast<Tuple**>(malloc(tuple_ptrs_size_)); + DCHECK(tuple_ptrs_ != NULL); } // TODO: we want our input_batch's tuple_data to come from our (not yet implemented) @@ -89,13 +81,9 @@ RowBatch::RowBatch( DCHECK_EQ(input_batch.row_tuples.size(), row_desc->tuple_descriptors().size()); DCHECK_GT(tuple_ptrs_size_, 0); // TODO: switch to Init() pattern so we can check memory limit and return Status. 
- if (FLAGS_enable_partitioned_aggregation && FLAGS_enable_partitioned_hash_join) { - mem_tracker_->Consume(tuple_ptrs_size_); - tuple_ptrs_ = reinterpret_cast<Tuple**>(malloc(tuple_ptrs_size_)); - DCHECK(tuple_ptrs_ != NULL); - } else { - tuple_ptrs_ = reinterpret_cast<Tuple**>(tuple_data_pool_.Allocate(tuple_ptrs_size_)); - } + mem_tracker_->Consume(tuple_ptrs_size_); + tuple_ptrs_ = reinterpret_cast<Tuple**>(malloc(tuple_ptrs_size_)); + DCHECK(tuple_ptrs_ != NULL); uint8_t* tuple_data; if (input_batch.compression_type != THdfsCompression::NONE) { DCHECK_EQ(THdfsCompression::LZ4, input_batch.compression_type) @@ -166,12 +154,10 @@ RowBatch::~RowBatch() { ExecEnv::GetInstance()->buffer_pool()->FreeBuffer( buffer_info.client, &buffer_info.buffer); } - if (FLAGS_enable_partitioned_aggregation && FLAGS_enable_partitioned_hash_join) { - DCHECK(tuple_ptrs_ != NULL); - free(tuple_ptrs_); - mem_tracker_->Release(tuple_ptrs_size_); - tuple_ptrs_ = NULL; - } + DCHECK(tuple_ptrs_ != NULL); + free(tuple_ptrs_); + mem_tracker_->Release(tuple_ptrs_size_); + tuple_ptrs_ = NULL; } Status RowBatch::Serialize(TRowBatch* output_batch) { @@ -346,9 +332,6 @@ void RowBatch::Reset() { } buffers_.clear(); auxiliary_mem_usage_ = 0; - if (!FLAGS_enable_partitioned_aggregation || !FLAGS_enable_partitioned_hash_join) { - tuple_ptrs_ = reinterpret_cast<Tuple**>(tuple_data_pool_.Allocate(tuple_ptrs_size_)); - } flush_ = FlushMode::NO_FLUSH_RESOURCES; needs_deep_copy_ = false; } @@ -373,10 +356,6 @@ void RowBatch::TransferResourceOwnership(RowBatch* dest) { } else if (flush_ == FlushMode::FLUSH_RESOURCES) { dest->MarkFlushResources(); } - if (!FLAGS_enable_partitioned_aggregation || !FLAGS_enable_partitioned_hash_join) { - // Tuple pointers were allocated from tuple_data_pool_ so are transferred. 
- tuple_ptrs_ = NULL; - } Reset(); } @@ -399,14 +378,8 @@ void RowBatch::AcquireState(RowBatch* src) { num_rows_ = src->num_rows_; capacity_ = src->capacity_; - if (!FLAGS_enable_partitioned_aggregation || !FLAGS_enable_partitioned_hash_join) { - // Tuple pointers are allocated from tuple_data_pool_ so are transferred. - tuple_ptrs_ = src->tuple_ptrs_; - src->tuple_ptrs_ = NULL; - } else { - // tuple_ptrs_ were allocated with malloc so can be swapped between batches. - std::swap(tuple_ptrs_, src->tuple_ptrs_); - } + // tuple_ptrs_ were allocated with malloc so can be swapped between batches. + std::swap(tuple_ptrs_, src->tuple_ptrs_); src->TransferResourceOwnership(this); } http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/507bd8be/be/src/runtime/row-batch.h ---------------------------------------------------------------------- diff --git a/be/src/runtime/row-batch.h b/be/src/runtime/row-batch.h index 3cf1093..1b75ebb 100644 --- a/be/src/runtime/row-batch.h +++ b/be/src/runtime/row-batch.h @@ -398,19 +398,10 @@ class RowBatch { /// Array of pointers with InitialCapacity() * num_tuples_per_row_ elements. /// The memory ownership depends on whether legacy joins and aggs are enabled. /// - /// Memory is malloc'd and owned by RowBatch: - /// If enable_partitioned_hash_join=true and enable_partitioned_aggregation=true - /// then the memory is owned by this RowBatch and is freed upon its destruction. - /// This mode is more performant especially with SubplanNodes in the ExecNode tree - /// because the tuple pointers are not transferred and do not have to be re-created - /// in every Reset(). - /// - /// Memory is allocated from MemPool: - /// Otherwise, the memory is allocated from tuple_data_pool_. As a result, the - /// pointer memory is transferred just like tuple data, and must be re-created - /// in Reset(). 
This mode is required for the legacy join and agg which rely on - /// the tuple pointers being allocated from the tuple_data_pool_, so they can - /// acquire ownership of the tuple pointers. + /// Memory is malloc'd and owned by RowBatch and is freed upon its destruction. This is + /// more performant than allocating the pointers from 'tuple_data_pool_' especially + /// with SubplanNodes in the ExecNode tree because the tuple pointers are not + /// transferred and do not have to be re-created in every Reset(). int tuple_ptrs_size_; Tuple** tuple_ptrs_; http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/507bd8be/be/src/util/backend-gflag-util.cc ---------------------------------------------------------------------- diff --git a/be/src/util/backend-gflag-util.cc b/be/src/util/backend-gflag-util.cc index 5523735..598ba08 100644 --- a/be/src/util/backend-gflag-util.cc +++ b/be/src/util/backend-gflag-util.cc @@ -27,7 +27,6 @@ DECLARE_bool(load_catalog_in_background); DECLARE_bool(load_auth_to_local_rules); DECLARE_bool(enable_stats_extrapolation); -DECLARE_bool(enable_partitioned_hash_join); DECLARE_int32(non_impala_java_vlog); DECLARE_int32(read_size); DECLARE_int32(num_metadata_loading_threads); @@ -73,7 +72,6 @@ Status GetThriftBackendGflags(JNIEnv* jni_env, jbyteArray* cfg_bytes) { cfg.__set_lineage_event_log_dir(FLAGS_lineage_event_log_dir); cfg.__set_local_library_path(FLAGS_local_library_dir); cfg.__set_kudu_operation_timeout_ms(FLAGS_kudu_operation_timeout_ms); - cfg.__set_enable_partitioned_hash_join(FLAGS_enable_partitioned_hash_join); cfg.__set_sentry_catalog_polling_frequency_s(FLAGS_sentry_catalog_polling_frequency_s); RETURN_IF_ERROR(SerializeThriftMsg(jni_env, &cfg, cfg_bytes)); return Status::OK(); http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/507bd8be/common/thrift/BackendGflags.thrift ---------------------------------------------------------------------- diff --git a/common/thrift/BackendGflags.thrift 
b/common/thrift/BackendGflags.thrift index b5f6838..45c9133 100644 --- a/common/thrift/BackendGflags.thrift +++ b/common/thrift/BackendGflags.thrift @@ -55,9 +55,7 @@ struct TBackendGflags { 17: required i32 initial_hms_cnxn_timeout_s - 18: required bool enable_partitioned_hash_join + 18: required bool enable_stats_extrapolation - 19: required bool enable_stats_extrapolation - - 20: required i64 sentry_catalog_polling_frequency_s + 19: required i64 sentry_catalog_polling_frequency_s } http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/507bd8be/fe/src/main/java/org/apache/impala/planner/JoinNode.java ---------------------------------------------------------------------- diff --git a/fe/src/main/java/org/apache/impala/planner/JoinNode.java b/fe/src/main/java/org/apache/impala/planner/JoinNode.java index 04dc40a..030d706 100644 --- a/fe/src/main/java/org/apache/impala/planner/JoinNode.java +++ b/fe/src/main/java/org/apache/impala/planner/JoinNode.java @@ -664,8 +664,7 @@ public abstract class JoinNode extends PlanNode { buildSideProfile.postOpenProfile.sum(nodeResourceProfile_)); ResourceProfile finishedBuildProfile = nodeResourceProfile_; - if (this instanceof NestedLoopJoinNode - || !BackendConfig.INSTANCE.isPartitionedHashJoinEnabled()) { + if (this instanceof NestedLoopJoinNode) { // These exec node implementations may hold references into the build side, which // prevents closing of the build side in a timely manner. 
This means we have to // count the post-open resource consumption of the build side in the same way as http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/507bd8be/fe/src/main/java/org/apache/impala/service/BackendConfig.java ---------------------------------------------------------------------- diff --git a/fe/src/main/java/org/apache/impala/service/BackendConfig.java b/fe/src/main/java/org/apache/impala/service/BackendConfig.java index 5b641e2..28e5fa3 100644 --- a/fe/src/main/java/org/apache/impala/service/BackendConfig.java +++ b/fe/src/main/java/org/apache/impala/service/BackendConfig.java @@ -64,10 +64,6 @@ public class BackendConfig { return backendCfg_.sentry_catalog_polling_frequency_s; } - public boolean isPartitionedHashJoinEnabled() { - return backendCfg_.enable_partitioned_hash_join; - } - // Inits the auth_to_local configuration in the static KerberosName class. private static void initAuthToLocal() { // If auth_to_local is enabled, we read the configuration hadoop.security.auth_to_local http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/507bd8be/testdata/workloads/functional-query/queries/QueryTest/legacy-joins-aggs.test ---------------------------------------------------------------------- diff --git a/testdata/workloads/functional-query/queries/QueryTest/legacy-joins-aggs.test b/testdata/workloads/functional-query/queries/QueryTest/legacy-joins-aggs.test deleted file mode 100644 index 5658f8b..0000000 --- a/testdata/workloads/functional-query/queries/QueryTest/legacy-joins-aggs.test +++ /dev/null @@ -1,45 +0,0 @@ -==== ----- QUERY -# Query is allowed because it only references scalars. -select * from complextypestbl ----- RESULTS -1 -2 -3 -4 -5 -6 -7 -8 ----- TYPES -bigint -==== ----- QUERY -# Query is allowed because it executes without a subplan. 
-select * from complextypestbl.int_array ----- RESULTS --1 -1 -1 -2 -2 -3 -3 -NULL -NULL -NULL ----- TYPES -int -==== ----- QUERY -# Query is not supported because it requires a subplan. -select item from complextypestbl t, t.int_array ----- CATCH -Query referencing nested types is not supported because the --enable_partitioned_hash_join and/or --enable_partitioned_aggregation Impala Daemon start-up flags are set to false. -==== ----- QUERY -# Query is not supported because it requires a subplan. -select key, value from complextypestbl t, t.int_map ----- CATCH -Query referencing nested types is not supported because the --enable_partitioned_hash_join and/or --enable_partitioned_aggregation Impala Daemon start-up flags are set to false. -==== http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/507bd8be/tests/common/environ.py ---------------------------------------------------------------------- diff --git a/tests/common/environ.py b/tests/common/environ.py index 891bb02..571dc65 100644 --- a/tests/common/environ.py +++ b/tests/common/environ.py @@ -36,13 +36,7 @@ except ImportError as e: LOG = logging.getLogger('tests.common.environ') -# See if Impala is running with legacy aggregations and/or hash joins. This is kind of a -# hack. It would be better to poll Impala whether it is doing so. test_start_cluster_args = os.environ.get("TEST_START_CLUSTER_ARGS", "") -old_agg_regex = "enable_partitioned_aggregation=false" -old_hash_join_regex = "enable_partitioned_hash_join=false" -USING_OLD_AGGS_JOINS = re.search(old_agg_regex, test_start_cluster_args) is not None or \ - re.search(old_hash_join_regex, test_start_cluster_args) is not None # Find the likely BuildType of the running Impala. Assume it's found through the path # $IMPALA_HOME/be/build/latest as a fallback. 
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/507bd8be/tests/common/skip.py ---------------------------------------------------------------------- diff --git a/tests/common/skip.py b/tests/common/skip.py index a34d2bc..da3dfe4 100644 --- a/tests/common/skip.py +++ b/tests/common/skip.py @@ -24,7 +24,7 @@ import os import pytest from functools import partial -from tests.common.environ import IMPALAD_BUILD, USING_OLD_AGGS_JOINS +from tests.common.environ import IMPALAD_BUILD from tests.util.filesystem_utils import ( IS_ISILON, IS_LOCAL, @@ -99,16 +99,6 @@ class SkipIfIsilon: reason="This Isilon issue has yet to be triaged.") jira = partial(pytest.mark.skipif, IS_ISILON) -class SkipIfOldAggsJoins: - nested_types = pytest.mark.skipif(USING_OLD_AGGS_JOINS, - reason="Nested types not supported with old aggs and joins") - passthrough_preagg = pytest.mark.skipif(USING_OLD_AGGS_JOINS, - reason="Passthrough optimization not implemented by old agg") - unsupported = pytest.mark.skipif(USING_OLD_AGGS_JOINS, - reason="Query unsupported with old aggs and joins") - requires_spilling = pytest.mark.skipif(USING_OLD_AGGS_JOINS, - reason="Test case requires spilling to pass") - class SkipIfLocal: # These ones are skipped due to product limitations. caching = pytest.mark.skipif(IS_LOCAL, http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/507bd8be/tests/custom_cluster/test_legacy_joins_aggs.py ---------------------------------------------------------------------- diff --git a/tests/custom_cluster/test_legacy_joins_aggs.py b/tests/custom_cluster/test_legacy_joins_aggs.py deleted file mode 100644 index 24879d3..0000000 --- a/tests/custom_cluster/test_legacy_joins_aggs.py +++ /dev/null @@ -1,33 +0,0 @@ -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. 
The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. - -from tests.common.custom_cluster_test_suite import CustomClusterTestSuite - -class TestLegacyJoinsAggs(CustomClusterTestSuite): - """Tests the behavior of the legacy join and agg nodes with nested types.""" - - @classmethod - def get_workload(self): - return 'functional-query' - - @CustomClusterTestSuite.with_args( - impalad_args=('--enable_partitioned_hash_join=false ' - '--enable_partitioned_aggregation=false'), - catalogd_args='--load_catalog_in_background=false') - def test_nested_types(self, vector): - self.run_test_case('QueryTest/legacy-joins-aggs', vector, - use_db='functional_parquet') http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/507bd8be/tests/metadata/test_ddl.py ---------------------------------------------------------------------- diff --git a/tests/metadata/test_ddl.py b/tests/metadata/test_ddl.py index d69f603..feccdc5 100644 --- a/tests/metadata/test_ddl.py +++ b/tests/metadata/test_ddl.py @@ -22,7 +22,7 @@ import time from test_ddl_base import TestDdlBase from tests.common.impala_test_suite import LOG from tests.common.parametrize import UniqueDatabase -from tests.common.skip import SkipIf, SkipIfADLS, SkipIfLocal, SkipIfOldAggsJoins +from tests.common.skip import SkipIf, SkipIfADLS, SkipIfLocal from tests.common.test_dimensions import create_single_exec_option_dimension from tests.util.filesystem_utils import WAREHOUSE, IS_HDFS, IS_LOCAL, IS_S3, IS_ADLS @@ 
-196,7 +196,6 @@ class TestDdlStatements(TestDdlBase): # supported if old joins and aggs are enabled. Since we do not get any meaningful # additional coverage by running a DDL test under the old aggs and joins, it can be # skipped. - @SkipIfOldAggsJoins.nested_types @UniqueDatabase.parametrize(sync_ddl=True) def test_create_table(self, vector, unique_database): vector.get_value('exec_option')['abort_on_error'] = False http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/507bd8be/tests/query_test/test_aggregation.py ---------------------------------------------------------------------- diff --git a/tests/query_test/test_aggregation.py b/tests/query_test/test_aggregation.py index 4999afe..9e0be6d 100644 --- a/tests/query_test/test_aggregation.py +++ b/tests/query_test/test_aggregation.py @@ -20,9 +20,7 @@ import pytest from testdata.common import widetable -from tests.common.environ import USING_OLD_AGGS_JOINS from tests.common.impala_test_suite import ImpalaTestSuite -from tests.common.skip import SkipIfOldAggsJoins from tests.common.test_dimensions import ( create_exec_option_dimension, create_uncompressed_text_dimension) @@ -122,8 +120,6 @@ class TestAggregation(ImpalaTestSuite): def test_aggregation(self, vector): exec_option = vector.get_value('exec_option') disable_codegen = exec_option['disable_codegen'] - # The old aggregation node does not support codegen for all aggregate functions. - check_codegen_enabled = not disable_codegen and not USING_OLD_AGGS_JOINS data_type, agg_func = (vector.get_value('data_type'), vector.get_value('agg_func')) query = 'select %s(%s_col) from alltypesagg where day is not null' % (agg_func, @@ -133,7 +129,7 @@ class TestAggregation(ImpalaTestSuite): assert len(result.data) == 1 self.verify_agg_result(agg_func, data_type, False, result.data[0]); - if check_codegen_enabled: + if not disable_codegen: # Verify codegen was enabled for the preaggregation. # It is deliberately disabled for the merge aggregation. 
assert_codegen_enabled(result.runtime_profile, [1]) @@ -144,7 +140,7 @@ class TestAggregation(ImpalaTestSuite): assert len(result.data) == 1 self.verify_agg_result(agg_func, data_type, True, result.data[0]); - if check_codegen_enabled: + if not disable_codegen: # Verify codegen was enabled for all stages of the aggregation. assert_codegen_enabled(result.runtime_profile, [1, 2, 4, 6]) @@ -234,8 +230,7 @@ class TestAggregationQueries(ImpalaTestSuite): assert(set(row[i].split(delimiter[i-1])) == set(['1', '2', '3', '4'])) assert(row[4] == '40') assert(row[5] == '4') - check_codegen_enabled = not disable_codegen and not USING_OLD_AGGS_JOINS - if check_codegen_enabled: + if not disable_codegen: # Verify codegen was enabled for all three stages of the aggregation. assert_codegen_enabled(result.runtime_profile, [1, 2, 4]) @@ -267,7 +262,7 @@ class TestAggregationQueries(ImpalaTestSuite): where int_col < 10""" result = self.execute_query(query, exec_option, table_format=table_format) assert(set((result.data)[0].split(" ")) == set(['1','2','3','4','5','6','7','8','9'])) - if check_codegen_enabled: + if not disable_codegen: # Verify codegen was enabled for all four stages of the aggregation. 
assert_codegen_enabled(result.runtime_profile, [1, 2, 4, 6]) @@ -333,6 +328,5 @@ class TestTPCHAggregationQueries(ImpalaTestSuite): def test_tpch_aggregations(self, vector): self.run_test_case('tpch-aggregations', vector) - @SkipIfOldAggsJoins.passthrough_preagg def test_tpch_passthrough_aggregations(self, vector): self.run_test_case('tpch-passthrough-aggregations', vector) http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/507bd8be/tests/query_test/test_join_queries.py ---------------------------------------------------------------------- diff --git a/tests/query_test/test_join_queries.py b/tests/query_test/test_join_queries.py index b333a71..a8f2be0 100644 --- a/tests/query_test/test_join_queries.py +++ b/tests/query_test/test_join_queries.py @@ -25,7 +25,6 @@ from tests.common.skip import ( SkipIf, SkipIfIsilon, SkipIfLocal, - SkipIfOldAggsJoins, SkipIfS3, SkipIfADLS) from tests.common.test_vector import ImpalaTestDimension @@ -62,7 +61,6 @@ class TestJoinQueries(ImpalaTestSuite): new_vector.get_value('exec_option')['num_nodes'] = 1 self.run_test_case('QueryTest/single-node-joins-with-limits-exhaustive', new_vector) - @SkipIfOldAggsJoins.unsupported def test_partitioned_joins(self, vector): self.run_test_case('QueryTest/joins-partitioned', vector) http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/507bd8be/tests/query_test/test_mt_dop.py ---------------------------------------------------------------------- diff --git a/tests/query_test/test_mt_dop.py b/tests/query_test/test_mt_dop.py index 1277002..38a8314 100644 --- a/tests/query_test/test_mt_dop.py +++ b/tests/query_test/test_mt_dop.py @@ -22,7 +22,6 @@ import pytest from copy import deepcopy from tests.common.impala_test_suite import ImpalaTestSuite from tests.common.kudu_test_suite import KuduTestSuite -from tests.common.skip import SkipIfOldAggsJoins from tests.common.test_vector import ImpalaTestDimension # COMPUTE STATS on Parquet tables automatically sets MT_DOP=4, so include @@ 
-94,7 +93,6 @@ class TestMtDopParquet(ImpalaTestSuite): @pytest.mark.xfail(pytest.config.option.testing_remote_cluster, reason='IMPALA-4641') - @SkipIfOldAggsJoins.nested_types def test_parquet_nested(self, vector): vector.get_value('exec_option')['mt_dop'] = vector.get_value('mt_dop') self.run_test_case('QueryTest/mt-dop-parquet-nested', vector) http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/507bd8be/tests/query_test/test_nested_types.py ---------------------------------------------------------------------- diff --git a/tests/query_test/test_nested_types.py b/tests/query_test/test_nested_types.py index 5635898..96a170b 100644 --- a/tests/query_test/test_nested_types.py +++ b/tests/query_test/test_nested_types.py @@ -23,7 +23,6 @@ from subprocess import check_call from tests.beeswax.impala_beeswax import ImpalaBeeswaxException from tests.common.impala_test_suite import ImpalaTestSuite from tests.common.skip import ( - SkipIfOldAggsJoins, SkipIfIsilon, SkipIfS3, SkipIfADLS, @@ -31,7 +30,6 @@ from tests.common.skip import ( from tests.util.filesystem_utils import WAREHOUSE, get_fs_path -@SkipIfOldAggsJoins.nested_types class TestNestedTypes(ImpalaTestSuite): @classmethod def get_workload(self): @@ -88,7 +86,6 @@ class TestNestedTypes(ImpalaTestSuite): vector.get_value('exec_option')['num_nodes'] = 1 self.run_test_case('QueryTest/nested-types-parquet-stats', vector) -@SkipIfOldAggsJoins.nested_types class TestParquetArrayEncodings(ImpalaTestSuite): TESTFILE_DIR = os.path.join(os.environ['IMPALA_HOME'], "testdata/parquet_nested_types_encodings") @@ -534,7 +531,6 @@ class TestParquetArrayEncodings(ImpalaTestSuite): local_path = self.TESTFILE_DIR + "/" + filename check_call(["hadoop", "fs", "-put", local_path, location], shell=False) -@SkipIfOldAggsJoins.nested_types class TestMaxNestingDepth(ImpalaTestSuite): # Should be kept in sync with the FE's Type.MAX_NESTING_DEPTH MAX_NESTING_DEPTH = 100 
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/507bd8be/tests/query_test/test_runtime_filters.py ---------------------------------------------------------------------- diff --git a/tests/query_test/test_runtime_filters.py b/tests/query_test/test_runtime_filters.py index 6edfd82..7710f43 100644 --- a/tests/query_test/test_runtime_filters.py +++ b/tests/query_test/test_runtime_filters.py @@ -20,7 +20,7 @@ import pytest import time from tests.common.impala_test_suite import ImpalaTestSuite -from tests.common.skip import SkipIfLocal, SkipIfOldAggsJoins +from tests.common.skip import SkipIfLocal @SkipIfLocal.multiple_impalad class TestRuntimeFilters(ImpalaTestSuite): @@ -63,7 +63,5 @@ class TestRuntimeRowFilters(ImpalaTestSuite): def test_row_filters(self, vector): self.run_test_case('QueryTest/runtime_row_filters', vector) - @SkipIfOldAggsJoins.requires_spilling - @SkipIfOldAggsJoins.nested_types def test_row_filters_phj_only(self, vector): self.run_test_case('QueryTest/runtime_row_filters_phj', vector) http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/507bd8be/tests/query_test/test_scanners.py ---------------------------------------------------------------------- diff --git a/tests/query_test/test_scanners.py b/tests/query_test/test_scanners.py index 74a69ac..e9fd457 100644 --- a/tests/query_test/test_scanners.py +++ b/tests/query_test/test_scanners.py @@ -36,7 +36,6 @@ from tests.common.skip import ( SkipIfS3, SkipIfADLS, SkipIfIsilon, - SkipIfOldAggsJoins, SkipIfLocal) from tests.common.test_dimensions import create_single_exec_option_dimension from tests.common.test_result_verifier import ( @@ -251,7 +250,6 @@ class TestParquet(ImpalaTestSuite): def test_parquet(self, vector): self.run_test_case('QueryTest/parquet', vector) - @SkipIfOldAggsJoins.nested_types def test_corrupt_files(self, vector): vector.get_value('exec_option')['abort_on_error'] = 0 self.run_test_case('QueryTest/parquet-continue-on-error', vector) @@ -536,7 +534,6 @@ class 
TestParquet(ImpalaTestSuite): assert c_schema_elt.converted_type == ConvertedType.UTF8 assert d_schema_elt.converted_type == None - @SkipIfOldAggsJoins.nested_types def test_resolution_by_name(self, vector, unique_database): self.run_test_case('QueryTest/parquet-resolution-by-name', vector, use_db=unique_database) http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/507bd8be/tests/query_test/test_tpch_nested_queries.py ---------------------------------------------------------------------- diff --git a/tests/query_test/test_tpch_nested_queries.py b/tests/query_test/test_tpch_nested_queries.py index 7a78cbe..28e6ac3 100644 --- a/tests/query_test/test_tpch_nested_queries.py +++ b/tests/query_test/test_tpch_nested_queries.py @@ -18,10 +18,8 @@ # Functional tests running the TPCH workload. from tests.common.impala_test_suite import ImpalaTestSuite -from tests.common.skip import SkipIfOldAggsJoins from tests.common.test_dimensions import create_single_exec_option_dimension -@SkipIfOldAggsJoins.nested_types class TestTpchNestedQuery(ImpalaTestSuite): @classmethod def get_workload(self):
