http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/a98b90bd/be/src/runtime/sorter.h ---------------------------------------------------------------------- diff --git a/be/src/runtime/sorter.h b/be/src/runtime/sorter.h index 80c5558..cafab72 100644 --- a/be/src/runtime/sorter.h +++ b/be/src/runtime/sorter.h @@ -20,7 +20,7 @@ #include <deque> -#include "runtime/buffered-block-mgr.h" +#include "runtime/bufferpool/buffer-pool.h" #include "util/tuple-row-compare.h" namespace impala { @@ -31,8 +31,7 @@ class RowBatch; /// Sorter contains the external sort implementation. Its purpose is to sort arbitrarily /// large input data sets with a fixed memory budget by spilling data to disk if -/// necessary. BufferedBlockMgr is used to allocate and manage blocks of data to be -/// sorted. +/// necessary. // /// The client API for Sorter is as follows: /// AddBatch() is used to add input rows to be sorted. Multiple tuples in an input row are @@ -52,20 +51,20 @@ class RowBatch; /// GetNext() is used to retrieve sorted rows. It can be called multiple times. /// AddBatch()/AddBatchNoSpill(), InputDone() and GetNext() must be called in that order. // -/// Batches of input rows are collected into a sequence of pinned BufferedBlockMgr blocks -/// called a run. The maximum size of a run is determined by the number of blocks that +/// Batches of input rows are collected into a sequence of pinned BufferPool pages +/// called a run. The maximum size of a run is determined by the number of pages that /// can be pinned by the Sorter. After the run is full, it is sorted in memory, unpinned /// and the next run is constructed. The variable-length column data (e.g. string slots) -/// in the materialized sort tuples are stored in a separate sequence of blocks from the -/// tuples themselves. When the blocks containing tuples in a run are unpinned, the +/// in the materialized sort tuples are stored in a separate sequence of pages from the +/// tuples themselves. When the pages containing tuples in a run are unpinned, the /// var-len slot pointers are converted to offsets from the start of the first var-len -/// data block. When a block is read back, these offsets are converted back to pointers. +/// data page. When a page is read back, these offsets are converted back to pointers. /// The in-memory sorter sorts the fixed-length tuples in-place. The output rows have the /// same schema as the materialized sort tuples. // /// After the input is consumed, the sorter is left with one or more sorted runs. If /// there are multiple runs, the runs are merged using SortedRunMerger. At least one -/// block per run (two if there are var-length slots) must be pinned in memory during +/// page per run (two if there are var-length slots) must be pinned in memory during /// a merge, so multiple merges may be necessary if the number of runs is too large. /// First a series of intermediate merges are performed, until the number of runs is /// small enough to do a single final merge that returns batches of sorted rows to the @@ -73,7 +72,7 @@ class RowBatch; /// /// If there is a single sorted run (i.e. no merge required), only tuple rows are /// copied into the output batch supplied by GetNext(), and the data itself is left in -/// pinned blocks held by the sorter. +/// pinned pages held by the sorter. /// /// When merges are performed, one input batch is created to hold tuple rows for each /// input run, and one batch is created to hold deep copied rows (i.e. ptrs + data) from @@ -84,7 +83,7 @@ class RowBatch; /// During a merge, one row batch is created for each input run, and one batch is created /// for the output of the merge (if is not the final merge). It is assumed that the memory /// for these batches have already been accounted for in the memory budget for the sort. -/// That is, the memory for these batches does not come out of the block buffer manager. +/// That is, the memory for these batches does not come out of the buffer pool. // /// TODO: Not necessary to actually copy var-len data - instead take ownership of the /// var-length data in the input batch. Copying can be deferred until a run is unpinned. @@ -96,17 +95,23 @@ class Sorter { /// 'sort_tuple_exprs' are the slot exprs used to materialize the tuples to be /// sorted. 'compare_less_than' is a comparator for the sort tuples (returns true if /// lhs < rhs). 'merge_batch_size_' is the size of the batches created to provide rows - /// to the merger and retrieve rows from an intermediate merger. 'enable_spilling' - /// should be set to false to reduce the number of requested buffers if the caller will - /// use AddBatchNoSpill(). + /// to the merger and retrieve rows from an intermediate merger. 'node_id' is the ID of + /// the exec node using the sorter for error reporting. 'enable_spilling' should be set + /// to false to reduce the number of requested buffers if the caller will use + /// AddBatchNoSpill(). + /// + /// The Sorter assumes that it has exclusive use of the client's + /// reservations for sorting, and may increase the size of the client's reservation. + /// The caller is responsible for ensuring that the minimum reservation (returned from + /// ComputeMinReservation()) is available. Sorter(const TupleRowComparator& compare_less_than, const std::vector<ScalarExpr*>& sort_tuple_exprs, RowDescriptor* output_row_desc, - MemTracker* mem_tracker, RuntimeProfile* profile, RuntimeState* state, - bool enable_spilling = true); - + MemTracker* mem_tracker, BufferPool::ClientHandle* client, int64_t page_len, + RuntimeProfile* profile, RuntimeState* state, int node_id, + bool enable_spilling); ~Sorter(); - /// Initial set-up of the sorter for execution. Registers with the block mgr. + /// Initial set-up of the sorter for execution. /// The evaluators for 'sort_tuple_exprs_' will be created and stored in 'obj_pool'. /// All allocation from the evaluators will be from 'expr_mem_pool'. Status Prepare(ObjectPool* obj_pool, MemPool* expr_mem_pool) WARN_UNUSED_RESULT; @@ -143,24 +148,29 @@ class Sorter { /// Close the Sorter and free resources. void Close(RuntimeState* state); + /// Compute the minimum amount of buffer memory in bytes required to execute a + /// sort with the current sorter. + int64_t ComputeMinReservation(); + private: + class Page; class Run; class TupleIterator; class TupleSorter; /// Create a SortedRunMerger from sorted runs in 'sorted_runs_' and assign it to /// 'merger_'. Attempts to set up merger with 'max_num_runs' runs but may set it - /// up with fewer if it cannot pin the initial blocks of all of the runs. Fails + /// up with fewer if it cannot pin the initial pages of all of the runs. Fails /// if it cannot merge at least two runs. The runs to be merged are removed from /// 'sorted_runs_'. The Sorter sets the 'deep_copy_input' flag to true for the - /// merger, since the blocks containing input run data will be deleted as input + /// merger, since the pages containing input run data will be deleted as input /// runs are read. Status CreateMerger(int max_num_runs) WARN_UNUSED_RESULT; /// Repeatedly replaces multiple smaller runs in sorted_runs_ with a single larger /// merged run until there are few enough runs to be merged with a single merger. /// Returns when 'merger_' is set up to merge the final runs. - /// At least 1 (2 if var-len slots) block from each sorted run must be pinned for + /// At least 1 (2 if var-len slots) page from each sorted run must be pinned for /// a merge. If the number of sorted runs is too large, merge sets of smaller runs /// into large runs until a final merge can be performed. An intermediate row batch /// containing deep copied rows is used for the output of each intermediate merge. @@ -177,6 +187,9 @@ class Sorter { /// Helper that cleans up all runs in the sorter. void CleanupAllRuns(); + /// ID of the ExecNode that owns the sorter, used for error reporting. + const int node_id_; + /// Runtime state instance used to check for cancellation. Not owned. RuntimeState* const state_; @@ -184,11 +197,11 @@ class Sorter { const TupleRowComparator& compare_less_than_; boost::scoped_ptr<TupleSorter> in_mem_tuple_sorter_; - /// Block manager object used to allocate, pin and release runs. Not owned by Sorter. - BufferedBlockMgr* block_mgr_; + /// Client used to allocate pages from the buffer pool. Not owned. + BufferPool::ClientHandle* const buffer_pool_client_; - /// Handle to block mgr to make allocations from. - BufferedBlockMgr::Client* block_mgr_client_; + /// The length of page to use. + const int64_t page_len_; /// True if the tuples to be sorted have var-length slots. bool has_var_len_slots_; @@ -211,7 +224,7 @@ class Sorter { /// BEGIN: Members that must be Reset() /// The current unsorted run that is being collected. Is sorted and added to - /// sorted_runs_ after it is full (i.e. number of blocks allocated == max available + /// sorted_runs_ after it is full (i.e. number of pages allocated == max available /// buffers) or after the input is complete. Owned and placed in obj_pool_. /// When it is added to sorted_runs_, it is set to NULL. Run* unsorted_run_;
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/a98b90bd/be/src/runtime/test-env.cc ---------------------------------------------------------------------- diff --git a/be/src/runtime/test-env.cc b/be/src/runtime/test-env.cc index 37b4363..23dfa4c 100644 --- a/be/src/runtime/test-env.cc +++ b/be/src/runtime/test-env.cc @@ -20,7 +20,6 @@ #include <limits> #include <memory> -#include "runtime/buffered-block-mgr.h" #include "runtime/query-exec-mgr.h" #include "runtime/tmp-file-mgr.h" #include "runtime/query-state.h" @@ -38,8 +37,8 @@ scoped_ptr<MetricGroup> TestEnv::static_metrics_; TestEnv::TestEnv() : have_tmp_file_mgr_args_(false), - buffer_pool_min_buffer_len_(-1), - buffer_pool_capacity_(-1) {} + buffer_pool_min_buffer_len_(64 * 1024), + buffer_pool_capacity_(0) {} Status TestEnv::Init() { if (static_metrics_ == NULL) { @@ -59,9 +58,7 @@ Status TestEnv::Init() { } else { RETURN_IF_ERROR(tmp_file_mgr()->Init(metrics())); } - if (buffer_pool_min_buffer_len_ != -1 && buffer_pool_capacity_ != -1) { - exec_env_->InitBufferPool(buffer_pool_min_buffer_len_, buffer_pool_capacity_); - } + exec_env_->InitBufferPool(buffer_pool_min_buffer_len_, buffer_pool_capacity_); return Status::OK(); } @@ -88,6 +85,7 @@ void TestEnv::TearDownQueries() { for (RuntimeState* runtime_state : runtime_states_) runtime_state->ReleaseResources(); runtime_states_.clear(); for (QueryState* query_state : query_states_) { + query_state->ReleaseInitialReservationRefcount(); exec_env_->query_exec_mgr()->ReleaseQueryState(query_state); } query_states_.clear(); @@ -137,17 +135,4 @@ Status TestEnv::CreateQueryState( *runtime_state = rs; return Status::OK(); } - -Status TestEnv::CreateQueryStateWithBlockMgr(int64_t query_id, int max_buffers, - int block_size, const TQueryOptions* query_options, RuntimeState** runtime_state) { - RETURN_IF_ERROR(CreateQueryState(query_id, query_options, runtime_state)); - - shared_ptr<BufferedBlockMgr> mgr; - RETURN_IF_ERROR(BufferedBlockMgr::Create(*runtime_state, - (*runtime_state)->query_state()->query_mem_tracker(), - (*runtime_state)->runtime_profile(), tmp_file_mgr(), - CalculateMemLimit(max_buffers, block_size), block_size, &mgr)); - (*runtime_state)->set_block_mgr(mgr); - return Status::OK(); -} } http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/a98b90bd/be/src/runtime/test-env.h ---------------------------------------------------------------------- diff --git a/be/src/runtime/test-env.h b/be/src/runtime/test-env.h index f314452..e721510 100644 --- a/be/src/runtime/test-env.h +++ b/be/src/runtime/test-env.h @@ -55,13 +55,8 @@ class TestEnv { Status CreateQueryState( int64_t query_id, const TQueryOptions* query_options, RuntimeState** runtime_state); - /// Same as CreateQueryState() but also creates a BufferedBlockMgr with the provided - /// parameters. If 'max_buffers' is -1, there is no limit, otherwise the limit is - /// max_buffers * block_size. - Status CreateQueryStateWithBlockMgr(int64_t query_id, int max_buffers, int block_size, - const TQueryOptions* query_options, RuntimeState** runtime_state); - /// Destroy all query states and associated RuntimeStates, BufferedBlockMgrs, - /// etc, that were created since the last TearDownQueries() call. + /// Destroy all query states and associated RuntimeStates, etc, that were created since + /// the last TearDownQueries() call. void TearDownQueries(); /// Calculate memory limit accounting for overflow and negative values. http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/a98b90bd/be/src/runtime/tmp-file-mgr-test.cc ---------------------------------------------------------------------- diff --git a/be/src/runtime/tmp-file-mgr-test.cc b/be/src/runtime/tmp-file-mgr-test.cc index c94ba1d..343ec93 100644 --- a/be/src/runtime/tmp-file-mgr-test.cc +++ b/be/src/runtime/tmp-file-mgr-test.cc @@ -145,6 +145,12 @@ class TmpFileMgrTest : public ::testing::Test { return bytes_allocated; } + /// Helpers to call WriteHandle methods. + void Cancel(TmpFileMgr::WriteHandle* handle) { handle->Cancel(); } + void WaitForWrite(TmpFileMgr::WriteHandle* handle) { + handle->WaitForWrite(); + } + // Write callback, which signals 'cb_cv_' and increments 'cb_counter_'. void SignalCallback(Status write_status) { { @@ -481,8 +487,8 @@ TEST_F(TmpFileMgrTest, TestEncryptionDuringCancellation) { string file_path = handle->TmpFilePath(); // Cancel the write - prior to the IMPALA-4820 fix decryption could race with the write. - handle->Cancel(); - handle->WaitForWrite(); + Cancel(handle.get()); + WaitForWrite(handle.get()); ASSERT_OK(file_group.RestoreData(move(handle), data_mem_range)); WaitForCallbacks(1); http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/a98b90bd/be/src/runtime/tmp-file-mgr.h ---------------------------------------------------------------------- diff --git a/be/src/runtime/tmp-file-mgr.h b/be/src/runtime/tmp-file-mgr.h index c71c370..ba7210d 100644 --- a/be/src/runtime/tmp-file-mgr.h +++ b/be/src/runtime/tmp-file-mgr.h @@ -281,21 +281,9 @@ class TmpFileMgr { DCHECK(read_range_ == nullptr); } - /// Cancels any in-flight writes or reads. Reads are cancelled synchronously and - /// writes are cancelled asynchronously. After Cancel() is called, writes are not - /// retried. The write callback may be called with a CANCELLED status (unless - /// it succeeded or encountered a different error first). - /// TODO: IMPALA-3200: make this private once BufferedBlockMgr doesn't need it. - void Cancel(); - /// Cancel any in-flight read synchronously. void CancelRead(); - /// Blocks until the write completes either successfully or unsuccessfully. - /// May return before the write callback has been called. - /// TODO: IMPALA-3200: make this private once BufferedBlockMgr doesn't need it. - void WaitForWrite(); - /// Path of temporary file backing the block. Intended for use in testing. /// Returns empty string if no backing file allocated. std::string TmpFilePath() const; @@ -307,6 +295,7 @@ class TmpFileMgr { private: friend class FileGroup; + friend class TmpFileMgrTest; WriteHandle(RuntimeProfile::Counter* encryption_timer, WriteDoneCallback cb); @@ -327,6 +316,16 @@ class TmpFileMgr { /// then calls 'cb_'. void WriteComplete(const Status& write_status); + /// Cancels any in-flight writes or reads. Reads are cancelled synchronously and + /// writes are cancelled asynchronously. After Cancel() is called, writes are not + /// retried. The write callback may be called with a CANCELLED status (unless + /// it succeeded or encountered a different error first). + void Cancel(); + + /// Blocks until the write completes either successfully or unsuccessfully. + /// May return before the write callback has been called. + void WaitForWrite(); + /// Encrypts the data in 'buffer' in-place and computes 'hash_'. Status EncryptAndHash(MemRange buffer) WARN_UNUSED_RESULT; http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/a98b90bd/be/src/service/client-request-state.cc ---------------------------------------------------------------------- diff --git a/be/src/service/client-request-state.cc b/be/src/service/client-request-state.cc index 6be04f6..bf0f9b4 100644 --- a/be/src/service/client-request-state.cc +++ b/be/src/service/client-request-state.cc @@ -399,9 +399,9 @@ Status ClientRequestState::ExecQueryOrDmlRequest( ss << query_exec_request.per_host_mem_estimate; summary_profile_.AddInfoString(PER_HOST_MEM_KEY, ss.str()); } - if (query_exec_request.__isset.per_host_min_reservation) { + if (query_exec_request.query_ctx.__isset.per_host_min_reservation) { stringstream ss; - ss << query_exec_request.per_host_min_reservation; + ss << query_exec_request.query_ctx.per_host_min_reservation; summary_profile_.AddInfoString(PER_HOST_MEMORY_RESERVATION_KEY, ss.str()); } if (!query_exec_request.query_ctx.__isset.parent_query_id && http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/a98b90bd/be/src/service/query-options.cc ---------------------------------------------------------------------- diff --git a/be/src/service/query-options.cc b/be/src/service/query-options.cc index 8dcd7af..c123902 100644 --- a/be/src/service/query-options.cc +++ b/be/src/service/query-options.cc @@ -261,10 +261,10 @@ Status impala::SetQueryOption(const string& key, const string& value, case TImpalaQueryOptions::QUERY_TIMEOUT_S: query_options->__set_query_timeout_s(atoi(value.c_str())); break; - case TImpalaQueryOptions::MAX_BLOCK_MGR_MEMORY: { + case TImpalaQueryOptions::BUFFER_POOL_LIMIT: { int64_t mem; - RETURN_IF_ERROR(ParseMemValue(value, "block mgr memory limit", &mem)); - query_options->__set_max_block_mgr_memory(mem); + RETURN_IF_ERROR(ParseMemValue(value, "buffer pool limit", &mem)); + query_options->__set_buffer_pool_limit(mem); break; } case TImpalaQueryOptions::APPX_COUNT_DISTINCT: { @@ -505,6 +505,28 @@ Status impala::SetQueryOption(const string& key, const string& value, query_options->__set_disable_codegen_rows_threshold(val); break; } + case TImpalaQueryOptions::DEFAULT_SPILLABLE_BUFFER_SIZE: { + int64_t buffer_size_bytes; + RETURN_IF_ERROR( + ParseMemValue(value, "Spillable buffer size", &buffer_size_bytes)); + if (!BitUtil::IsPowerOf2(buffer_size_bytes)) { + return Status( + Substitute("Buffer size must be a power of two: $0", buffer_size_bytes)); + } + query_options->__set_default_spillable_buffer_size(buffer_size_bytes); + break; + } + case TImpalaQueryOptions::MIN_SPILLABLE_BUFFER_SIZE: { + int64_t buffer_size_bytes; + RETURN_IF_ERROR( + ParseMemValue(value, "Spillable buffer size", &buffer_size_bytes)); + if (!BitUtil::IsPowerOf2(buffer_size_bytes)) { + return Status( + Substitute("Buffer size must be a power of two: $0", buffer_size_bytes)); + } + query_options->__set_min_spillable_buffer_size(buffer_size_bytes); + break; + } default: // We hit this DCHECK(false) if we forgot to add the corresponding entry here // when we add a new query option. http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/a98b90bd/be/src/service/query-options.h ---------------------------------------------------------------------- diff --git a/be/src/service/query-options.h b/be/src/service/query-options.h index 603c783..8d6af02 100644 --- a/be/src/service/query-options.h +++ b/be/src/service/query-options.h @@ -35,7 +35,7 @@ class TQueryOptions; // the DCHECK. #define QUERY_OPTS_TABLE\ DCHECK_EQ(_TImpalaQueryOptions_VALUES_TO_NAMES.size(),\ - TImpalaQueryOptions::DISABLE_CODEGEN_ROWS_THRESHOLD + 1);\ + TImpalaQueryOptions::MIN_SPILLABLE_BUFFER_SIZE + 1);\ QUERY_OPT_FN(abort_on_default_limit_exceeded, ABORT_ON_DEFAULT_LIMIT_EXCEEDED)\ QUERY_OPT_FN(abort_on_error, ABORT_ON_ERROR)\ QUERY_OPT_FN(allow_unsupported_formats, ALLOW_UNSUPPORTED_FORMATS)\ @@ -62,7 +62,7 @@ class TQueryOptions; QUERY_OPT_FN(v_cpu_cores, V_CPU_CORES)\ QUERY_OPT_FN(rm_initial_mem, RM_INITIAL_MEM)\ QUERY_OPT_FN(query_timeout_s, QUERY_TIMEOUT_S)\ - QUERY_OPT_FN(max_block_mgr_memory, MAX_BLOCK_MGR_MEMORY)\ + QUERY_OPT_FN(buffer_pool_limit, BUFFER_POOL_LIMIT)\ QUERY_OPT_FN(appx_count_distinct, APPX_COUNT_DISTINCT)\ QUERY_OPT_FN(disable_unsafe_spills, DISABLE_UNSAFE_SPILLS)\ QUERY_OPT_FN(seq_compression_mode, SEQ_COMPRESSION_MODE)\ @@ -93,6 +93,8 @@ class TQueryOptions; QUERY_OPT_FN(parquet_read_statistics, PARQUET_READ_STATISTICS)\ QUERY_OPT_FN(default_join_distribution_mode, DEFAULT_JOIN_DISTRIBUTION_MODE)\ QUERY_OPT_FN(disable_codegen_rows_threshold, DISABLE_CODEGEN_ROWS_THRESHOLD)\ + QUERY_OPT_FN(default_spillable_buffer_size, DEFAULT_SPILLABLE_BUFFER_SIZE)\ + QUERY_OPT_FN(min_spillable_buffer_size, MIN_SPILLABLE_BUFFER_SIZE)\ ; http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/a98b90bd/be/src/util/bloom-filter.h ---------------------------------------------------------------------- diff --git a/be/src/util/bloom-filter.h b/be/src/util/bloom-filter.h index 5ebd9b5..913b331 100644 --- a/be/src/util/bloom-filter.h +++ b/be/src/util/bloom-filter.h @@ -28,7 +28,7 @@ #include "common/compiler-util.h" #include "gen-cpp/ImpalaInternalService_types.h" #include "gutil/macros.h" -#include "runtime/buffered-block-mgr.h" +#include "util/cpu-info.h" #include "util/hash-util.h" namespace impala { http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/a98b90bd/be/src/util/static-asserts.cc ---------------------------------------------------------------------- diff --git a/be/src/util/static-asserts.cc b/be/src/util/static-asserts.cc index cf12e36..7662906 100644 --- a/be/src/util/static-asserts.cc +++ b/be/src/util/static-asserts.cc @@ -18,7 +18,6 @@ #include <boost/static_assert.hpp> #include "common/hdfs.h" -#include "runtime/buffered-tuple-stream.h" #include "runtime/string-value.h" #include "runtime/timestamp-value.h" #include "udf/udf.h" @@ -37,7 +36,6 @@ class UnusedClass { BOOST_STATIC_ASSERT(sizeof(boost::gregorian::date) == 4); BOOST_STATIC_ASSERT(sizeof(hdfsFS) == sizeof(void*)); BOOST_STATIC_ASSERT(sizeof(hdfsFile) == sizeof(void*)); - BOOST_STATIC_ASSERT(sizeof(BufferedTupleStream::RowIdx) == sizeof(void*)); // If the memory layout of any of these types changes, it will be necessary to change // LlvmCodeGen::GetUdfValType(), and we may also run into calling convention problems http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/a98b90bd/common/thrift/Frontend.thrift ---------------------------------------------------------------------- diff --git a/common/thrift/Frontend.thrift b/common/thrift/Frontend.thrift index 79da0d6..3a88915 100644 --- a/common/thrift/Frontend.thrift +++ b/common/thrift/Frontend.thrift @@ -389,23 +389,11 @@ struct TQueryExecRequest { // Estimated per-host peak memory consumption in bytes. Used for resource management. 8: optional i64 per_host_mem_estimate - // Minimum query-wide buffer reservation required per host in bytes. This is the peak - // minimum reservation that may be required by the concurrently-executing operators at - // any point in query execution. It may be less than the initial reservation total - // claims (below) if execution of some operators never overlaps, which allows reuse of - // reservations. - 9: optional i64 per_host_min_reservation; - - // Total of the initial buffer reservations that we expect to be claimed per host. - // I.e. the sum over all operators in all fragment instances that execute on that host. - // Measured in bytes. - 10: optional i64 per_host_initial_reservation_total_claims; - // List of replica hosts. Used by the host_idx field of TScanRangeLocation. - 11: required list<Types.TNetworkAddress> host_list + 9: required list<Types.TNetworkAddress> host_list // Column lineage graph - 12: optional LineageGraph.TLineageGraph lineage_graph + 10: optional LineageGraph.TLineageGraph lineage_graph } enum TCatalogOpType { http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/a98b90bd/common/thrift/ImpalaInternalService.thrift ---------------------------------------------------------------------- diff --git a/common/thrift/ImpalaInternalService.thrift b/common/thrift/ImpalaInternalService.thrift index 4aefe55..b477299 100644 --- a/common/thrift/ImpalaInternalService.thrift +++ b/common/thrift/ImpalaInternalService.thrift @@ -130,7 +130,7 @@ struct TQueryOptions { 26: optional i32 query_timeout_s = 0 // test hook to cap max memory for spilling operators (to force them to spill). - 27: optional i64 max_block_mgr_memory + 27: optional i64 buffer_pool_limit // If true, transforms all count(distinct) aggregations into NDV() 28: optional bool appx_count_distinct = 0 @@ -255,6 +255,14 @@ struct TQueryOptions { // If the number of rows processed per node is below the threshold codegen will be // automatically disabled by the planner. 57: optional i32 disable_codegen_rows_threshold = 50000 + + // The default spillable buffer size in bytes, which may be overridden by the planner. + // Defaults to 2MB. + 58: optional i64 default_spillable_buffer_size = 2097152; + + // The minimum spillable buffer to use. The planner will not choose a size smaller than + // this. Defaults to 64KB. + 59: optional i64 min_spillable_buffer_size = 65536; } // Impala currently has two types of sessions: Beeswax and HiveServer2 @@ -375,6 +383,18 @@ struct TQueryCtx { // String containing a timestamp (in UTC) set as the query submission time. It // represents the same point in time as now_string 17: required string utc_timestamp_string + + // Minimum query-wide buffer reservation required per host in bytes. This is the peak + // minimum reservation that may be required by the concurrently-executing operators at + // any point in query execution. It may be less than the initial reservation total + // claims (below) if execution of some operators never overlaps, which allows reuse of + // reservations. + 18: optional i64 per_host_min_reservation; + + // Total of the initial buffer reservations that we expect to be claimed per host. + // I.e. the sum over all operators in all fragment instances that execute on that host. + // Measured in bytes. + 19: optional i64 per_host_initial_reservation_total_claims; } // Specification of one output destination of a plan fragment http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/a98b90bd/common/thrift/ImpalaService.thrift ---------------------------------------------------------------------- diff --git a/common/thrift/ImpalaService.thrift b/common/thrift/ImpalaService.thrift index ec82bf1..ced884b 100644 --- a/common/thrift/ImpalaService.thrift +++ b/common/thrift/ImpalaService.thrift @@ -159,7 +159,7 @@ enum TImpalaQueryOptions { QUERY_TIMEOUT_S, // Test hook for spill to disk operators - MAX_BLOCK_MGR_MEMORY, + BUFFER_POOL_LIMIT, // Transforms all count(distinct) aggregations into NDV() APPX_COUNT_DISTINCT, @@ -279,6 +279,12 @@ enum TImpalaQueryOptions { // If the number of rows processed per node is below the threshold and disable_codegen // is unset, codegen will be automatically be disabled by the planner. DISABLE_CODEGEN_ROWS_THRESHOLD, + + // The default spillable buffer size, in bytes. + DEFAULT_SPILLABLE_BUFFER_SIZE, + + // The minimum spillable buffer size, in bytes. + MIN_SPILLABLE_BUFFER_SIZE, } // The summary of a DML statement. http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/a98b90bd/common/thrift/PlanNodes.thrift ---------------------------------------------------------------------- diff --git a/common/thrift/PlanNodes.thrift b/common/thrift/PlanNodes.thrift index c1ff302..468ca44 100644 --- a/common/thrift/PlanNodes.thrift +++ b/common/thrift/PlanNodes.thrift @@ -481,6 +481,21 @@ struct TUnnestNode { 1: required Exprs.TExpr collection_expr } +// This contains all of the information computed by the plan as part of the resource +// profile that is needed by the backend to execute. +struct TBackendResourceProfile { + // The minimum reservation for this plan node in bytes. + 1: required i64 min_reservation + + // The maximum reservation for this plan node in bytes. MAX_INT64 means effectively + // unlimited. + 2: required i64 max_reservation + + // The spillable buffer size in bytes to use for this node, chosen by the planner. + // Set iff the node uses spillable buffers. + 3: optional i64 spillable_buffer_size +} + // This is essentially a union of all messages corresponding to subclasses // of PlanNode. struct TPlanNode { @@ -526,6 +541,9 @@ struct TPlanNode { // Runtime filters assigned to this plan node 24: optional list<TRuntimeFilterDesc> runtime_filters + + // Resource profile for this plan node. + 25: required TBackendResourceProfile resource_profile } // A flattened representation of a tree of PlanNodes, obtained by depth-first http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/a98b90bd/common/thrift/generate_error_codes.py ---------------------------------------------------------------------- diff --git a/common/thrift/generate_error_codes.py b/common/thrift/generate_error_codes.py index 80e054e..ccd713c 100755 --- a/common/thrift/generate_error_codes.py +++ b/common/thrift/generate_error_codes.py @@ -240,11 +240,11 @@ error_codes = ( ("PARTITIONED_HASH_JOIN_REPARTITION_FAILS", 76, "Cannot perform hash join at node with " "id $0. Repartitioning did not reduce the size of a spilled partition. Repartitioning " - "level $1. Number of rows $2."), + "level $1. Number of rows $2:\\n$3\\n$4"), ("PARTITIONED_AGG_REPARTITION_FAILS", 77, "Cannot perform aggregation at node with " "id $0. Repartitioning did not reduce the size of a spilled partition. Repartitioning " - "level $1. Number of rows $2."), + "level $1. Number of rows $2:\\n$3\\n$4"), ("AVRO_TRUNCATED_BLOCK", 78, "File '$0' is corrupt: truncated data block at offset $1"), @@ -322,10 +322,14 @@ error_codes = ( # TODO: IMPALA-3200: make sure that this references the correct query option. ("MAX_ROW_SIZE", 104, "Row of size $0 could not be materialized in plan node with " - "id $1. Limit is $2, which can be increased with query option max_row_size"), + "id $1. Increase the <TBD> query option (currently $2) to process larger rows."), ("IR_VERIFY_FAILED", 105, "Failed to verify generated IR function $0, see log for more details."), + + ("MINIMUM_RESERVATION_UNAVAILABLE", 106, "Failed to get minimum memory reservation of " + "$0 on daemon $1:$2 for query $3 because it would exceed an applicable query, " + "request pool or process memory limit. Memory usage:\\n$4"), ) import sys http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/a98b90bd/fe/src/main/java/org/apache/impala/common/RuntimeEnv.java ---------------------------------------------------------------------- diff --git a/fe/src/main/java/org/apache/impala/common/RuntimeEnv.java b/fe/src/main/java/org/apache/impala/common/RuntimeEnv.java index 2041090..3c33bf1 100644 --- a/fe/src/main/java/org/apache/impala/common/RuntimeEnv.java +++ b/fe/src/main/java/org/apache/impala/common/RuntimeEnv.java @@ -17,8 +17,6 @@ package org.apache.impala.common; -import org.apache.impala.service.BackendConfig; - /** * Contains runtime-specific parameters such as the number of CPU cores. Currently only * used in Plan cost estimation. The static RuntimeEnv members can be set so that tests @@ -33,9 +31,6 @@ public class RuntimeEnv { // PlanNode.computeResourceProfile(). Currently the backend only support a single // spillable buffer size, so this is equal to PlanNode.DEFAULT_SPILLABLE_BUFFER_BYTES, // except in planner tests. - // TODO: IMPALA-3200: this get from query option - private long minSpillableBufferBytes_; - // Indicates whether this is an environment for testing. private boolean isTestEnv_; @@ -48,15 +43,10 @@ public class RuntimeEnv { */ public void reset() { numCores_ = Runtime.getRuntime().availableProcessors(); - minSpillableBufferBytes_ = BackendConfig.INSTANCE.getReadSize(); } public int getNumCores() { return numCores_; } public void setNumCores(int numCores) { this.numCores_ = numCores; } - public long getMinSpillableBufferBytes() { return minSpillableBufferBytes_; } - public void setMinSpillableBufferBytes(long minSpillableBufferBytes) { - minSpillableBufferBytes_ = minSpillableBufferBytes; - } public void setTestEnv(boolean v) { isTestEnv_ = v; } public boolean isTestEnv() { return isTestEnv_; } public boolean isKuduSupported() { http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/a98b90bd/fe/src/main/java/org/apache/impala/planner/AggregationNode.java ---------------------------------------------------------------------- diff --git a/fe/src/main/java/org/apache/impala/planner/AggregationNode.java b/fe/src/main/java/org/apache/impala/planner/AggregationNode.java index 004c84e..c938f76 100644 --- a/fe/src/main/java/org/apache/impala/planner/AggregationNode.java +++ b/fe/src/main/java/org/apache/impala/planner/AggregationNode.java @@ -30,7 +30,6 @@ import org.apache.impala.analysis.Expr; import org.apache.impala.analysis.FunctionCallExpr; import org.apache.impala.analysis.SlotId; import org.apache.impala.common.InternalException; -import org.apache.impala.common.RuntimeEnv; import org.apache.impala.thrift.TAggregationNode; import org.apache.impala.thrift.TExplainLevel; import org.apache.impala.thrift.TExpr; @@ -302,24 +301,24 @@ public class AggregationNode extends PlanNode { // Must be kept in sync with PartitionedAggregationNode::MinRequiredBuffers() in be. long perInstanceMinBuffers; + long bufferSize = queryOptions.getDefault_spillable_buffer_size(); if (aggInfo_.getGroupingExprs().isEmpty() || useStreamingPreagg_) { perInstanceMinBuffers = 0; } else { final int PARTITION_FANOUT = 16; - long minBuffers = 2 * PARTITION_FANOUT + 1 + (aggInfo_.needsSerialize() ? 1 : 0); - long bufferSize = getDefaultSpillableBufferBytes(); + long minBuffers = PARTITION_FANOUT + 1 + (aggInfo_.needsSerialize() ? 1 : 0); if (perInstanceDataBytes != -1) { long bytesPerBuffer = perInstanceDataBytes / PARTITION_FANOUT; // Scale down the buffer size if we think there will be excess free space with the // default buffer size, e.g. with small dimension tables. bufferSize = Math.min(bufferSize, Math.max( - RuntimeEnv.INSTANCE.getMinSpillableBufferBytes(), + queryOptions.getMin_spillable_buffer_size(), BitUtil.roundUpToPowerOf2(bytesPerBuffer))); } perInstanceMinBuffers = bufferSize * minBuffers; } - nodeResourceProfile_ = - new ResourceProfile(perInstanceMemEstimate, perInstanceMinBuffers); + nodeResourceProfile_ = ResourceProfile.spillableWithMinReservation( + perInstanceMemEstimate, perInstanceMinBuffers, bufferSize); } } http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/a98b90bd/fe/src/main/java/org/apache/impala/planner/AnalyticEvalNode.java ---------------------------------------------------------------------- diff --git a/fe/src/main/java/org/apache/impala/planner/AnalyticEvalNode.java b/fe/src/main/java/org/apache/impala/planner/AnalyticEvalNode.java index d4bafcf..0322d88 100644 --- a/fe/src/main/java/org/apache/impala/planner/AnalyticEvalNode.java +++ b/fe/src/main/java/org/apache/impala/planner/AnalyticEvalNode.java @@ -248,8 +248,11 @@ public class AnalyticEvalNode extends PlanNode { // TODO: come up with estimate based on window long perInstanceMemEstimate = 0; + // Analytic always uses the default spillable buffer size. + long bufferSize = queryOptions.getDefault_spillable_buffer_size(); // Must be kept in sync with MIN_REQUIRED_BUFFERS in AnalyticEvalNode in be. - long perInstanceMinBufferBytes = 2 * getDefaultSpillableBufferBytes(); - nodeResourceProfile_ = new ResourceProfile(perInstanceMemEstimate, perInstanceMinBufferBytes); + long perInstanceMinBufferBytes = 2 * bufferSize; + nodeResourceProfile_ = ResourceProfile.spillableWithMinReservation( + perInstanceMemEstimate, perInstanceMinBufferBytes, bufferSize); } } http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/a98b90bd/fe/src/main/java/org/apache/impala/planner/DataSourceScanNode.java ---------------------------------------------------------------------- diff --git a/fe/src/main/java/org/apache/impala/planner/DataSourceScanNode.java b/fe/src/main/java/org/apache/impala/planner/DataSourceScanNode.java index 879d9d8..cea9b53 100644 --- a/fe/src/main/java/org/apache/impala/planner/DataSourceScanNode.java +++ b/fe/src/main/java/org/apache/impala/planner/DataSourceScanNode.java @@ -332,7 +332,7 @@ public class DataSourceScanNode extends ScanNode { @Override public void computeNodeResourceProfile(TQueryOptions queryOptions) { // TODO: What's a good estimate of memory consumption? - nodeResourceProfile_ = new ResourceProfile(1024L * 1024L * 1024L, 0); + nodeResourceProfile_ = ResourceProfile.noReservation(1024L * 1024L * 1024L); } @Override http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/a98b90bd/fe/src/main/java/org/apache/impala/planner/DataStreamSink.java ---------------------------------------------------------------------- diff --git a/fe/src/main/java/org/apache/impala/planner/DataStreamSink.java b/fe/src/main/java/org/apache/impala/planner/DataStreamSink.java index d1369f5..af4f9a6 100644 --- a/fe/src/main/java/org/apache/impala/planner/DataStreamSink.java +++ b/fe/src/main/java/org/apache/impala/planner/DataStreamSink.java @@ -51,7 +51,7 @@ public class DataStreamSink extends DataSink { @Override public void computeResourceProfile(TQueryOptions queryOptions) { - resourceProfile_ = new ResourceProfile(0, 0); + resourceProfile_ = ResourceProfile.noReservation(0); } @Override http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/a98b90bd/fe/src/main/java/org/apache/impala/planner/EmptySetNode.java ---------------------------------------------------------------------- diff --git a/fe/src/main/java/org/apache/impala/planner/EmptySetNode.java b/fe/src/main/java/org/apache/impala/planner/EmptySetNode.java index 0d0acc9..3fb8bae 100644 --- a/fe/src/main/java/org/apache/impala/planner/EmptySetNode.java +++ b/fe/src/main/java/org/apache/impala/planner/EmptySetNode.java @@ -62,7 +62,7 @@ public class EmptySetNode extends PlanNode { @Override public void computeNodeResourceProfile(TQueryOptions queryOptions) { // TODO: add an estimate - nodeResourceProfile_ = new ResourceProfile(0, 0); + nodeResourceProfile_ = ResourceProfile.noReservation(0); } @Override http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/a98b90bd/fe/src/main/java/org/apache/impala/planner/ExchangeNode.java ---------------------------------------------------------------------- diff --git a/fe/src/main/java/org/apache/impala/planner/ExchangeNode.java b/fe/src/main/java/org/apache/impala/planner/ExchangeNode.java index 478a054..87d2fd2 100644 --- a/fe/src/main/java/org/apache/impala/planner/ExchangeNode.java +++ b/fe/src/main/java/org/apache/impala/planner/ExchangeNode.java @@ -184,7 +184,7 @@ public class ExchangeNode extends PlanNode { @Override public void computeNodeResourceProfile(TQueryOptions queryOptions) { // TODO: add an estimate - nodeResourceProfile_ = new ResourceProfile(0, 0); + nodeResourceProfile_ = ResourceProfile.noReservation(0); } @Override http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/a98b90bd/fe/src/main/java/org/apache/impala/planner/HBaseScanNode.java ---------------------------------------------------------------------- diff --git a/fe/src/main/java/org/apache/impala/planner/HBaseScanNode.java b/fe/src/main/java/org/apache/impala/planner/HBaseScanNode.java index bbecbf1..d56aa98 100644 --- a/fe/src/main/java/org/apache/impala/planner/HBaseScanNode.java +++ b/fe/src/main/java/org/apache/impala/planner/HBaseScanNode.java @@ -497,7 +497,7 @@ public class HBaseScanNode extends ScanNode { @Override public void computeNodeResourceProfile(TQueryOptions queryOptions) { // TODO: What's a good estimate of memory consumption? - nodeResourceProfile_ = new ResourceProfile(1024L * 1024L * 1024L, 0); + nodeResourceProfile_ = ResourceProfile.noReservation(1024L * 1024L * 1024L); } /** http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/a98b90bd/fe/src/main/java/org/apache/impala/planner/HBaseTableSink.java ---------------------------------------------------------------------- diff --git a/fe/src/main/java/org/apache/impala/planner/HBaseTableSink.java b/fe/src/main/java/org/apache/impala/planner/HBaseTableSink.java index 947665e..28939ed 100644 --- a/fe/src/main/java/org/apache/impala/planner/HBaseTableSink.java +++ b/fe/src/main/java/org/apache/impala/planner/HBaseTableSink.java @@ -44,7 +44,7 @@ public class HBaseTableSink extends TableSink { @Override public void computeResourceProfile(TQueryOptions queryOptions) { - resourceProfile_ = new ResourceProfile(0, 0); + resourceProfile_ = ResourceProfile.noReservation(0); } @Override http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/a98b90bd/fe/src/main/java/org/apache/impala/planner/HashJoinNode.java ---------------------------------------------------------------------- diff --git a/fe/src/main/java/org/apache/impala/planner/HashJoinNode.java b/fe/src/main/java/org/apache/impala/planner/HashJoinNode.java index e828125..5ff17c5 100644 --- a/fe/src/main/java/org/apache/impala/planner/HashJoinNode.java +++ b/fe/src/main/java/org/apache/impala/planner/HashJoinNode.java @@ -28,7 +28,6 @@ import org.apache.impala.catalog.Type; import org.apache.impala.common.AnalysisException; import org.apache.impala.common.ImpalaException; import org.apache.impala.common.InternalException; -import org.apache.impala.common.RuntimeEnv; import org.apache.impala.thrift.TEqJoinCondition; import org.apache.impala.thrift.TExplainLevel; import org.apache.impala.thrift.THashJoinNode; @@ -223,17 +222,18 @@ public class HashJoinNode extends JoinNode { long minBuffers = PARTITION_FANOUT + 1 + (joinOp_ == JoinOperator.NULL_AWARE_LEFT_ANTI_JOIN ? 3 : 0); - long bufferSize = getDefaultSpillableBufferBytes(); + long bufferSize = queryOptions.getDefault_spillable_buffer_size(); if (perInstanceDataBytes != -1) { long bytesPerBuffer = perInstanceDataBytes / PARTITION_FANOUT; // Scale down the buffer size if we think there will be excess free space with the // default buffer size, e.g. if the right side is a small dimension table. bufferSize = Math.min(bufferSize, Math.max( - RuntimeEnv.INSTANCE.getMinSpillableBufferBytes(), + queryOptions.getMin_spillable_buffer_size(), BitUtil.roundUpToPowerOf2(bytesPerBuffer))); } long perInstanceMinBufferBytes = bufferSize * minBuffers; - nodeResourceProfile_ = new ResourceProfile(perInstanceMemEstimate, perInstanceMinBufferBytes); + nodeResourceProfile_ = ResourceProfile.spillableWithMinReservation( + perInstanceMemEstimate, perInstanceMinBufferBytes, bufferSize); } } http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/a98b90bd/fe/src/main/java/org/apache/impala/planner/HdfsScanNode.java ---------------------------------------------------------------------- diff --git a/fe/src/main/java/org/apache/impala/planner/HdfsScanNode.java b/fe/src/main/java/org/apache/impala/planner/HdfsScanNode.java index 0ba5bc6..bf183be 100644 --- a/fe/src/main/java/org/apache/impala/planner/HdfsScanNode.java +++ b/fe/src/main/java/org/apache/impala/planner/HdfsScanNode.java @@ -1021,7 +1021,7 @@ public class HdfsScanNode extends ScanNode { public void computeNodeResourceProfile(TQueryOptions queryOptions) { Preconditions.checkNotNull(scanRanges_, "Cost estimation requires scan ranges."); if (scanRanges_.isEmpty()) { - nodeResourceProfile_ = new ResourceProfile(0, 0); + nodeResourceProfile_ = ResourceProfile.noReservation(0); return; } Preconditions.checkState(0 < numNodes_ && numNodes_ <= scanRanges_.size()); @@ -1075,7 +1075,7 @@ public class HdfsScanNode extends ScanNode { PrintUtils.printBytes(perHostUpperBound))); perInstanceMemEstimate = perHostUpperBound; } - nodeResourceProfile_ = new ResourceProfile(perInstanceMemEstimate, 0); + nodeResourceProfile_ = ResourceProfile.noReservation(perInstanceMemEstimate); } /** http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/a98b90bd/fe/src/main/java/org/apache/impala/planner/HdfsTableSink.java ---------------------------------------------------------------------- diff --git a/fe/src/main/java/org/apache/impala/planner/HdfsTableSink.java b/fe/src/main/java/org/apache/impala/planner/HdfsTableSink.java index fed4ffd..46709c0 100644 --- a/fe/src/main/java/org/apache/impala/planner/HdfsTableSink.java +++ b/fe/src/main/java/org/apache/impala/planner/HdfsTableSink.java @@ -99,7 +99,7 @@ public class HdfsTableSink extends TableSink { PlanNode.checkedMultiply(numPartitionsPerInstance, perPartitionMemReq); perInstanceMemEstimate = Math.min(perInstanceInputBytes, perInstanceMemReq); } - resourceProfile_ = new ResourceProfile(perInstanceMemEstimate, 0); + resourceProfile_ = ResourceProfile.noReservation(perInstanceMemEstimate); } /** http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/a98b90bd/fe/src/main/java/org/apache/impala/planner/JoinBuildSink.java ---------------------------------------------------------------------- diff --git a/fe/src/main/java/org/apache/impala/planner/JoinBuildSink.java b/fe/src/main/java/org/apache/impala/planner/JoinBuildSink.java index 69cc133..14acb26 100644 --- a/fe/src/main/java/org/apache/impala/planner/JoinBuildSink.java +++ b/fe/src/main/java/org/apache/impala/planner/JoinBuildSink.java @@ -87,6 +87,6 @@ public class JoinBuildSink extends DataSink { @Override public void computeResourceProfile(TQueryOptions queryOptions) { // The memory consumption is counted against the join PlanNode. - resourceProfile_ = new ResourceProfile(0, 0); + resourceProfile_ = ResourceProfile.noReservation(0); } } http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/a98b90bd/fe/src/main/java/org/apache/impala/planner/KuduScanNode.java ---------------------------------------------------------------------- diff --git a/fe/src/main/java/org/apache/impala/planner/KuduScanNode.java b/fe/src/main/java/org/apache/impala/planner/KuduScanNode.java index 57403e4..37a4e5c 100644 --- a/fe/src/main/java/org/apache/impala/planner/KuduScanNode.java +++ b/fe/src/main/java/org/apache/impala/planner/KuduScanNode.java @@ -270,7 +270,7 @@ public class KuduScanNode extends ScanNode { @Override public void computeNodeResourceProfile(TQueryOptions queryOptions) { - nodeResourceProfile_ = new ResourceProfile(0, 0); + nodeResourceProfile_ = ResourceProfile.noReservation(0); } @Override http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/a98b90bd/fe/src/main/java/org/apache/impala/planner/KuduTableSink.java ---------------------------------------------------------------------- diff --git a/fe/src/main/java/org/apache/impala/planner/KuduTableSink.java b/fe/src/main/java/org/apache/impala/planner/KuduTableSink.java index b7dcdd8..f75b170 100644 --- a/fe/src/main/java/org/apache/impala/planner/KuduTableSink.java +++ b/fe/src/main/java/org/apache/impala/planner/KuduTableSink.java @@ -60,7 +60,7 @@ public class KuduTableSink extends TableSink { @Override public void computeResourceProfile(TQueryOptions queryOptions) { // TODO: add a memory estimate - resourceProfile_ = new ResourceProfile(0, 0); + resourceProfile_ = ResourceProfile.noReservation(0); } @Override http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/a98b90bd/fe/src/main/java/org/apache/impala/planner/NestedLoopJoinNode.java ---------------------------------------------------------------------- diff --git a/fe/src/main/java/org/apache/impala/planner/NestedLoopJoinNode.java b/fe/src/main/java/org/apache/impala/planner/NestedLoopJoinNode.java index 0ec8e4f..16a3caf 100644 --- a/fe/src/main/java/org/apache/impala/planner/NestedLoopJoinNode.java +++ b/fe/src/main/java/org/apache/impala/planner/NestedLoopJoinNode.java @@ -20,9 +20,6 @@ package org.apache.impala.planner; import java.util.Collections; import java.util.List; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - import org.apache.impala.analysis.Analyzer; import org.apache.impala.analysis.BinaryPredicate; import org.apache.impala.analysis.Expr; @@ -86,7 +83,7 @@ public class NestedLoopJoinNode extends JoinNode { perInstanceMemEstimate = (long) Math.ceil(getChild(1).cardinality_ * getChild(1).avgRowSize_); } - nodeResourceProfile_ = new ResourceProfile(perInstanceMemEstimate, 0); + nodeResourceProfile_ = ResourceProfile.noReservation(perInstanceMemEstimate); } @Override http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/a98b90bd/fe/src/main/java/org/apache/impala/planner/PlanNode.java ---------------------------------------------------------------------- diff --git a/fe/src/main/java/org/apache/impala/planner/PlanNode.java b/fe/src/main/java/org/apache/impala/planner/PlanNode.java index 9723c4a..2557f98 100644 --- a/fe/src/main/java/org/apache/impala/planner/PlanNode.java +++ b/fe/src/main/java/org/apache/impala/planner/PlanNode.java @@ -33,7 +33,7 @@ import org.apache.impala.common.ImpalaException; import org.apache.impala.common.PrintUtils; import org.apache.impala.common.TreeNode; import org.apache.impala.planner.RuntimeFilterGenerator.RuntimeFilter; -import org.apache.impala.service.BackendConfig; +import org.apache.impala.thrift.TBackendResourceProfile; import org.apache.impala.thrift.TExecStats; import org.apache.impala.thrift.TExplainLevel; import org.apache.impala.thrift.TPlan; @@ -408,6 +408,8 @@ abstract public class PlanNode extends TreeNode<PlanNode> { msg.addToRuntime_filters(filter.toThrift()); } msg.setDisable_codegen(disableCodegen_); + Preconditions.checkState(nodeResourceProfile_.isValid()); + msg.resource_profile = nodeResourceProfile_.toThrift(); toThrift(msg); container.addToNodes(msg); // For the purpose of the BE consider ExchangeNodes to have no children. @@ -677,16 +679,6 @@ abstract public class PlanNode extends TreeNode<PlanNode> { } /** - * The default size of buffer used in spilling nodes. Used in - * computeNodeResourceProfile(). - */ - protected final static long getDefaultSpillableBufferBytes() { - // BufferedBlockMgr uses --read_size to determine buffer size. - // TODO: IMPALA-3200: get from query option - return BackendConfig.INSTANCE.getReadSize(); - } - - /** * The input cardinality is the sum of output cardinalities of its children. * For scan nodes the input cardinality is the expected number of rows scanned. */ http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/a98b90bd/fe/src/main/java/org/apache/impala/planner/PlanRootSink.java ---------------------------------------------------------------------- diff --git a/fe/src/main/java/org/apache/impala/planner/PlanRootSink.java b/fe/src/main/java/org/apache/impala/planner/PlanRootSink.java index fba9149..07eb58b 100644 --- a/fe/src/main/java/org/apache/impala/planner/PlanRootSink.java +++ b/fe/src/main/java/org/apache/impala/planner/PlanRootSink.java @@ -37,7 +37,7 @@ public class PlanRootSink extends DataSink { @Override public void computeResourceProfile(TQueryOptions queryOptions) { // TODO: add a memory estimate - resourceProfile_ = new ResourceProfile(0, 0); + resourceProfile_ = ResourceProfile.noReservation(0); } protected TDataSink toThrift() { http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/a98b90bd/fe/src/main/java/org/apache/impala/planner/Planner.java ---------------------------------------------------------------------- diff --git a/fe/src/main/java/org/apache/impala/planner/Planner.java b/fe/src/main/java/org/apache/impala/planner/Planner.java index 4cfd57e..ed6e8df 100644 --- a/fe/src/main/java/org/apache/impala/planner/Planner.java +++ b/fe/src/main/java/org/apache/impala/planner/Planner.java @@ -63,7 +63,7 @@ public class Planner { public static final long MIN_PER_HOST_MEM_ESTIMATE_BYTES = 10 * 1024 * 1024; public static final ResourceProfile MIN_PER_HOST_RESOURCES = - new ResourceProfile(MIN_PER_HOST_MEM_ESTIMATE_BYTES, 0); + ResourceProfile.withMinReservation(MIN_PER_HOST_MEM_ESTIMATE_BYTES, 0); private final PlannerContext ctx_; @@ -262,9 +262,9 @@ public class Planner { TQueryExecRequest request, TExplainLevel explainLevel) { StringBuilder str = new StringBuilder(); boolean hasHeader = false; - if (request.isSetPer_host_min_reservation()) { + if (request.query_ctx.isSetPer_host_min_reservation()) { str.append(String.format("Per-Host Resource Reservation: Memory=%s\n", - PrintUtils.printBytes(request.getPer_host_min_reservation()))) ; + PrintUtils.printBytes(request.query_ctx.getPer_host_min_reservation()))); hasHeader = true; } if (request.isSetPer_host_mem_estimate()) { @@ -344,7 +344,7 @@ public class Planner { * per-host resource values in 'request'. */ public void computeResourceReqs(List<PlanFragment> planRoots, - TQueryExecRequest request) { + TQueryCtx queryCtx, TQueryExecRequest request) { Preconditions.checkState(!planRoots.isEmpty()); Preconditions.checkNotNull(request); TQueryOptions queryOptions = ctx_.getRootAnalyzer().getQueryOptions(); @@ -389,8 +389,8 @@ public class Planner { perHostPeakResources = MIN_PER_HOST_RESOURCES.max(perHostPeakResources); request.setPer_host_mem_estimate(perHostPeakResources.getMemEstimateBytes()); - request.setPer_host_min_reservation(perHostPeakResources.getMinReservationBytes()); - request.setPer_host_initial_reservation_total_claims(perHostInitialReservationTotal); + queryCtx.setPer_host_min_reservation(perHostPeakResources.getMinReservationBytes()); + queryCtx.setPer_host_initial_reservation_total_claims(perHostInitialReservationTotal); if (LOG.isTraceEnabled()) { LOG.trace("Per-host min buffer : " + perHostPeakResources.getMinReservationBytes()); LOG.trace( http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/a98b90bd/fe/src/main/java/org/apache/impala/planner/ResourceProfile.java ---------------------------------------------------------------------- diff --git a/fe/src/main/java/org/apache/impala/planner/ResourceProfile.java b/fe/src/main/java/org/apache/impala/planner/ResourceProfile.java index 18cde7e..3c13812 100644 --- a/fe/src/main/java/org/apache/impala/planner/ResourceProfile.java +++ b/fe/src/main/java/org/apache/impala/planner/ResourceProfile.java @@ -18,6 +18,7 @@ package org.apache.impala.planner; import org.apache.impala.common.PrintUtils; +import org.apache.impala.thrift.TBackendResourceProfile; import org.apache.impala.util.MathUtil; /** @@ -35,25 +36,56 @@ public class ResourceProfile { private final long memEstimateBytes_; // Minimum buffer reservation required to execute in bytes. + // The valid range is [0, maxReservationBytes_]. private final long minReservationBytes_; - private ResourceProfile(boolean isValid, long memEstimateBytes, long minReservationBytes) { + // Maximum buffer reservation allowed for this plan node. + // The valid range is [minReservationBytes_, Long.MAX_VALUE]. + private final long maxReservationBytes_; + + // The spillable buffer size to use in a plan node. Only valid for resource profiles + // for spilling PlanNodes. Operations like sum(), max(), etc., produce profiles without + // valid spillableBufferBytes_ values. -1 means invalid. + private final long spillableBufferBytes_; + + private ResourceProfile(boolean isValid, long memEstimateBytes, + long minReservationBytes, long maxReservationBytes, long spillableBufferBytes) { isValid_ = isValid; memEstimateBytes_ = memEstimateBytes; minReservationBytes_ = minReservationBytes; + maxReservationBytes_ = maxReservationBytes; + spillableBufferBytes_ = spillableBufferBytes; + } + + // Create a resource profile with zero min or max reservation. + public static ResourceProfile noReservation(long memEstimateBytes) { + return new ResourceProfile(true, memEstimateBytes, 0, 0, -1); + } + + // Create a resource profile with a minimum reservation (but no maximum). + public static ResourceProfile withMinReservation(long memEstimateBytes, + long minReservationBytes) { + return new ResourceProfile( + true, memEstimateBytes, minReservationBytes, Long.MAX_VALUE, -1); } - public ResourceProfile(long memEstimateBytes, long minReservationBytes) { - this(true, memEstimateBytes, minReservationBytes); + // Create a resource profile with a minimum reservation (but no maximum) and a + // spillable buffer size. + public static ResourceProfile spillableWithMinReservation(long memEstimateBytes, + long minReservationBytes, long spillableBufferBytes) { + return new ResourceProfile(true, memEstimateBytes, minReservationBytes, + Long.MAX_VALUE, spillableBufferBytes); } public static ResourceProfile invalid() { - return new ResourceProfile(false, -1, -1); + return new ResourceProfile(false, -1, -1, -1, -1); } public boolean isValid() { return isValid_; } public long getMemEstimateBytes() { return memEstimateBytes_; } public long getMinReservationBytes() { return minReservationBytes_; } + public long getMaxReservationBytes() { return maxReservationBytes_; } + public long getSpillableBufferBytes() { return spillableBufferBytes_; } // Return a string with the resource profile information suitable for display in an // explain plan in a format like: "resource1=value resource2=value" @@ -63,6 +95,12 @@ public class ResourceProfile { output.append(isValid_ ? PrintUtils.printBytes(memEstimateBytes_) : "invalid"); output.append(" mem-reservation="); output.append(isValid_ ? PrintUtils.printBytes(minReservationBytes_) : "invalid"); + // TODO: output maxReservation_ here if the planner becomes more sophisticated in + // choosing it (beyond 0/unlimited). + if (isValid_ && spillableBufferBytes_ != -1) { + output.append(" spill-buffer="); + output.append(PrintUtils.printBytes(spillableBufferBytes_)); + } return output.toString(); } @@ -70,25 +108,39 @@ public class ResourceProfile { public ResourceProfile max(ResourceProfile other) { if (!isValid()) return other; if (!other.isValid()) return this; - return new ResourceProfile( + return new ResourceProfile(true, Math.max(getMemEstimateBytes(), other.getMemEstimateBytes()), - Math.max(getMinReservationBytes(), other.getMinReservationBytes())); + Math.max(getMinReservationBytes(), other.getMinReservationBytes()), + Math.max(getMaxReservationBytes(), other.getMaxReservationBytes()), -1); } // Returns a profile with the sum of each value in 'this' and 'other'. public ResourceProfile sum(ResourceProfile other) { if (!isValid()) return other; if (!other.isValid()) return this; - return new ResourceProfile( + return new ResourceProfile(true, MathUtil.saturatingAdd(getMemEstimateBytes(), other.getMemEstimateBytes()), - MathUtil.saturatingAdd(getMinReservationBytes(), other.getMinReservationBytes())); + MathUtil.saturatingAdd(getMinReservationBytes(),other.getMinReservationBytes()), + MathUtil.saturatingAdd(getMaxReservationBytes(), other.getMaxReservationBytes()), + -1); } // Returns a profile with all values multiplied by 'factor'. public ResourceProfile multiply(int factor) { if (!isValid()) return this; - return new ResourceProfile( + return new ResourceProfile(true, MathUtil.saturatingMultiply(memEstimateBytes_, factor), - MathUtil.saturatingMultiply(minReservationBytes_, factor)); + MathUtil.saturatingMultiply(minReservationBytes_, factor), + MathUtil.saturatingMultiply(maxReservationBytes_, factor), -1); + } + + public TBackendResourceProfile toThrift() { + TBackendResourceProfile result = new TBackendResourceProfile(); + result.setMin_reservation(minReservationBytes_); + result.setMax_reservation(maxReservationBytes_); + if (spillableBufferBytes_ != -1) { + result.setSpillable_buffer_size(spillableBufferBytes_); + } + return result; } } http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/a98b90bd/fe/src/main/java/org/apache/impala/planner/SelectNode.java ---------------------------------------------------------------------- diff --git a/fe/src/main/java/org/apache/impala/planner/SelectNode.java b/fe/src/main/java/org/apache/impala/planner/SelectNode.java index 97dfa5b..3ffc975 100644 --- a/fe/src/main/java/org/apache/impala/planner/SelectNode.java +++ b/fe/src/main/java/org/apache/impala/planner/SelectNode.java @@ -84,7 +84,7 @@ public class SelectNode extends PlanNode { @Override public void computeNodeResourceProfile(TQueryOptions queryOptions) { // TODO: add an estimate - nodeResourceProfile_ = new ResourceProfile(0, 0); + nodeResourceProfile_ = ResourceProfile.noReservation(0); } @Override http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/a98b90bd/fe/src/main/java/org/apache/impala/planner/SingularRowSrcNode.java ---------------------------------------------------------------------- diff --git a/fe/src/main/java/org/apache/impala/planner/SingularRowSrcNode.java b/fe/src/main/java/org/apache/impala/planner/SingularRowSrcNode.java index bed1c9a..bdf3a01 100644 --- a/fe/src/main/java/org/apache/impala/planner/SingularRowSrcNode.java +++ b/fe/src/main/java/org/apache/impala/planner/SingularRowSrcNode.java @@ -68,7 +68,7 @@ public class SingularRowSrcNode extends PlanNode { @Override public void computeNodeResourceProfile(TQueryOptions queryOptions) { // TODO: add an estimate - nodeResourceProfile_ = new ResourceProfile(0, 0); + nodeResourceProfile_ = ResourceProfile.noReservation(0); } @Override http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/a98b90bd/fe/src/main/java/org/apache/impala/planner/SortNode.java ---------------------------------------------------------------------- diff --git a/fe/src/main/java/org/apache/impala/planner/SortNode.java b/fe/src/main/java/org/apache/impala/planner/SortNode.java index aee8fda..75e8034 100644 --- a/fe/src/main/java/org/apache/impala/planner/SortNode.java +++ b/fe/src/main/java/org/apache/impala/planner/SortNode.java @@ -29,7 +29,6 @@ import org.apache.impala.analysis.SlotDescriptor; import org.apache.impala.analysis.SlotRef; import org.apache.impala.analysis.SortInfo; import org.apache.impala.common.InternalException; -import org.apache.impala.service.BackendConfig; import org.apache.impala.thrift.TExplainLevel; import org.apache.impala.thrift.TPlanNode; import org.apache.impala.thrift.TPlanNodeType; @@ -255,7 +254,7 @@ public class SortNode extends PlanNode { if (type_ == TSortType.TOPN) { long perInstanceMemEstimate = (long) Math.ceil((cardinality_ + offset_) * avgRowSize_); - nodeResourceProfile_ = new ResourceProfile(perInstanceMemEstimate, 0); + nodeResourceProfile_ = ResourceProfile.noReservation(perInstanceMemEstimate); return; } @@ -265,44 +264,40 @@ public class SortNode extends PlanNode { // size sqrt(N) blocks, and we could merge sqrt(N) such runs with sqrt(N) blocks // of memory. double fullInputSize = getChild(0).cardinality_ * avgRowSize_; - boolean hasVarLenSlots = false; + boolean usesVarLenBlocks = false; for (SlotDescriptor slotDesc: info_.getSortTupleDescriptor().getSlots()) { if (slotDesc.isMaterialized() && !slotDesc.getType().isFixedLengthType()) { - hasVarLenSlots = true; + usesVarLenBlocks = true; break; } } - // The block size used by the sorter is the same as the configured I/O read size. - long blockSize = BackendConfig.INSTANCE.getReadSize(); - // The external sorter writes fixed-len and var-len data in separate sequences of - // blocks on disk and reads from both sequences when merging. This effectively - // doubles the block size when there are var-len columns present. - if (hasVarLenSlots) blockSize *= 2; + // Sort always uses the default spillable buffer size. + long bufferSize = queryOptions.getDefault_spillable_buffer_size(); + // The external sorter writes fixed-len and var-len data in separate sequences of + // pages on disk and reads from both sequences when merging. This effectively + // doubles the number of pages required when there are var-len columns present. + // Must be kept in sync with ComputeMinReservation() in Sorter in be. + int pageMultiplier = usesVarLenBlocks ? 2 : 1; + long perInstanceMemEstimate; + long perInstanceMinReservation; if (type_ == TSortType.PARTIAL) { // The memory limit cannot be less than the size of the required blocks. - long mem_limit = - PARTIAL_SORT_MEM_LIMIT > blockSize ? PARTIAL_SORT_MEM_LIMIT : blockSize; + long mem_limit = Math.max(PARTIAL_SORT_MEM_LIMIT, bufferSize * pageMultiplier); // 'fullInputSize' will be negative if stats are missing, just use the limit. - long perInstanceMemEstimate = fullInputSize < 0 ? + perInstanceMemEstimate = fullInputSize < 0 ? mem_limit : Math.min((long) Math.ceil(fullInputSize), mem_limit); - nodeResourceProfile_ = new ResourceProfile(perInstanceMemEstimate, blockSize); + perInstanceMinReservation = bufferSize * pageMultiplier; } else { - Preconditions.checkState(type_ == TSortType.TOTAL); - double numInputBlocks = Math.ceil(fullInputSize / blockSize); - long perInstanceMemEstimate = - blockSize * (long) Math.ceil(Math.sqrt(numInputBlocks)); - - // Must be kept in sync with min_buffers_required in Sorter in be. - long perInstanceMinReservation = 3 * getDefaultSpillableBufferBytes(); - if (info_.getSortTupleDescriptor().hasVarLenSlots()) { - perInstanceMinReservation *= 2; - } - nodeResourceProfile_ = - new ResourceProfile(perInstanceMemEstimate, perInstanceMinReservation); + double numInputBlocks = Math.ceil(fullInputSize / (bufferSize * pageMultiplier)); + perInstanceMemEstimate = + bufferSize * (long) Math.ceil(Math.sqrt(numInputBlocks)); + perInstanceMinReservation = 3 * bufferSize * pageMultiplier; } + nodeResourceProfile_ = ResourceProfile.spillableWithMinReservation( + perInstanceMemEstimate, perInstanceMinReservation, bufferSize); } private static String getDisplayName(TSortType type) { http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/a98b90bd/fe/src/main/java/org/apache/impala/planner/SubplanNode.java ---------------------------------------------------------------------- diff --git a/fe/src/main/java/org/apache/impala/planner/SubplanNode.java b/fe/src/main/java/org/apache/impala/planner/SubplanNode.java index c09efe5..e41290e 100644 --- a/fe/src/main/java/org/apache/impala/planner/SubplanNode.java +++ b/fe/src/main/java/org/apache/impala/planner/SubplanNode.java @@ -95,7 +95,7 @@ public class SubplanNode extends PlanNode { @Override public void computeNodeResourceProfile(TQueryOptions queryOptions) { // TODO: add an estimate - nodeResourceProfile_ = new ResourceProfile(0, 0); + nodeResourceProfile_ = ResourceProfile.noReservation(0); } @Override http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/a98b90bd/fe/src/main/java/org/apache/impala/planner/UnionNode.java ---------------------------------------------------------------------- diff --git a/fe/src/main/java/org/apache/impala/planner/UnionNode.java b/fe/src/main/java/org/apache/impala/planner/UnionNode.java index 44e2967..302f62d 100644 --- a/fe/src/main/java/org/apache/impala/planner/UnionNode.java +++ b/fe/src/main/java/org/apache/impala/planner/UnionNode.java @@ -131,7 +131,7 @@ public class UnionNode extends PlanNode { @Override public void computeNodeResourceProfile(TQueryOptions queryOptions) { // TODO: add an estimate - nodeResourceProfile_ = new ResourceProfile(0, 0); + nodeResourceProfile_ = ResourceProfile.noReservation(0); } @Override http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/a98b90bd/fe/src/main/java/org/apache/impala/planner/UnnestNode.java ---------------------------------------------------------------------- diff --git a/fe/src/main/java/org/apache/impala/planner/UnnestNode.java b/fe/src/main/java/org/apache/impala/planner/UnnestNode.java index 695ec24..7e0a87e 100644 --- a/fe/src/main/java/org/apache/impala/planner/UnnestNode.java +++ b/fe/src/main/java/org/apache/impala/planner/UnnestNode.java @@ -76,7 +76,7 @@ public class UnnestNode extends PlanNode { @Override public void computeNodeResourceProfile(TQueryOptions queryOptions) { // TODO: add an estimate - nodeResourceProfile_ = new ResourceProfile(0, 0); + nodeResourceProfile_ = ResourceProfile.noReservation(0); } @Override http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/a98b90bd/fe/src/main/java/org/apache/impala/service/Frontend.java ---------------------------------------------------------------------- diff --git a/fe/src/main/java/org/apache/impala/service/Frontend.java b/fe/src/main/java/org/apache/impala/service/Frontend.java index 2c71a9b..60e84b4 100644 --- a/fe/src/main/java/org/apache/impala/service/Frontend.java +++ b/fe/src/main/java/org/apache/impala/service/Frontend.java @@ -1005,7 +1005,7 @@ public class Frontend { } // Compute resource requirements of the final plans. - planner.computeResourceReqs(planRoots, result); + planner.computeResourceReqs(planRoots, queryCtx, result); // create per-plan exec info; // also assemble list of names of tables with missing or corrupt stats for http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/a98b90bd/fe/src/test/java/org/apache/impala/planner/PlannerTest.java ---------------------------------------------------------------------- diff --git a/fe/src/test/java/org/apache/impala/planner/PlannerTest.java b/fe/src/test/java/org/apache/impala/planner/PlannerTest.java index 8289ee8..b0f1e2e 100644 --- a/fe/src/test/java/org/apache/impala/planner/PlannerTest.java +++ b/fe/src/test/java/org/apache/impala/planner/PlannerTest.java @@ -425,8 +425,6 @@ public class PlannerTest extends PlannerTestBase { TQueryOptions options = defaultQueryOptions(); options.setExplain_level(TExplainLevel.EXTENDED); options.setNum_scanner_threads(1); // Required so that output doesn't vary by machine - // TODO: IMPALA-3200 - this should become a query option. - RuntimeEnv.INSTANCE.setMinSpillableBufferBytes(64 * 1024); runPlannerTestFile("spillable-buffer-sizing", options, false); } http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/a98b90bd/testdata/workloads/functional-planner/queries/PlannerTest/constant-folding.test ---------------------------------------------------------------------- diff --git a/testdata/workloads/functional-planner/queries/PlannerTest/constant-folding.test b/testdata/workloads/functional-planner/queries/PlannerTest/constant-folding.test index ed4f684..f4ae6c3 100644 --- a/testdata/workloads/functional-planner/queries/PlannerTest/constant-folding.test +++ b/testdata/workloads/functional-planner/queries/PlannerTest/constant-folding.test @@ -102,7 +102,7 @@ having 1024 * 1024 * count(*) % 2 = 0 and (sm between 5 and 10) ---- PLAN F00:PLAN FRAGMENT [UNPARTITIONED] hosts=1 instances=1 -| Per-Host Resources: mem-estimate=138.00MB mem-reservation=264.00MB +| Per-Host Resources: mem-estimate=138.00MB mem-reservation=1.06MB PLAN-ROOT SINK | mem-estimate=0B mem-reservation=0B | @@ -110,7 +110,7 @@ PLAN-ROOT SINK | output: sum(2 + id), count(*) | group by: timestamp_col = TIMESTAMP '2016-11-15 00:00:00' | having: sum(2 + id) <= 10, sum(2 + id) > 1, sum(2 + id) >= 5, 1048576 * count(*) % 2 = 0 -| mem-estimate=10.00MB mem-reservation=264.00MB +| mem-estimate=10.00MB mem-reservation=1.06MB spill-buffer=64.00KB | tuple-ids=1 row-size=17B cardinality=0 | 00:SCAN HDFS [functional.alltypes] @@ -129,7 +129,7 @@ left outer join functional.alltypes b where round(1.11 + 2.22 + 3.33 + 4.44, 1) < cast(b.double_col as decimal(3, 2)) ---- PLAN F00:PLAN FRAGMENT [UNPARTITIONED] hosts=1 instances=1 -| Per-Host Resources: mem-estimate=256.02MB mem-reservation=136.00MB +| Per-Host Resources: mem-estimate=256.02MB mem-reservation=1.06MB PLAN-ROOT SINK | mem-estimate=0B mem-reservation=0B | @@ -138,7 +138,7 @@ PLAN-ROOT SINK | fk/pk conjuncts: assumed fk/pk | other join predicates: a.int_col <= b.bigint_col + 97, a.int_col >= 0 + b.bigint_col | other predicates: CAST(b.double_col AS DECIMAL(3,2)) > 11.1 -| mem-estimate=15.68KB mem-reservation=136.00MB +| mem-estimate=15.68KB mem-reservation=1.06MB spill-buffer=64.00KB | tuple-ids=0,1N row-size=28B cardinality=7300 | |--01:SCAN HDFS [functional.alltypes b] @@ -203,7 +203,7 @@ group by timestamp_col = cast('2015-11-15' as timestamp) + interval 1 year having 1024 * 1024 * count(*) % 2 = 0 ---- PLAN F00:PLAN FRAGMENT [UNPARTITIONED] hosts=1 instances=1 -| Per-Host Resources: mem-estimate=138.00MB mem-reservation=528.00MB +| Per-Host Resources: mem-estimate=138.00MB mem-reservation=2.12MB PLAN-ROOT SINK | mem-estimate=0B mem-reservation=0B | @@ -211,13 +211,13 @@ PLAN-ROOT SINK | output: sum(2 + id), count:merge(*) | group by: timestamp_col = TIMESTAMP '2016-11-15 00:00:00' | having: 1048576 * count(*) % 2 = 0 -| mem-estimate=10.00MB mem-reservation=264.00MB +| mem-estimate=10.00MB mem-reservation=1.06MB spill-buffer=64.00KB | tuple-ids=2 row-size=17B cardinality=0 | 01:AGGREGATE | output: count(*) | group by: timestamp_col = TIMESTAMP '2016-11-15 00:00:00', 2 + id -| mem-estimate=10.00MB mem-reservation=264.00MB +| mem-estimate=10.00MB mem-reservation=1.06MB spill-buffer=64.00KB | tuple-ids=1 row-size=17B cardinality=7300 | 00:SCAN HDFS [functional.alltypes] @@ -234,20 +234,20 @@ from functional.alltypes having 1024 * 1024 * count(*) % 2 = 0 ---- PLAN F00:PLAN FRAGMENT [UNPARTITIONED] hosts=1 instances=1 -| Per-Host Resources: mem-estimate=138.00MB mem-reservation=264.00MB +| Per-Host Resources: mem-estimate=138.00MB mem-reservation=1.06MB PLAN-ROOT SINK | mem-estimate=0B mem-reservation=0B | 02:AGGREGATE [FINALIZE] | output: sum(2 + id), count:merge(*) | having: 1048576 * zeroifnull(count(*)) % 2 = 0 -| mem-estimate=10.00MB mem-reservation=0B +| mem-estimate=10.00MB mem-reservation=0B spill-buffer=2.00MB | tuple-ids=2 row-size=16B cardinality=0 | 01:AGGREGATE | output: count(*) | group by: 2 + id -| mem-estimate=10.00MB mem-reservation=264.00MB +| mem-estimate=10.00MB mem-reservation=1.06MB spill-buffer=64.00KB | tuple-ids=1 row-size=16B cardinality=7300 | 00:SCAN HDFS [functional.alltypes] @@ -265,7 +265,7 @@ select first_value(1 + 1 + int_col - (1 - 1)) over from functional.alltypes ---- PLAN F00:PLAN FRAGMENT [UNPARTITIONED] hosts=1 instances=1 -| Per-Host Resources: mem-estimate=144.00MB mem-reservation=64.00MB +| Per-Host Resources: mem-estimate=130.00MB mem-reservation=16.00MB PLAN-ROOT SINK | mem-estimate=0B mem-reservation=0B | @@ -274,13 +274,13 @@ PLAN-ROOT SINK | partition by: concat('ab', string_col) | order by: greatest(20, bigint_col) ASC | window: ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW -| mem-estimate=0B mem-reservation=16.00MB +| mem-estimate=0B mem-reservation=4.00MB spill-buffer=2.00MB | tuple-ids=3,2 row-size=61B cardinality=7300 | 01:SORT | order by: concat('ab', string_col) ASC NULLS FIRST, greatest(20, bigint_col) ASC | materialized: concat('ab', string_col), greatest(20, bigint_col) -| mem-estimate=16.00MB mem-reservation=48.00MB +| mem-estimate=2.00MB mem-reservation=12.00MB spill-buffer=2.00MB | tuple-ids=3 row-size=53B cardinality=7300 | 00:SCAN HDFS [functional.alltypes] @@ -296,13 +296,13 @@ select int_col from functional.alltypes order by id * abs((factorial(5) / power(2, 4))) ---- PLAN F00:PLAN FRAGMENT [UNPARTITIONED] hosts=1 instances=1 -| Per-Host Resources: mem-estimate=136.00MB mem-reservation=24.00MB +| Per-Host Resources: mem-estimate=130.00MB mem-reservation=6.00MB PLAN-ROOT SINK | mem-estimate=0B mem-reservation=0B | 01:SORT | order by: id * 7.5 ASC -| mem-estimate=8.00MB mem-reservation=24.00MB +| mem-estimate=2.00MB mem-reservation=6.00MB spill-buffer=2.00MB | tuple-ids=1 row-size=8B cardinality=7300 | 00:SCAN HDFS [functional.alltypes] @@ -347,7 +347,7 @@ PLAN-ROOT SINK | 01:AGGREGATE [FINALIZE] | output: sum(id + 10 + 20 + 30) -| mem-estimate=10.00MB mem-reservation=0B +| mem-estimate=10.00MB mem-reservation=0B spill-buffer=2.00MB | tuple-ids=4 row-size=8B cardinality=1 | 00:SCAN HDFS [functional.alltypes] http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/a98b90bd/testdata/workloads/functional-planner/queries/PlannerTest/disable-codegen.test ---------------------------------------------------------------------- diff --git a/testdata/workloads/functional-planner/queries/PlannerTest/disable-codegen.test b/testdata/workloads/functional-planner/queries/PlannerTest/disable-codegen.test index e64691a..d367424 100644 --- a/testdata/workloads/functional-planner/queries/PlannerTest/disable-codegen.test +++ b/testdata/workloads/functional-planner/queries/PlannerTest/disable-codegen.test @@ -78,7 +78,7 @@ select count(*) from functional.alltypes t1 join functional.alltypestiny t2 on t1.id = t2.id ---- DISTRIBUTEDPLAN -Per-Host Resource Reservation: Memory=136.00MB +Per-Host Resource Reservation: Memory=1.06MB Per-Host Resource Estimates: Memory=180.00MB Codegen disabled by planner
