IMPALA-3480: Add query options for min/max filter sizes

This patch adds two query options for runtime filters:
RUNTIME_FILTER_MAX_SIZE and RUNTIME_FILTER_MIN_SIZE. These options set the maximum and minimum size of every runtime filter, regardless of the sizes estimated by the planner; if the configured minimum exceeds the maximum, the maximum takes precedence. Filter sizes are rounded up to the nearest power of two. Change-Id: I5c13c200a0f1855f38a5da50ca34a737e741868b Reviewed-on: http://gerrit.cloudera.org:8080/2966 Tested-by: Internal Jenkins Reviewed-by: Henry Robinson <[email protected]> Project: http://git-wip-us.apache.org/repos/asf/incubator-impala/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-impala/commit/df1412c9 Tree: http://git-wip-us.apache.org/repos/asf/incubator-impala/tree/df1412c9 Diff: http://git-wip-us.apache.org/repos/asf/incubator-impala/diff/df1412c9 Branch: refs/heads/master Commit: df1412c962945fe6e69591e80354fad692413ba3 Parents: 14cdb04 Author: Henry Robinson <[email protected]> Authored: Thu May 5 10:00:29 2016 -0700 Committer: Tim Armstrong <[email protected]> Committed: Thu May 12 23:06:35 2016 -0700 ---------------------------------------------------------------------- be/src/exec/hash-table-test.cc | 6 +-- be/src/exec/hash-table.h | 2 +- be/src/exec/nested-loop-join-node.cc | 12 ++--- be/src/exec/partitioned-aggregation-node.cc | 2 +- be/src/runtime/runtime-filter.cc | 34 ++++++++---- be/src/runtime/runtime-filter.h | 14 +++-- be/src/service/query-options-test.cc | 56 +++++++++++--------- be/src/service/query-options.cc | 19 +++++-- be/src/service/query-options.h | 6 ++- be/src/util/bit-util.h | 7 ++- be/src/util/debug-util.cc | 5 +- be/src/util/debug-util.h | 1 + be/src/util/fixed-size-hash-table.h | 4 +- be/src/util/parse-util-test.cc | 12 ++++- be/src/util/parse-util.cc | 6 +++ be/src/util/parse-util.h | 1 + common/thrift/ImpalaInternalService.thrift | 6 +++ common/thrift/ImpalaService.thrift | 6 +++ .../queries/QueryTest/runtime_filters.test | 46 ++++++++++++++++ .../queries/QueryTest/runtime_filters_wait.test | 1 + 20 files changed, 183 insertions(+), 63 deletions(-) 
---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/df1412c9/be/src/exec/hash-table-test.cc ---------------------------------------------------------------------- diff --git a/be/src/exec/hash-table-test.cc b/be/src/exec/hash-table-test.cc index bef3f1e..25cd4f1 100644 --- a/be/src/exec/hash-table-test.cc +++ b/be/src/exec/hash-table-test.cc @@ -189,7 +189,7 @@ class HashTableTest : public testing::Test { &tracker_, runtime_state_, &client).ok()); // Initial_num_buckets must be a power of two. - EXPECT_EQ(initial_num_buckets, BitUtil::NextPowerOfTwo(initial_num_buckets)); + EXPECT_EQ(initial_num_buckets, BitUtil::RoundUpToPowerOfTwo(initial_num_buckets)); int64_t max_num_buckets = 1L << 31; table->reset(new HashTable(quadratic, runtime_state_, client, 1, NULL, max_num_buckets, initial_num_buckets)); @@ -354,12 +354,12 @@ class HashTableTest : public testing::Test { ProbeTest(hash_table.get(), &ht_ctx, probe_rows, total_rows, true); // Resize and try again. 
- int target_size = BitUtil::NextPowerOfTwo(2 * total_rows); + int target_size = BitUtil::RoundUpToPowerOfTwo(2 * total_rows); ResizeTable(hash_table.get(), target_size, &ht_ctx); EXPECT_EQ(hash_table->num_buckets(), target_size); ProbeTest(hash_table.get(), &ht_ctx, probe_rows, total_rows, true); - target_size = BitUtil::NextPowerOfTwo(total_rows + 1); + target_size = BitUtil::RoundUpToPowerOfTwo(total_rows + 1); ResizeTable(hash_table.get(), target_size, &ht_ctx); EXPECT_EQ(hash_table->num_buckets(), target_size); ProbeTest(hash_table.get(), &ht_ctx, probe_rows, total_rows, true); http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/df1412c9/be/src/exec/hash-table.h ---------------------------------------------------------------------- diff --git a/be/src/exec/hash-table.h b/be/src/exec/hash-table.h index e496cde..3d090a8 100644 --- a/be/src/exec/hash-table.h +++ b/be/src/exec/hash-table.h @@ -423,7 +423,7 @@ class HashTable { /// rounded up to a power of two, and also assumes that there are no duplicates. static int64_t EstimateNumBuckets(int64_t num_rows) { /// Assume max 66% fill factor and no duplicates. 
- return BitUtil::NextPowerOfTwo(3 * num_rows / 2); + return BitUtil::RoundUpToPowerOfTwo(3 * num_rows / 2); } static int64_t EstimateSize(int64_t num_rows) { int64_t num_buckets = EstimateNumBuckets(num_rows); http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/df1412c9/be/src/exec/nested-loop-join-node.cc ---------------------------------------------------------------------- diff --git a/be/src/exec/nested-loop-join-node.cc b/be/src/exec/nested-loop-join-node.cc index 5789534..646c089 100644 --- a/be/src/exec/nested-loop-join-node.cc +++ b/be/src/exec/nested-loop-join-node.cc @@ -316,7 +316,7 @@ Status NestedLoopJoinNode::GetNextLeftSemiJoin(RuntimeState* state, RowBatch* output_batch) { ExprContext* const* join_conjunct_ctxs = &join_conjunct_ctxs_[0]; size_t num_join_ctxs = join_conjunct_ctxs_.size(); - const int N = BitUtil::NextPowerOfTwo(state->batch_size()); + const int N = BitUtil::RoundUpToPowerOfTwo(state->batch_size()); while (!eos_) { DCHECK(HasValidProbeRow()); @@ -361,7 +361,7 @@ Status NestedLoopJoinNode::GetNextLeftAntiJoin(RuntimeState* state, RowBatch* output_batch) { ExprContext* const* join_conjunct_ctxs = &join_conjunct_ctxs_[0]; size_t num_join_ctxs = join_conjunct_ctxs_.size(); - const int N = BitUtil::NextPowerOfTwo(state->batch_size()); + const int N = BitUtil::RoundUpToPowerOfTwo(state->batch_size()); while (!eos_) { DCHECK(HasValidProbeRow()); @@ -414,7 +414,7 @@ Status NestedLoopJoinNode::GetNextRightSemiJoin(RuntimeState* state, ExprContext* const* join_conjunct_ctxs = &join_conjunct_ctxs_[0]; size_t num_join_ctxs = join_conjunct_ctxs_.size(); DCHECK(matching_build_rows_ != NULL); - const int N = BitUtil::NextPowerOfTwo(state->batch_size()); + const int N = BitUtil::RoundUpToPowerOfTwo(state->batch_size()); while (!eos_) { DCHECK(HasValidProbeRow()); @@ -471,7 +471,7 @@ Status NestedLoopJoinNode::GetNextRightAntiJoin(RuntimeState* state, ExprContext* const* join_conjunct_ctxs = &join_conjunct_ctxs_[0]; size_t num_join_ctxs = 
join_conjunct_ctxs_.size(); DCHECK(matching_build_rows_ != NULL); - const int N = BitUtil::NextPowerOfTwo(state->batch_size()); + const int N = BitUtil::RoundUpToPowerOfTwo(state->batch_size()); while (!eos_ && HasMoreProbeRows()) { DCHECK(HasValidProbeRow()); @@ -557,7 +557,7 @@ Status NestedLoopJoinNode::ProcessUnmatchedBuildRows( size_t num_ctxs = conjunct_ctxs_.size(); DCHECK(matching_build_rows_ != NULL); - const int N = BitUtil::NextPowerOfTwo(state->batch_size()); + const int N = BitUtil::RoundUpToPowerOfTwo(state->batch_size()); while (!build_row_iterator_.AtEnd()) { // This loop can go on for a long time if the conjuncts are very selective. Do query // maintenance every N iterations. @@ -612,7 +612,7 @@ Status NestedLoopJoinNode::FindBuildMatches( ExprContext* const* conjunct_ctxs = &conjunct_ctxs_[0]; size_t num_ctxs = conjunct_ctxs_.size(); - const int N = BitUtil::NextPowerOfTwo(state->batch_size()); + const int N = BitUtil::RoundUpToPowerOfTwo(state->batch_size()); while (!build_row_iterator_.AtEnd()) { DCHECK(current_probe_row_ != NULL); TupleRow* output_row = output_batch->GetRow(output_batch->AddRow()); http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/df1412c9/be/src/exec/partitioned-aggregation-node.cc ---------------------------------------------------------------------- diff --git a/be/src/exec/partitioned-aggregation-node.cc b/be/src/exec/partitioned-aggregation-node.cc index a1fef0f..0bf51a9 100644 --- a/be/src/exec/partitioned-aggregation-node.cc +++ b/be/src/exec/partitioned-aggregation-node.cc @@ -432,7 +432,7 @@ Status PartitionedAggregationNode::GetRowsFromPartition(RuntimeState* state, SCOPED_TIMER(get_results_timer_); int count = 0; - const int N = BitUtil::NextPowerOfTwo(state->batch_size()); + const int N = BitUtil::RoundUpToPowerOfTwo(state->batch_size()); // Keeping returning rows from the current partition. while (!output_iterator_.AtEnd()) { // This loop can go on for a long time if the conjuncts are very selective. 
Do query http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/df1412c9/be/src/runtime/runtime-filter.cc ---------------------------------------------------------------------- diff --git a/be/src/runtime/runtime-filter.cc b/be/src/runtime/runtime-filter.cc index 2659125..7616320 100644 --- a/be/src/runtime/runtime-filter.cc +++ b/be/src/runtime/runtime-filter.cc @@ -32,8 +32,8 @@ DEFINE_double(max_filter_error_rate, 0.75, "(Advanced) The maximum probability o const int RuntimeFilter::SLEEP_PERIOD_MS = 20; -const int32_t RuntimeFilterBank::MIN_BLOOM_FILTER_SIZE; -const int32_t RuntimeFilterBank::MAX_BLOOM_FILTER_SIZE; +const int64_t RuntimeFilterBank::MIN_BLOOM_FILTER_SIZE; +const int64_t RuntimeFilterBank::MAX_BLOOM_FILTER_SIZE; RuntimeFilterBank::RuntimeFilterBank(const TQueryCtx& query_ctx, RuntimeState* state) : query_ctx_(query_ctx), state_(state), closed_(false) { @@ -41,10 +41,26 @@ RuntimeFilterBank::RuntimeFilterBank(const TQueryCtx& query_ctx, RuntimeState* s state->runtime_profile()->AddCounter("BloomFilterBytes", TUnit::BYTES); // Clamp bloom filter size down to the limits {MIN,MAX}_BLOOM_FILTER_SIZE - int32_t bloom_filter_size = query_ctx_.request.query_options.runtime_bloom_filter_size; - bloom_filter_size = std::max(bloom_filter_size, MIN_BLOOM_FILTER_SIZE); - bloom_filter_size = std::min(bloom_filter_size, MAX_BLOOM_FILTER_SIZE); - default_log_filter_size_ = Bits::Log2Ceiling64(bloom_filter_size); + max_filter_size_ = query_ctx_.request.query_options.runtime_filter_max_size; + max_filter_size_ = max<int64_t>(max_filter_size_, MIN_BLOOM_FILTER_SIZE); + max_filter_size_ = + BitUtil::RoundUpToPowerOfTwo(min<int64_t>(max_filter_size_, MAX_BLOOM_FILTER_SIZE)); + + min_filter_size_ = query_ctx_.request.query_options.runtime_filter_min_size; + min_filter_size_ = max<int64_t>(min_filter_size_, MIN_BLOOM_FILTER_SIZE); + min_filter_size_ = + BitUtil::RoundUpToPowerOfTwo(min<int64_t>(min_filter_size_, MAX_BLOOM_FILTER_SIZE)); + + // Make sure that min <= 
max + min_filter_size_ = min<int64_t>(min_filter_size_, max_filter_size_); + + DCHECK_GT(min_filter_size_, 0); + DCHECK_GT(max_filter_size_, 0); + + default_filter_size_ = query_ctx_.request.query_options.runtime_bloom_filter_size; + default_filter_size_ = max<int64_t>(default_filter_size_, min_filter_size_); + default_filter_size_ = + BitUtil::RoundUpToPowerOfTwo(min<int64_t>(default_filter_size_, max_filter_size_)); } RuntimeFilter* RuntimeFilterBank::RegisterFilter(const TRuntimeFilterDesc& filter_desc, @@ -172,11 +188,11 @@ BloomFilter* RuntimeFilterBank::AllocateScratchBloomFilter(int32_t filter_id) { } int64_t RuntimeFilterBank::GetFilterSizeForNdv(int64_t ndv) { - if (ndv == -1) return 1LL << default_log_filter_size_; + if (ndv == -1) return default_filter_size_; int64_t required_space = 1LL << BloomFilter::MinLogSpace(ndv, FLAGS_max_filter_error_rate); - if (required_space > MAX_BLOOM_FILTER_SIZE) required_space = MAX_BLOOM_FILTER_SIZE; - if (required_space < MIN_BLOOM_FILTER_SIZE) required_space = MIN_BLOOM_FILTER_SIZE; + required_space = max<int64_t>(required_space, min_filter_size_); + required_space = min<int64_t>(required_space, max_filter_size_); return required_space; } http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/df1412c9/be/src/runtime/runtime-filter.h ---------------------------------------------------------------------- diff --git a/be/src/runtime/runtime-filter.h b/be/src/runtime/runtime-filter.h index 50c77b0..178c03f 100644 --- a/be/src/runtime/runtime-filter.h +++ b/be/src/runtime/runtime-filter.h @@ -100,8 +100,8 @@ class RuntimeFilterBank { /// Releases all memory allocated for BloomFilters. 
void Close(); - static const int32_t MIN_BLOOM_FILTER_SIZE = 4 * 1024; // 4KB - static const int32_t MAX_BLOOM_FILTER_SIZE = 16 * 1024 * 1024; // 16MB + static const int64_t MIN_BLOOM_FILTER_SIZE = 4 * 1024; // 4KB + static const int64_t MAX_BLOOM_FILTER_SIZE = 16 * 1024 * 1024; // 16MB private: /// Returns the the space (in bytes) required for a filter to achieve the configured @@ -136,8 +136,14 @@ class RuntimeFilterBank { /// Total amount of memory allocated to Bloom Filters RuntimeProfile::Counter* memory_allocated_; - /// Precomputed logarithm of default BloomFilter heap size. - int default_log_filter_size_; + /// Precomputed default BloomFilter size. + int64_t default_filter_size_; + + /// Maximum filter size, in bytes, rounded up to a power of two. + int64_t max_filter_size_; + + /// Minimum filter size, in bytes, rounded up to a power of two. + int64_t min_filter_size_; }; /// RuntimeFilters represent set-membership predicates (implemented with bloom filters) http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/df1412c9/be/src/service/query-options-test.cc ---------------------------------------------------------------------- diff --git a/be/src/service/query-options-test.cc b/be/src/service/query-options-test.cc index 83a5770..53767d9 100644 --- a/be/src/service/query-options-test.cc +++ b/be/src/service/query-options-test.cc @@ -29,30 +29,38 @@ using namespace std; TEST(QueryOptions, SetBloomSize) { TQueryOptions options; - - // The upper and lower bound of the allowed values: - EXPECT_FALSE(SetQueryOption("RUNTIME_BLOOM_FILTER_SIZE", - lexical_cast<string>(RuntimeFilterBank::MIN_BLOOM_FILTER_SIZE - 1), &options, NULL) - .ok()); - - EXPECT_FALSE(SetQueryOption("RUNTIME_BLOOM_FILTER_SIZE", - lexical_cast<string>(RuntimeFilterBank::MAX_BLOOM_FILTER_SIZE + 1), &options, NULL) - .ok()); - - EXPECT_OK(SetQueryOption("RUNTIME_BLOOM_FILTER_SIZE", - lexical_cast<string>(RuntimeFilterBank::MIN_BLOOM_FILTER_SIZE), &options, NULL)); - 
EXPECT_EQ(RuntimeFilterBank::MIN_BLOOM_FILTER_SIZE, options.runtime_bloom_filter_size); - - EXPECT_OK(SetQueryOption("RUNTIME_BLOOM_FILTER_SIZE", - lexical_cast<string>(RuntimeFilterBank::MAX_BLOOM_FILTER_SIZE), &options, NULL)); - EXPECT_EQ(RuntimeFilterBank::MAX_BLOOM_FILTER_SIZE, options.runtime_bloom_filter_size); - - // Parsing memory values works in a reasonable way: - EXPECT_OK(SetQueryOption("RUNTIME_BLOOM_FILTER_SIZE", "1MB", &options, NULL)); - EXPECT_EQ(1 << 20, options.runtime_bloom_filter_size); - - // Bloom filters cannot occupy a percentage of memory: - EXPECT_FALSE(SetQueryOption("RUNTIME_BLOOM_FILTER_SIZE", "10%", &options, NULL).ok()); + vector<pair<string, int*>> option_list = { + {"RUNTIME_BLOOM_FILTER_SIZE", &options.runtime_bloom_filter_size}, + {"RUNTIME_FILTER_MAX_SIZE", &options.runtime_filter_max_size}, + {"RUNTIME_FILTER_MIN_SIZE", &options.runtime_filter_min_size}}; + for (const auto& option: option_list) { + + // The upper and lower bound of the allowed values: + EXPECT_FALSE(SetQueryOption(option.first, + lexical_cast<string>(RuntimeFilterBank::MIN_BLOOM_FILTER_SIZE - 1), &options, + NULL) + .ok()); + + EXPECT_FALSE(SetQueryOption(option.first, + lexical_cast<string>(RuntimeFilterBank::MAX_BLOOM_FILTER_SIZE + 1), &options, + NULL) + .ok()); + + EXPECT_OK(SetQueryOption(option.first, + lexical_cast<string>(RuntimeFilterBank::MIN_BLOOM_FILTER_SIZE), &options, NULL)); + EXPECT_EQ(RuntimeFilterBank::MIN_BLOOM_FILTER_SIZE, *option.second); + + EXPECT_OK(SetQueryOption(option.first, + lexical_cast<string>(RuntimeFilterBank::MAX_BLOOM_FILTER_SIZE), &options, NULL)); + EXPECT_EQ(RuntimeFilterBank::MAX_BLOOM_FILTER_SIZE, *option.second); + + // Parsing memory values works in a reasonable way: + EXPECT_OK(SetQueryOption(option.first, "1MB", &options, NULL)); + EXPECT_EQ(1 << 20, *option.second); + + // Bloom filters cannot occupy a percentage of memory: + EXPECT_FALSE(SetQueryOption(option.first, "10%", &options, NULL).ok()); + } } 
TEST(QueryOptions, SetFilterWait) { http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/df1412c9/be/src/service/query-options.cc ---------------------------------------------------------------------- diff --git a/be/src/service/query-options.cc b/be/src/service/query-options.cc index ce538bf..4617256 100644 --- a/be/src/service/query-options.cc +++ b/be/src/service/query-options.cc @@ -325,17 +325,26 @@ Status impala::SetQueryOption(const string& key, const string& value, " OFF(0), LOCAL(1) or GLOBAL(2).", value)); } break; + case TImpalaQueryOptions::RUNTIME_FILTER_MAX_SIZE: + case TImpalaQueryOptions::RUNTIME_FILTER_MIN_SIZE: case TImpalaQueryOptions::RUNTIME_BLOOM_FILTER_SIZE: { int64_t size; RETURN_IF_ERROR(ParseMemValue(value, "Bloom filter size", &size)); if (size < RuntimeFilterBank::MIN_BLOOM_FILTER_SIZE || size > RuntimeFilterBank::MAX_BLOOM_FILTER_SIZE) { - return Status(Substitute( - "$0 is not a valid Bloom filter size. Valid sizes are in [$1, $2].", value, - RuntimeFilterBank::MIN_BLOOM_FILTER_SIZE, - RuntimeFilterBank::MAX_BLOOM_FILTER_SIZE)); + return Status(Substitute("$0 is not a valid Bloom filter size for $1. 
" + "Valid sizes are in [$2, $3].", value, PrintTImpalaQueryOptions( + static_cast<TImpalaQueryOptions::type>(option)), + RuntimeFilterBank::MIN_BLOOM_FILTER_SIZE, + RuntimeFilterBank::MAX_BLOOM_FILTER_SIZE)); + } + if (option == TImpalaQueryOptions::RUNTIME_BLOOM_FILTER_SIZE) { + query_options->__set_runtime_bloom_filter_size(size); + } else if (option == TImpalaQueryOptions::RUNTIME_FILTER_MIN_SIZE) { + query_options->__set_runtime_filter_min_size(size); + } else if (option == TImpalaQueryOptions::RUNTIME_FILTER_MAX_SIZE) { + query_options->__set_runtime_filter_max_size(size); } - query_options->__set_runtime_bloom_filter_size(size); break; } case TImpalaQueryOptions::RUNTIME_FILTER_WAIT_TIME_MS: { http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/df1412c9/be/src/service/query-options.h ---------------------------------------------------------------------- diff --git a/be/src/service/query-options.h b/be/src/service/query-options.h index 56e2e1a..6f4c214 100644 --- a/be/src/service/query-options.h +++ b/be/src/service/query-options.h @@ -32,7 +32,7 @@ class TQueryOptions; // the DCHECK. 
#define QUERY_OPTS_TABLE\ DCHECK_EQ(_TImpalaQueryOptions_VALUES_TO_NAMES.size(),\ - TImpalaQueryOptions::S3_SKIP_INSERT_STAGING + 1);\ + TImpalaQueryOptions::RUNTIME_FILTER_MIN_SIZE + 1);\ QUERY_OPT_FN(abort_on_default_limit_exceeded, ABORT_ON_DEFAULT_LIMIT_EXCEEDED)\ QUERY_OPT_FN(abort_on_error, ABORT_ON_ERROR)\ QUERY_OPT_FN(allow_unsupported_formats, ALLOW_UNSUPPORTED_FORMATS)\ @@ -77,7 +77,9 @@ class TQueryOptions; QUERY_OPT_FN(parquet_annotate_strings_utf8, PARQUET_ANNOTATE_STRINGS_UTF8)\ QUERY_OPT_FN(parquet_fallback_schema_resolution, PARQUET_FALLBACK_SCHEMA_RESOLUTION)\ QUERY_OPT_FN(mt_num_cores, MT_NUM_CORES)\ - QUERY_OPT_FN(s3_skip_insert_staging, S3_SKIP_INSERT_STAGING); + QUERY_OPT_FN(s3_skip_insert_staging, S3_SKIP_INSERT_STAGING)\ + QUERY_OPT_FN(runtime_filter_min_size, RUNTIME_FILTER_MIN_SIZE)\ + QUERY_OPT_FN(runtime_filter_max_size, RUNTIME_FILTER_MAX_SIZE); /// Converts a TQueryOptions struct into a map of key, value pairs. void TQueryOptionsToMap(const TQueryOptions& query_options, http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/df1412c9/be/src/util/bit-util.h ---------------------------------------------------------------------- diff --git a/be/src/util/bit-util.h b/be/src/util/bit-util.h index e255f0c..eed6df3 100644 --- a/be/src/util/bit-util.h +++ b/be/src/util/bit-util.h @@ -51,11 +51,10 @@ class BitUtil { return (value / factor) * factor; } - /// Returns the smallest power of two that contains v. Taken from + /// Returns the smallest power of two that contains v. If v is a power of two, v is + /// returned. Taken from /// http://graphics.stanford.edu/~seander/bithacks.html#RoundUpPowerOf2 - /// TODO: Pick a better name, as it is not clear what happens when the input is - /// already a power of two. 
- static inline int64_t NextPowerOfTwo(int64_t v) { + static inline int64_t RoundUpToPowerOfTwo(int64_t v) { --v; v |= v >> 1; v |= v >> 2; http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/df1412c9/be/src/util/debug-util.cc ---------------------------------------------------------------------- diff --git a/be/src/util/debug-util.cc b/be/src/util/debug-util.cc index 7e6a290..4cf606b 100644 --- a/be/src/util/debug-util.cc +++ b/be/src/util/debug-util.cc @@ -56,7 +56,8 @@ namespace impala { // Macro to stamp out operator<< for thrift enums. Why doesn't thrift do this? #define THRIFT_ENUM_OUTPUT_FN(E) THRIFT_ENUM_OUTPUT_FN_IMPL(E , _##E##_VALUES_TO_NAMES) -// Macro to implement Print function that returns string for thrift enums +// Macro to implement Print function that returns string for thrift enums. Make sure you +// define a corresponding THRIFT_ENUM_OUTPUT_FN. #define THRIFT_ENUM_PRINT_FN(E) \ string Print##E(const E::type& e) {\ stringstream ss;\ @@ -78,6 +79,7 @@ THRIFT_ENUM_OUTPUT_FN(CompressionCodec); THRIFT_ENUM_OUTPUT_FN(Type); THRIFT_ENUM_OUTPUT_FN(TMetricKind); THRIFT_ENUM_OUTPUT_FN(TUnit); +THRIFT_ENUM_OUTPUT_FN(TImpalaQueryOptions); THRIFT_ENUM_PRINT_FN(TCatalogObjectType); THRIFT_ENUM_PRINT_FN(TDdlType); @@ -88,6 +90,7 @@ THRIFT_ENUM_PRINT_FN(QueryState); THRIFT_ENUM_PRINT_FN(Encoding); THRIFT_ENUM_PRINT_FN(TMetricKind); THRIFT_ENUM_PRINT_FN(TUnit); +THRIFT_ENUM_PRINT_FN(TImpalaQueryOptions); ostream& operator<<(ostream& os, const TUniqueId& id) { http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/df1412c9/be/src/util/debug-util.h ---------------------------------------------------------------------- diff --git a/be/src/util/debug-util.h b/be/src/util/debug-util.h index 6872e66..c9550dc 100644 --- a/be/src/util/debug-util.h +++ b/be/src/util/debug-util.h @@ -69,6 +69,7 @@ std::string PrintEncoding(const parquet::Encoding::type& type); std::string PrintAsHex(const char* bytes, int64_t len); std::string PrintTMetricKind(const 
TMetricKind::type& type); std::string PrintTUnit(const TUnit::type& type); +std::string PrintTImpalaQueryOptions(const TImpalaQueryOptions::type& type); /// Returns the fully qualified path, e.g. "database.table.array_col.item.field" std::string PrintPath(const TableDescriptor& tbl_desc, const SchemaPath& path); /// Returns the numeric path without column/field names, e.g. "[0,1,2]" http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/df1412c9/be/src/util/fixed-size-hash-table.h ---------------------------------------------------------------------- diff --git a/be/src/util/fixed-size-hash-table.h b/be/src/util/fixed-size-hash-table.h index 8ecb328..828769e 100644 --- a/be/src/util/fixed-size-hash-table.h +++ b/be/src/util/fixed-size-hash-table.h @@ -46,8 +46,8 @@ class FixedSizeHashTable { DCHECK_GT(min_capacity, 0); // Capacity cannot be greater than largest uint32_t power of two. capacity_ = static_cast<uint32_t>(std::min(static_cast<int64_t>(1) << 31, - BitUtil::NextPowerOfTwo(min_capacity))); - DCHECK_EQ(capacity_, BitUtil::NextPowerOfTwo(capacity_)); + BitUtil::RoundUpToPowerOfTwo(min_capacity))); + DCHECK_EQ(capacity_, BitUtil::RoundUpToPowerOfTwo(capacity_)); if (tbl_ != NULL) free(tbl_); int64_t tbl_byte_size = capacity_ * sizeof(Entry); tbl_ = reinterpret_cast<Entry*>(malloc(tbl_byte_size)); http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/df1412c9/be/src/util/parse-util-test.cc ---------------------------------------------------------------------- diff --git a/be/src/util/parse-util-test.cc b/be/src/util/parse-util-test.cc index a6726fd..bd49371 100644 --- a/be/src/util/parse-util-test.cc +++ b/be/src/util/parse-util-test.cc @@ -31,7 +31,8 @@ TEST(ParseMemSpecs, Basic) { bool is_percent; int64_t bytes; - int64_t megabytes = 1024 * 1024; + int64_t kilobytes = 1024; + int64_t megabytes = 1024 * kilobytes; int64_t gigabytes = 1024 * megabytes; bytes = ParseUtil::ParseMemSpec("1", &is_percent, MemInfo::physical_mem()); @@ -42,6 +43,14 @@ 
TEST(ParseMemSpecs, Basic) { ASSERT_EQ(100, bytes); ASSERT_FALSE(is_percent); + bytes = ParseUtil::ParseMemSpec("100kb", &is_percent, MemInfo::physical_mem()); + ASSERT_EQ(100 * 1024, bytes); + ASSERT_FALSE(is_percent); + + bytes = ParseUtil::ParseMemSpec("5KB", &is_percent, MemInfo::physical_mem()); + ASSERT_EQ(5 * 1024, bytes); + ASSERT_FALSE(is_percent); + bytes = ParseUtil::ParseMemSpec("4MB", &is_percent, MemInfo::physical_mem()); ASSERT_EQ(4 * megabytes, bytes); ASSERT_FALSE(is_percent); @@ -77,6 +86,7 @@ TEST(ParseMemSpecs, Basic) { bad_values.push_back("gb"); bad_values.push_back("1GMb"); bad_values.push_back("1b1Mb"); + bad_values.push_back("1kib"); bad_values.push_back("1Bb"); bad_values.push_back("1%%"); bad_values.push_back("1.1"); http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/df1412c9/be/src/util/parse-util.cc ---------------------------------------------------------------------- diff --git a/be/src/util/parse-util.cc b/be/src/util/parse-util.cc index 60cf3ac..1a0af85 100644 --- a/be/src/util/parse-util.cc +++ b/be/src/util/parse-util.cc @@ -49,6 +49,12 @@ int64_t ParseUtil::ParseMemSpec(const string& mem_spec_str, bool* is_percent, number_str_len--; multiplier = 1024L * 1024L; break; + case 'k': + case 'K': + // Kilobytes + number_str_len--; + multiplier = 1024L; + break; case '%': // Don't allow a suffix of "%B". if (suffix_char != mem_spec_str.rbegin()) return -1; http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/df1412c9/be/src/util/parse-util.h ---------------------------------------------------------------------- diff --git a/be/src/util/parse-util.h b/be/src/util/parse-util.h index 5b13137..02ce120 100644 --- a/be/src/util/parse-util.h +++ b/be/src/util/parse-util.h @@ -27,6 +27,7 @@ class ParseUtil { /// Sets *is_percent to indicate whether the given spec is in percent. /// Accepted formats: /// '<int>[bB]?' 
-> bytes (default if no unit given) + /// '<float>[kK(bB)]' -> kilobytes /// '<float>[mM(bB)]' -> megabytes /// '<float>[gG(bB)]' -> in gigabytes /// '<int>%' -> in percent of relative_reference http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/df1412c9/common/thrift/ImpalaInternalService.thrift ---------------------------------------------------------------------- diff --git a/common/thrift/ImpalaInternalService.thrift b/common/thrift/ImpalaInternalService.thrift index 611155c..b9892e6 100644 --- a/common/thrift/ImpalaInternalService.thrift +++ b/common/thrift/ImpalaInternalService.thrift @@ -192,6 +192,12 @@ struct TQueryOptions { // those queries, the coordinator deletes all files in the final location before copying // the files there. 45: optional bool s3_skip_insert_staging = true + + // Minimum runtime filter size, in bytes + 46: optional i32 runtime_filter_min_size = 1048576 + + // Maximum runtime filter size, in bytes + 47: optional i32 runtime_filter_max_size = 16777216 } // Impala currently has two types of sessions: Beeswax and HiveServer2 http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/df1412c9/common/thrift/ImpalaService.thrift ---------------------------------------------------------------------- diff --git a/common/thrift/ImpalaService.thrift b/common/thrift/ImpalaService.thrift index 0a030ad..68647df 100644 --- a/common/thrift/ImpalaService.thrift +++ b/common/thrift/ImpalaService.thrift @@ -219,6 +219,12 @@ enum TImpalaQueryOptions { // the files there. // TODO: Find a way to get this working for INSERT OVERWRITEs too. S3_SKIP_INSERT_STAGING + + // Maximum runtime filter size, in bytes. + RUNTIME_FILTER_MAX_SIZE, + + // Minimum runtime filter size, in bytes. + RUNTIME_FILTER_MIN_SIZE } // The summary of an insert. 
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/df1412c9/testdata/workloads/functional-query/queries/QueryTest/runtime_filters.test ---------------------------------------------------------------------- diff --git a/testdata/workloads/functional-query/queries/QueryTest/runtime_filters.test b/testdata/workloads/functional-query/queries/QueryTest/runtime_filters.test index 4432810..2d32064 100644 --- a/testdata/workloads/functional-query/queries/QueryTest/runtime_filters.test +++ b/testdata/workloads/functional-query/queries/QueryTest/runtime_filters.test @@ -259,6 +259,7 @@ row_regex: .*RowsReturned: 2.43K .* SET RUNTIME_FILTER_WAIT_TIME_MS=15000; SET RUNTIME_FILTER_MODE=GLOBAL; +SET RUNTIME_FILTER_MAX_SIZE=4K; select STRAIGHT_JOIN count(*) from alltypes a join [BROADCAST] # Build-side needs to be sufficiently large to trigger FP check. @@ -335,6 +336,7 @@ select STRAIGHT_JOIN count(a.id) from alltypes a #################################################### SET RUNTIME_FILTER_MODE=GLOBAL; SET RUNTIME_FILTER_WAIT_TIME_MS=30000; +SET RUNTIME_FILTER_MIN_SIZE=4KB; with l as (select * from tpch.lineitem UNION ALL select * from tpch.lineitem) select STRAIGHT_JOIN count(*) from (select * from tpch.lineitem a LIMIT 1) a join (select * from l LIMIT 1) b on a.l_orderkey = -b.l_orderkey; @@ -347,6 +349,7 @@ row_regex: .*Filter 0 \(4.00 KB\).* ---- QUERY SET RUNTIME_FILTER_MODE=GLOBAL; SET RUNTIME_FILTER_WAIT_TIME_MS=30000; +SET RUNTIME_FILTER_MIN_SIZE=4KB; with l as (select * from tpch.lineitem UNION ALL select * from tpch.lineitem) select STRAIGHT_JOIN count(*) from (select * from tpch.lineitem a LIMIT 1) a join (select * from l LIMIT 500000) b on a.l_orderkey = -b.l_orderkey; @@ -359,6 +362,7 @@ row_regex: .*Filter 0 \(256.00 KB\).* ---- QUERY SET RUNTIME_FILTER_MODE=GLOBAL; SET RUNTIME_FILTER_WAIT_TIME_MS=30000; +SET RUNTIME_FILTER_MIN_SIZE=4KB; with l as (select * from tpch.lineitem UNION ALL select * from tpch.lineitem) select STRAIGHT_JOIN count(*) from 
(select * from tpch.lineitem a LIMIT 1) a join (select * from l LIMIT 1000000) b on a.l_orderkey = -b.l_orderkey; @@ -371,6 +375,7 @@ row_regex: .*Filter 0 \(512.00 KB\).* ---- QUERY SET RUNTIME_FILTER_MODE=GLOBAL; SET RUNTIME_FILTER_WAIT_TIME_MS=30000; +SET RUNTIME_FILTER_MIN_SIZE=4KB; with l as (select * from tpch.lineitem UNION ALL select * from tpch.lineitem) select STRAIGHT_JOIN count(*) from (select * from tpch.lineitem a LIMIT 1) a join (select * from l LIMIT 2000000) b on a.l_orderkey = -b.l_orderkey; @@ -380,3 +385,44 @@ select STRAIGHT_JOIN count(*) from (select * from tpch.lineitem a LIMIT 1) a row_regex: .*1 of 1 Runtime Filter Published.* row_regex: .*Filter 0 \(1.00 MB\).* ==== + + +---- QUERY +#################################################### +# Test case 16: Filter sizes respect query options +#################################################### +SET RUNTIME_FILTER_MODE=GLOBAL; +SET RUNTIME_FILTER_WAIT_TIME_MS=30000; +SET RUNTIME_FILTER_MIN_SIZE=8KB; +SET RUNTIME_FILTER_MAX_SIZE=8KB; +# This query would produce a 4KB filter without setting the minimum size. 
+select STRAIGHT_JOIN count(*) from alltypes a join [SHUFFLE] alltypes b on a.id = b.id; +---- RESULTS +7300 +---- RUNTIME_PROFILE +row_regex: .*1 of 1 Runtime Filter Published.* +row_regex: .*Filter 0 \(8.00 KB\).* +==== +---- QUERY +# Check that filter sizes are rounded up to power-of-two +SET RUNTIME_FILTER_MIN_SIZE=6000B; +SET RUNTIME_FILTER_MAX_SIZE=6000B; +select STRAIGHT_JOIN count(*) from alltypes a join [SHUFFLE] alltypes b on a.id = b.id; +---- RESULTS +7300 +---- RUNTIME_PROFILE +row_regex: .*1 of 1 Runtime Filter Published.* +row_regex: .*Filter 0 \(8.00 KB\).* +==== +---- QUERY +SET RUNTIME_FILTER_MODE=GLOBAL; +SET RUNTIME_FILTER_WAIT_TIME_MS=30000; +SET RUNTIME_FILTER_MAX_SIZE=8192; +# Query would produce a 512KB filter without setting the max +with l as (select * from tpch.lineitem UNION ALL select * from tpch.lineitem) +select STRAIGHT_JOIN count(*) from (select * from tpch.lineitem a LIMIT 1) a + join (select * from l LIMIT 1000000) b on a.l_orderkey = -b.l_orderkey; +---- RUNTIME_PROFILE +row_regex: .*0 of 1 Runtime Filter Published.* +row_regex: .*Filter 0 \(8.00 KB\).* +==== http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/df1412c9/testdata/workloads/functional-query/queries/QueryTest/runtime_filters_wait.test ---------------------------------------------------------------------- diff --git a/testdata/workloads/functional-query/queries/QueryTest/runtime_filters_wait.test b/testdata/workloads/functional-query/queries/QueryTest/runtime_filters_wait.test index 324eb1c..4743f3e 100644 --- a/testdata/workloads/functional-query/queries/QueryTest/runtime_filters_wait.test +++ b/testdata/workloads/functional-query/queries/QueryTest/runtime_filters_wait.test @@ -24,6 +24,7 @@ on p.month = b.int_col and b.month = 1 and b.string_col = "1" SET RUNTIME_FILTER_WAIT_TIME_MS=600000; SET RUNTIME_FILTER_MODE=GLOBAL; +SET RUNTIME_FILTER_MAX_SIZE=4096; select STRAIGHT_JOIN count(*) from alltypes a join [BROADCAST] # Build-side needs to be sufficiently 
large to trigger FP check.
