This is an automated email from the ASF dual-hosted git repository.
csringhofer pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git
The following commit(s) were added to refs/heads/master by this push:
new 6abfdbc56 IMPALA-12980: Translate CpuAsk into admission control slots
6abfdbc56 is described below
commit 6abfdbc56c3d0ec3ac201dd4b8c2c35656d24eaf
Author: Riza Suminto <[email protected]>
AuthorDate: Tue Mar 26 18:51:52 2024 -0700
IMPALA-12980: Translate CpuAsk into admission control slots
Impala has a concept of "admission control slots" - the amount of
parallelism that should be allowed on an Impala daemon. This defaults to
the number of processors per executor and can be overridden with
--admission_control_slots flag.
Admission control slot accounting is described in IMPALA-8998. It
computes 'slots_to_use' for each backend based on the maximum number of
instances of any fragment on that backend. This can lead to slot
underestimation and query overadmission. For example, assume an executor
node with 48 CPU cores and configured with --admission_control_slots=48.
It is assigned 4 non-blocking query fragments, each with 12 instances
scheduled in this executor. IMPALA-8998 algorithm will request the max
instance (12) slots rather than the sum of all non-blocking fragment
instances (48). With the 36 remaining slots free, the executor can still
admit another fragment from a different query but will potentially have
CPU contention with the one that is currently running.
When COMPUTE_PROCESSING_COST is enabled, Planner will generate a CpuAsk
number that represents the cpu requirement of that query over a
particular executor group set. This number is an estimation of the
largest number of query fragment instances that can run in parallel
without waiting, given by the blocking operator analysis. Therefore, the
fragment trace that sums into that CpuAsk number can be translated into
'slots_to_use' as well, which will be a closer resemblance of maximum
parallel execution of fragment instances.
This patch adds a new query option called SLOT_COUNT_STRATEGY to control
which admission control slot accounting to use. There are two possible
values:
- LARGEST_FRAGMENT, which is the original algorithm from IMPALA-8998.
This is still the default value for the SLOT_COUNT_STRATEGY option.
- PLANNER_CPU_ASK, which will follow the fragment trace that contributes
towards the CpuAsk number. This strategy will schedule a number of
admission control slots greater than or equal to that of the LARGEST_FRAGMENT strategy.
To implement the PLANNER_CPU_ASK strategy, the Planner marks fragments that
contribute to CpuAsk as dominant fragments. It also passes
max_slot_per_executor information that it knows about the executor group
set to the scheduler.
AvgAdmissionSlotsPerExecutor counter is added to describe what Planner
thinks the average 'slots_to_use' per backend will be, which follows
this formula:
AvgAdmissionSlotsPerExecutor = ceil(CpuAsk / num_executors)
Actual 'slots_to_use' in each backend may differ from
AvgAdmissionSlotsPerExecutor, depending on what is scheduled on that
backend. 'slots_to_use' will be shown as 'AdmissionSlots' counter under
each executor profile node.
Testing:
- Update test_executors.py with AvgAdmissionSlotsPerExecutor assertion.
- Pass test_tpcds_queries.py::TestTpcdsQueryWithProcessingCost.
- Add EE test test_processing_cost.py.
- Add FE test PlannerTest#testProcessingCostPlanAdmissionSlots.
Change-Id: I338ca96555bfe8d07afce0320b3688a0861663f2
Reviewed-on: http://gerrit.cloudera.org:8080/21257
Reviewed-by: Impala Public Jenkins <[email protected]>
Tested-by: Impala Public Jenkins <[email protected]>
---
be/src/scheduling/admission-controller-test.cc | 3 +-
be/src/scheduling/admission-controller.cc | 4 +-
be/src/scheduling/scheduler.cc | 53 +-
be/src/service/query-options.cc | 7 +
be/src/service/query-options.h | 3 +-
common/thrift/ImpalaService.thrift | 5 +
common/thrift/Planner.thrift | 5 +
common/thrift/Query.thrift | 33 +-
fe/src/main/java/org/apache/impala/common/Id.java | 1 -
.../java/org/apache/impala/planner/CoreCount.java | 77 +-
.../org/apache/impala/planner/CostingSegment.java | 12 +-
.../org/apache/impala/planner/PlanFragment.java | 7 +
.../java/org/apache/impala/planner/Planner.java | 24 +-
.../java/org/apache/impala/service/Frontend.java | 23 +
.../org/apache/impala/planner/PlannerTest.java | 11 +
.../org/apache/impala/planner/PlannerTestBase.java | 2 +
.../processing-cost-plan-admission-slots.test | 976 +++++++++++++++++++++
.../QueryTest/processing-cost-admission-slots.test | 110 +++
tests/custom_cluster/test_executor_groups.py | 208 +++--
tests/query_test/test_processing_cost.py | 42 +
tests/query_test/test_tpcds_queries.py | 8 +-
21 files changed, 1504 insertions(+), 110 deletions(-)
diff --git a/be/src/scheduling/admission-controller-test.cc
b/be/src/scheduling/admission-controller-test.cc
index 9111a5367..babc848fe 100644
--- a/be/src/scheduling/admission-controller-test.cc
+++ b/be/src/scheduling/admission-controller-test.cc
@@ -637,7 +637,8 @@ TEST_F(AdmissionControllerTest, QueryRejection) {
EXPECT_STR_CONTAINS(rejected_slots_reason,
"number of admission control slots needed "
"(16) on backend 'host1:25000' is greater than total slots available 4.
Reduce "
- "mt_dop to less than 4 to ensure that the query can execute.");
+ "MT_DOP or MAX_FRAGMENT_INSTANCES_PER_NODE to less than 4 to ensure that
the "
+ "query can execute.");
rejected_slots_reason = "";
// Reduce mt_dop to ensure it can execute.
SetHostsInScheduleState(
diff --git a/be/src/scheduling/admission-controller.cc
b/be/src/scheduling/admission-controller.cc
index 6c614fc8e..af14a7010 100644
--- a/be/src/scheduling/admission-controller.cc
+++ b/be/src/scheduling/admission-controller.cc
@@ -212,8 +212,8 @@ const string REASON_BUFFER_LIMIT_TOO_LOW_FOR_RESERVATION =
"profile for more information about the per-node memory requirements.";
const string REASON_NOT_ENOUGH_SLOTS_ON_BACKEND =
"number of admission control slots needed ($0) on backend '$1' is greater
than total "
- "slots available $2. Reduce mt_dop to less than $2 to ensure that the
query can "
- "execute.";
+ "slots available $2. Reduce MT_DOP or MAX_FRAGMENT_INSTANCES_PER_NODE to
less than "
+ "$2 to ensure that the query can execute.";
const string REASON_MIN_RESERVATION_OVER_POOL_MEM =
"minimum memory reservation needed is greater than pool max mem resources.
Pool "
"max mem resources: $0. Cluster-wide memory reservation needed: $1.
Increase the "
diff --git a/be/src/scheduling/scheduler.cc b/be/src/scheduling/scheduler.cc
index 1809dce55..11ed058b7 100644
--- a/be/src/scheduling/scheduler.cc
+++ b/be/src/scheduling/scheduler.cc
@@ -1165,23 +1165,64 @@ void Scheduler::ComputeBackendExecParams(
}
// Compute 'slots_to_use' for each backend based on the max # of instances of
- // any fragment on that backend.
+ // any fragment on that backend. If 'compute_processing_cost' is on, and
Planner
+ // set 'max_slot_per_executor', pick the min between
'dominant_instance_count' and
+ // 'max_slot_per_executor'.
+ bool cap_slots = state->query_options().compute_processing_cost
+ && state->query_options().slot_count_strategy ==
TSlotCountStrategy::PLANNER_CPU_ASK
+ && state->request().__isset.max_slot_per_executor
+ && state->request().max_slot_per_executor > 0;
for (auto& backend : state->per_backend_schedule_states()) {
- int be_max_instances = 0;
+ int be_max_instances = 0; // max # of instances of any fragment.
+ int dominant_instance_count = 0; // sum of all dominant fragment instances.
+
// Instances for a fragment are clustered together because of how the
vector is
- // constructed above. So we can compute the max # of instances of any
fragment
- // with a single pass over the vector.
+ // constructed above. For example, 3 fragments with 2 instances each will
be in this
+ // order inside exec_params->instance_params() vector.
+ // [F00_a, F00_b, F01_c, F01_d, F02_e, F02_f]
+ // So we can compute the max # of instances of any fragment with a single
pass over
+ // the vector.
int curr_fragment_idx = -1;
int curr_instance_count = 0; // Number of instances of the current
fragment seen.
+ bool is_dominant = false;
for (auto& finstance : backend.second.exec_params->instance_params()) {
if (curr_fragment_idx == -1 || curr_fragment_idx !=
finstance.fragment_idx()) {
+ // We arrived at new fragment group. Update 'be_max_instances'.
+ be_max_instances = max(be_max_instances, curr_instance_count);
+ // Reset 'curr_fragment_idx' and other counting related variables.
curr_fragment_idx = finstance.fragment_idx();
curr_instance_count = 0;
+ is_dominant =
+
state->GetFragmentScheduleState(curr_fragment_idx)->fragment.is_dominant;
}
++curr_instance_count;
- be_max_instances = max(be_max_instances, curr_instance_count);
+ if (is_dominant) ++dominant_instance_count;
+ }
+ // Update 'be_max_instances' one last time.
+ be_max_instances = max(be_max_instances, curr_instance_count);
+
+ // Default slots to use number from IMPALA-8998.
+ // For fragment with largest num of instances running in this backend, it
+ // ensures allocation of 1 slot for each instance of that fragment.
+ int slots_to_use = be_max_instances;
+
+ // Done looping exec_params->instance_params(). Derived 'slots_to_use'
based on
+ // finalized 'be_max_instances' and 'dominant_instance_count'.
+ if (cap_slots) {
+ if (dominant_instance_count >= be_max_instances) {
+ // One case where it is possible to have 'dominant_instance_count' <
+ // 'be_max_instances' is with dedicated coordinator setup. The
schedule would
+ // only assign one coordinator fragment instance to the coordinator
node,
+ // but 'dominant_instance_count' can be 0 if fragment.is_dominant ==
false.
+ // In that case, ignore 'dominant_instance_count' and continue with
+ // 'be_max_instances'.
+ // However, if 'dominant_instance_count' >= 'be_max_instances',
+ // continue with 'dominant_instance_count'.
+ slots_to_use = dominant_instance_count;
+ }
+ slots_to_use = min(slots_to_use, state->request().max_slot_per_executor);
}
- backend.second.exec_params->set_slots_to_use(be_max_instances);
+ backend.second.exec_params->set_slots_to_use(slots_to_use);
}
// This also ensures an entry always exists for the coordinator backend.
diff --git a/be/src/service/query-options.cc b/be/src/service/query-options.cc
index d04aab5dc..10b4f3fbc 100644
--- a/be/src/service/query-options.cc
+++ b/be/src/service/query-options.cc
@@ -1278,6 +1278,13 @@ Status impala::SetQueryOption(const string& key, const
string& value,
query_options->__set_runtime_filter_ids_to_skip(filter_ids);
break;
}
+ case TImpalaQueryOptions::SLOT_COUNT_STRATEGY: {
+ TSlotCountStrategy::type enum_type;
+ RETURN_IF_ERROR(GetThriftEnum(value, "Slot count strategy",
+ _TSlotCountStrategy_VALUES_TO_NAMES, &enum_type));
+ query_options->__set_slot_count_strategy(enum_type);
+ break;
+ }
default:
if (IsRemovedQueryOption(key)) {
LOG(WARNING) << "Ignoring attempt to set removed query option '" <<
key << "'";
diff --git a/be/src/service/query-options.h b/be/src/service/query-options.h
index 0738a10b7..127f1ddb2 100644
--- a/be/src/service/query-options.h
+++ b/be/src/service/query-options.h
@@ -52,7 +52,7 @@ typedef std::unordered_map<string,
beeswax::TQueryOptionLevel::type>
// time we add or remove a query option to/from the enum TImpalaQueryOptions.
#define QUERY_OPTS_TABLE
\
DCHECK_EQ(_TImpalaQueryOptions_VALUES_TO_NAMES.size(),
\
- TImpalaQueryOptions::RUNTIME_FILTER_IDS_TO_SKIP + 1);
\
+ TImpalaQueryOptions::SLOT_COUNT_STRATEGY + 1);
\
REMOVED_QUERY_OPT_FN(abort_on_default_limit_exceeded,
ABORT_ON_DEFAULT_LIMIT_EXCEEDED) \
QUERY_OPT_FN(abort_on_error, ABORT_ON_ERROR, TQueryOptionLevel::REGULAR)
\
REMOVED_QUERY_OPT_FN(allow_unsupported_formats, ALLOW_UNSUPPORTED_FORMATS)
\
@@ -330,6 +330,7 @@ typedef std::unordered_map<string,
beeswax::TQueryOptionLevel::type>
ICEBERG_DISABLE_COUNT_STAR_OPTIMIZATION, TQueryOptionLevel::ADVANCED)
\
QUERY_OPT_FN(runtime_filter_ids_to_skip,
\
RUNTIME_FILTER_IDS_TO_SKIP, TQueryOptionLevel::DEVELOPMENT)
\
+ QUERY_OPT_FN(slot_count_strategy, SLOT_COUNT_STRATEGY,
TQueryOptionLevel::ADVANCED) \
;
/// Enforce practical limits on some query options to avoid undesired query
state.
diff --git a/common/thrift/ImpalaService.thrift
b/common/thrift/ImpalaService.thrift
index 61f5dbd0a..d1a5ebad0 100644
--- a/common/thrift/ImpalaService.thrift
+++ b/common/thrift/ImpalaService.thrift
@@ -930,6 +930,11 @@ enum TImpalaQueryOptions {
// RUNTIME_FILTER_IDS_TO_SKIP="1,2,3"
// If using impala-shell client, double quote is not required.
RUNTIME_FILTER_IDS_TO_SKIP = 176
+
+ // Decide what strategy to use to compute number of slot per node to run a
query.
+ // Default to number of instances of largest query fragment
(LARGEST_FRAGMENT).
+ // See TSlotCountStrategy in Query.thrift for documentation of its possible
values.
+ SLOT_COUNT_STRATEGY = 177
}
// The summary of a DML statement.
diff --git a/common/thrift/Planner.thrift b/common/thrift/Planner.thrift
index 01249f8f2..36622f97f 100644
--- a/common/thrift/Planner.thrift
+++ b/common/thrift/Planner.thrift
@@ -94,6 +94,11 @@ struct TPlanFragment {
// If true, the fragment must be scheduled on the coordinator. In this case
'partition'
// must be UNPARTITIONED.
15: required bool is_coordinator_only
+
+ // Marker on whether this is a dominant fragment or not. Only possible to be
true if
+ // COMPUTE_PROCESSING_COST=true. Otherwise, always false.
+ // See PlanFragment.java for definition of dominant fragment.
+ 16: optional bool is_dominant = false
}
// location information for a single scan range
diff --git a/common/thrift/Query.thrift b/common/thrift/Query.thrift
index 01ff6ebbc..e0ab68ddb 100644
--- a/common/thrift/Query.thrift
+++ b/common/thrift/Query.thrift
@@ -117,6 +117,24 @@ enum TCodeGenOptLevel {
O3
}
+// Option to decide how to compute slots_to_use for a query.
+// See Scheduler::ComputeBackendExecParams.
+enum TSlotCountStrategy {
+ // Compute slots to use for each backend based on the max number of
instances of any
+ // fragment on that backend. This is the default and only strategy available
if
+ // COMPUTE_PROCESSING_COST option is disabled. See IMPALA-8998.
+ LARGEST_FRAGMENT = 0,
+
+ // Compute slots to use for each backend based on CpuAsk counter from
Planner.
+ // The CpuAsk is the largest sum of fragments instances subset that can run
in-parallel
+ // without waiting for each other. This strategy relies on blocking operator
analysis
+ // that is only available if COMPUTE_PROCESSING_COST option is enabled, and
will
+ // schedule more or equal admission control slots than the LARGEST_FRAGMENT
strategy.
+ // The scheduler will silently ignore this choice and fallback to
LARGEST_FRAGMENT if
+ // COMPUTE_PROCESSING_COST is disabled.
+ PLANNER_CPU_ASK = 1
+}
+
// constants for TQueryOptions.num_nodes
const i32 NUM_NODES_ALL = 0
const i32 NUM_NODES_ALL_RACKS = -1
@@ -705,6 +723,10 @@ struct TQueryOptions {
// See comment in ImpalaService.thrift
177: optional set<i32> runtime_filter_ids_to_skip
+
+ // See comment in ImpalaService.thrift
+ 178: optional TSlotCountStrategy slot_count_strategy =
+ TSlotCountStrategy.LARGEST_FRAGMENT
}
// Impala currently has three types of sessions: Beeswax, HiveServer2 and
external
@@ -997,10 +1019,9 @@ struct TQueryExecRequest {
// Indicate whether the request is a trivial query. Used by admission
control.
13: optional bool is_trivial_query
- // CPU core count required to run the query. Used by admission control to
decide which
- // executor group to run the query. Non-positive value means no specific CPU
core count
- // is required.
- 14: optional i32 cores_required;
+ // CPU core count required to run the query. Used by Frontend to decide which
+ // executor group to run the query. Should either unset or set with positive
value.
+ 14: optional i32 cores_required
// Estimated per-host memory. The planner generates this value which may or
may not be
// overridden to come up with a final per-host memory estimate.
@@ -1008,5 +1029,9 @@ struct TQueryExecRequest {
// Used for system tables that need to run on all nodes.
16: optional bool include_all_coordinators
+
+ // Maximum admission control slot to use per executor backend.
+ // Only set if COMPUTE_PROCESSING_COST option is True.
+ 17: optional i32 max_slot_per_executor
}
diff --git a/fe/src/main/java/org/apache/impala/common/Id.java
b/fe/src/main/java/org/apache/impala/common/Id.java
index a5ce52e20..6c15a76a5 100644
--- a/fe/src/main/java/org/apache/impala/common/Id.java
+++ b/fe/src/main/java/org/apache/impala/common/Id.java
@@ -19,7 +19,6 @@ package org.apache.impala.common;
import java.util.ArrayList;
import java.util.Collection;
-import java.util.HashSet;
import java.util.List;
import java.util.Set;
diff --git a/fe/src/main/java/org/apache/impala/planner/CoreCount.java
b/fe/src/main/java/org/apache/impala/planner/CoreCount.java
index e8fc87101..721ee3b03 100644
--- a/fe/src/main/java/org/apache/impala/planner/CoreCount.java
+++ b/fe/src/main/java/org/apache/impala/planner/CoreCount.java
@@ -19,10 +19,10 @@ package org.apache.impala.planner;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableSet;
import org.apache.impala.common.Id;
-import java.util.Comparator;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
@@ -39,28 +39,59 @@ public class CoreCount {
// List of CPU core count contributing to this CoreCount.
private final ImmutableList<Integer> counts_;
+ // Set of unique fragment that contributes toward this CoreCount.
+ private final ImmutableSet<PlanFragmentId> uniqueFragmentIds_;
+
+ // True if this CoreCount include a plan root sink.
+ private final boolean hasPlanRootSink_;
+
// Sum of all elements in count_.
- // Cached after the first call of total().
- private int total_ = -1;
+ private final int total_;
+
+ public CoreCount(PlanFragment fragment, int count) {
+ Preconditions.checkArgument(count >= 0, "Core count must be a non-negative
number");
+ ids_ = ImmutableList.of(fragment.getId());
+ counts_ = ImmutableList.of(count);
+ uniqueFragmentIds_ = ImmutableSet.of(fragment.getId());
+ hasPlanRootSink_ = (fragment.getSink() instanceof PlanRootSink);
+ total_ = counts_.stream().mapToInt(v -> v).sum();
+ }
- public CoreCount(Id id, int count) {
+ public CoreCount(PlanNode node, int count) {
Preconditions.checkArgument(count >= 0, "Core count must be a non-negative
number");
- ids_ = ImmutableList.of(id);
+ ids_ = ImmutableList.of(node.getId());
counts_ = ImmutableList.of(count);
+ PlanFragment fragment = node.getFragment();
+ uniqueFragmentIds_ = ImmutableSet.of(fragment.getId());
+ hasPlanRootSink_ = false;
+ total_ = counts_.stream().mapToInt(v -> v).sum();
}
- private CoreCount(ImmutableList<Id> ids, ImmutableList<Integer> counts) {
+ private CoreCount(ImmutableList<Id> ids, ImmutableList<Integer> counts,
+ ImmutableSet<PlanFragmentId> uniqueFragments, boolean hasPlanRootSink) {
Preconditions.checkArgument(
ids.size() == counts.size(), "ids and counts must have same size!");
ids_ = ids;
counts_ = counts;
+ uniqueFragmentIds_ = uniqueFragments;
+ hasPlanRootSink_ = hasPlanRootSink;
+ total_ = counts_.stream().mapToInt(v -> v).sum();
}
- public int total() {
- if (total_ < 0) {
- total_ = counts_.stream().mapToInt(v -> v).sum();
- }
- return total_;
+ public int total() { return total_; }
+ public boolean hasCoordinator() { return hasPlanRootSink_; }
+
+ /**
+ * If this CoreCount has coordinator fragment in it, return total() - 1.
+ * Otherwise, return the same value as total().
+ */
+ public int totalWithoutCoordinator() { return total_ - (hasPlanRootSink_ ? 1
: 0); }
+
+ /**
+ * Return a set of PlanFragmentId that contribute toward this CoreCount.
+ */
+ public ImmutableSet<PlanFragmentId> getUniqueFragmentIds() {
+ return uniqueFragmentIds_;
}
@Override
@@ -85,26 +116,28 @@ public class CoreCount {
protected static CoreCount sum(List<CoreCount> cores) {
ImmutableList.Builder<Id> idBuilder = new ImmutableList.Builder<Id>();
ImmutableList.Builder<Integer> countBuilder = new
ImmutableList.Builder<Integer>();
+ ImmutableSet.Builder<PlanFragmentId> fragmentIdBuilder =
+ new ImmutableSet.Builder<PlanFragmentId>();
+ boolean hasPlanRootSink = false;
for (CoreCount coreRequirement : cores) {
idBuilder.addAll(coreRequirement.ids_);
countBuilder.addAll(coreRequirement.counts_);
+ fragmentIdBuilder.addAll(coreRequirement.uniqueFragmentIds_);
+ hasPlanRootSink |= coreRequirement.hasPlanRootSink_;
}
- return new CoreCount(idBuilder.build(), countBuilder.build());
+ return new CoreCount(idBuilder.build(), countBuilder.build(),
+ fragmentIdBuilder.build(), hasPlanRootSink);
}
protected static CoreCount sum(CoreCount core1, CoreCount core2) {
- ImmutableList.Builder<Id> idBuilder = new ImmutableList.Builder<Id>();
- ImmutableList.Builder<Integer> countBuilder = new
ImmutableList.Builder<Integer>();
-
- idBuilder.addAll(core1.ids_);
- idBuilder.addAll(core2.ids_);
- countBuilder.addAll(core1.counts_);
- countBuilder.addAll(core2.counts_);
-
- return new CoreCount(idBuilder.build(), countBuilder.build());
+ return sum(ImmutableList.of(core1, core2));
}
protected static CoreCount max(CoreCount core1, CoreCount core2) {
- return (core1.total() < core2.total()) ? core2 : core1;
+ if (core1.totalWithoutCoordinator() < core2.totalWithoutCoordinator()) {
+ return core2;
+ } else {
+ return core1;
+ }
}
}
diff --git a/fe/src/main/java/org/apache/impala/planner/CostingSegment.java
b/fe/src/main/java/org/apache/impala/planner/CostingSegment.java
index fed37bf29..49c87cfb6 100644
--- a/fe/src/main/java/org/apache/impala/planner/CostingSegment.java
+++ b/fe/src/main/java/org/apache/impala/planner/CostingSegment.java
@@ -77,6 +77,16 @@ public class CostingSegment extends TreeNode<CostingSegment>
{
}
}
+ private CoreCount createCoreCount() {
+ if (isOutputSegment()) {
+ return new CoreCount(sink_.getFragment(),
cost_.getNumInstancesExpected());
+ } else {
+ Preconditions.checkState(!nodes_.isEmpty());
+ PlanNode topNode = nodes_.get(nodes_.size() - 1);
+ return new CoreCount(topNode, cost_.getNumInstancesExpected());
+ }
+ }
+
private void appendCost(ProcessingCost additionalCost) {
Preconditions.checkArgument(additionalCost.isValid());
ProcessingCost newTotalCost = ProcessingCost.sumCost(additionalCost,
cost_);
@@ -106,7 +116,7 @@ public class CostingSegment extends
TreeNode<CostingSegment> {
protected CoreCount traverseBlockingAwareCores(
Map<PlanFragmentId, Pair<CoreCount, List<CoreCount>>> fragmentCoreState,
ImmutableList.Builder<CoreCount> subtreeCoreBuilder) {
- CoreCount segmentCore = new CoreCount(getRootId(),
cost_.getNumInstancesExpected());
+ CoreCount segmentCore = createCoreCount();
// If not in input segment, gather cost of children first.
for (CostingSegment childSegment : getChildren()) {
CoreCount childSegmentCores =
diff --git a/fe/src/main/java/org/apache/impala/planner/PlanFragment.java
b/fe/src/main/java/org/apache/impala/planner/PlanFragment.java
index e59df00cd..07ad46d1a 100644
--- a/fe/src/main/java/org/apache/impala/planner/PlanFragment.java
+++ b/fe/src/main/java/org/apache/impala/planner/PlanFragment.java
@@ -173,6 +173,11 @@ public class PlanFragment extends TreeNode<PlanFragment> {
private int thisTreeCpuCore_ = -1;
private int subtreeCpuCore_ = -1;
+ // Determine whether this fragment is the dominant one in the plan tree
based on
+ // calculation initiated by Planner.computeBlockingAwareCores().
+ // A fragment is dominant if it contribute towards the final CoreCount.
+ private boolean isDominantFragment_ = false;
+
public long getProducedRuntimeFiltersMemReservationBytes() {
return producedRuntimeFiltersMemReservationBytes_;
}
@@ -636,6 +641,7 @@ public class PlanFragment extends TreeNode<PlanFragment> {
result.setThread_reservation(perInstanceResourceProfile_.getThreadReservation());
result.setEffective_instance_count(getAdjustedInstanceCount());
result.setIs_coordinator_only(coordinatorOnly_);
+ result.setIs_dominant(isDominantFragment_);
return result;
}
@@ -826,6 +832,7 @@ public class PlanFragment extends TreeNode<PlanFragment> {
planRoot_ = root;
setFragmentInPlanTree(planRoot_);
}
+ protected void markDominant() { isDominantFragment_ = true; }
/**
* Set the destination node of this fragment's sink, i.e. an ExchangeNode or
a JoinNode.
diff --git a/fe/src/main/java/org/apache/impala/planner/Planner.java
b/fe/src/main/java/org/apache/impala/planner/Planner.java
index 0bd71a40a..e2e45397b 100644
--- a/fe/src/main/java/org/apache/impala/planner/Planner.java
+++ b/fe/src/main/java/org/apache/impala/planner/Planner.java
@@ -19,8 +19,10 @@ package org.apache.impala.planner;
import java.util.ArrayList;
import java.util.Collections;
+import java.util.HashSet;
import java.util.List;
import java.util.Map;
+import java.util.Set;
import java.util.Stack;
import java.util.stream.Collectors;
@@ -54,7 +56,6 @@ import org.apache.impala.thrift.TQueryCtx;
import org.apache.impala.thrift.TQueryExecRequest;
import org.apache.impala.thrift.TQueryOptions;
import org.apache.impala.thrift.TRuntimeFilterMode;
-import org.apache.impala.thrift.TSortingOrder;
import org.apache.impala.thrift.TTableName;
import org.apache.impala.util.EventSequence;
import org.apache.impala.util.KuduUtil;
@@ -566,10 +567,25 @@ public class Planner {
computeEffectiveParallelism(postOrderFragments,
rootAnalyzer.getMinParallelismPerNode(),
rootAnalyzer.getMaxParallelismPerNode());
- CoreCount effectiveCores = computeBlockingAwareCores(postOrderFragments);
- request.setCores_required(effectiveCores.total());
- LOG.info("CoreCount=" + effectiveCores);
+ // Count bounded core count. This is taken from final instance count from
previous
+ // step.
+ CoreCount boundedCores = computeBlockingAwareCores(postOrderFragments);
+ Set<PlanFragmentId> dominantFragmentIds =
+ new HashSet<>(boundedCores.getUniqueFragmentIds());
+ int coresRequired = Math.max(1, boundedCores.totalWithoutCoordinator());
+ if (boundedCores.hasCoordinator()) {
+ // exclude coordinator fragment from dominantFragmentIds.
+ dominantFragmentIds.remove(rootFragment.getId());
+ }
+ request.setCores_required(coresRequired);
+ LOG.info("CoreCount=" + boundedCores + ", coresRequired=" + coresRequired);
+
+ // Mark dominant fragment. This will be used by scheduler in scheduler.cc
to count
+ // admission slot requirement.
+ for (PlanFragment fragment : postOrderFragments) {
+ if (dominantFragmentIds.contains(fragment.getId()))
fragment.markDominant();
+ }
}
/**
diff --git a/fe/src/main/java/org/apache/impala/service/Frontend.java
b/fe/src/main/java/org/apache/impala/service/Frontend.java
index 7bb3d0efe..815ef6337 100644
--- a/fe/src/main/java/org/apache/impala/service/Frontend.java
+++ b/fe/src/main/java/org/apache/impala/service/Frontend.java
@@ -204,6 +204,7 @@ import org.apache.impala.thrift.TResultSetMetadata;
import org.apache.impala.thrift.TRuntimeProfileNode;
import org.apache.impala.thrift.TShowFilesParams;
import org.apache.impala.thrift.TShowStatsOp;
+import org.apache.impala.thrift.TSlotCountStrategy;
import org.apache.impala.thrift.TStmtType;
import org.apache.impala.thrift.TTableName;
import org.apache.impala.thrift.TTruncateParams;
@@ -266,6 +267,8 @@ public class Frontend {
private static final String MEMORY_ASK = "MemoryAsk";
private static final String CPU_MAX = "CpuMax";
private static final String CPU_ASK = "CpuAsk";
+ private static final String AVG_ADMISSION_SLOTS_PER_EXECUTOR =
+ "AvgAdmissionSlotsPerExecutor";
/**
* Plan-time context that allows capturing various artifacts created
@@ -2329,6 +2332,17 @@ public class Frontend {
if (matchFound) {
setGroupNamePrefix(default_executor_group, clientSetRequestPool, req,
group_set);
+ if (isComputeCost && req.query_exec_request != null
+ && queryOptions.slot_count_strategy ==
TSlotCountStrategy.PLANNER_CPU_ASK) {
+ // Use 'cores_requirement' instead of 'scaled_cores_requirement'
since the
+ // former is derived from the real number of fragment instances.
+ int avgSlotsUsePerBackend =
+ getAvgSlotsUsePerBackend(req, cores_requirement, group_set);
+ FrontendProfile.getCurrent().setToCounter(
+ AVG_ADMISSION_SLOTS_PER_EXECUTOR, TUnit.UNIT,
avgSlotsUsePerBackend);
+ req.query_exec_request.setMax_slot_per_executor(
+ group_set.getNum_cores_per_executor());
+ }
break;
}
@@ -2379,6 +2393,15 @@ public class Frontend {
return req;
}
+ private static int getAvgSlotsUsePerBackend(
+ TExecRequest req, int cores_requirement, TExecutorGroupSet group_set) {
+ int numExecutors = expectedNumExecutor(group_set);
+ Preconditions.checkState(cores_requirement > 0);
+ Preconditions.checkState(numExecutors > 0);
+ int idealSlot = (int) Math.ceil((double) cores_requirement / numExecutors);
+ return Math.max(1, Math.min(idealSlot,
group_set.getNum_cores_per_executor()));
+ }
+
private static void setGroupNamePrefix(boolean default_executor_group,
boolean clientSetRequestPool, TExecRequest req, TExecutorGroupSet
group_set) {
// Set the group name prefix in both the returned query options and
diff --git a/fe/src/test/java/org/apache/impala/planner/PlannerTest.java
b/fe/src/test/java/org/apache/impala/planner/PlannerTest.java
index 1553be896..e3a09c236 100644
--- a/fe/src/test/java/org/apache/impala/planner/PlannerTest.java
+++ b/fe/src/test/java/org/apache/impala/planner/PlannerTest.java
@@ -1447,6 +1447,17 @@ public class PlannerTest extends PlannerTestBase {
tpcdsParquetTestOptions());
}
+ /**
+ * Test that shows query plan for the same test cases at EE test
+ * test_processing_cost.py::TestProcessingCost::test_admission_slots.
+ */
+ @Test
+ public void testProcessingCostPlanAdmissionSlots() {
+ TQueryOptions options = tpcdsParquetQueryOptions();
+ runPlannerTestFile("processing-cost-plan-admission-slots",
+ "tpcds_partitioned_parquet_snap", options, tpcdsParquetTestOptions());
+ }
+
/**
* Test SELECTIVITY hints
*/
diff --git a/fe/src/test/java/org/apache/impala/planner/PlannerTestBase.java
b/fe/src/test/java/org/apache/impala/planner/PlannerTestBase.java
index eee094dea..bf0750f66 100644
--- a/fe/src/test/java/org/apache/impala/planner/PlannerTestBase.java
+++ b/fe/src/test/java/org/apache/impala/planner/PlannerTestBase.java
@@ -70,6 +70,7 @@ import org.apache.impala.thrift.TQueryOptions;
import org.apache.impala.thrift.TReplicaPreference;
import org.apache.impala.thrift.TScanRangeLocationList;
import org.apache.impala.thrift.TScanRangeSpec;
+import org.apache.impala.thrift.TSlotCountStrategy;
import org.apache.impala.thrift.TTableDescriptor;
import org.apache.impala.thrift.TTableSink;
import org.apache.impala.thrift.TTupleDescriptor;
@@ -426,6 +427,7 @@ public class PlannerTestBase extends FrontendTestBase {
.setCompute_processing_cost(true)
.setMax_fragment_instances_per_node(12)
.setReplica_preference(TReplicaPreference.REMOTE)
+ .setSlot_count_strategy(TSlotCountStrategy.PLANNER_CPU_ASK)
.setPlanner_testcase_mode(true);
}
diff --git
a/testdata/workloads/functional-planner/queries/PlannerTest/processing-cost-plan-admission-slots.test
b/testdata/workloads/functional-planner/queries/PlannerTest/processing-cost-plan-admission-slots.test
new file mode 100644
index 000000000..7d30603f8
--- /dev/null
+++
b/testdata/workloads/functional-planner/queries/PlannerTest/processing-cost-plan-admission-slots.test
@@ -0,0 +1,976 @@
+# These FE tests should match the test cases in
+# functional-query/queries/QueryTest/processing-cost-admission-slots.test
+# Any modification here should be applied there as well.
+#
+# QUERY: TPCDS-Q1-CPC-PLANNER-CPU-ASK
+# Expect a total of 16 admission slots given to this query if using
PLANNER_CPU_ASK strategy.
+with customer_total_return as (
+ select sr_customer_sk as ctr_customer_sk,
+ sr_store_sk as ctr_store_sk,
+ sum(SR_RETURN_AMT) as ctr_total_return
+ from tpcds_partitioned_parquet_snap.store_returns,
+ tpcds_partitioned_parquet_snap.date_dim
+ where sr_returned_date_sk = d_date_sk
+ and d_year = 2000
+ group by sr_customer_sk, sr_store_sk
+) select c_customer_id
+from customer_total_return ctr1,
+ tpcds_partitioned_parquet_snap.store,
+ tpcds_partitioned_parquet_snap.customer
+where ctr1.ctr_total_return > (
+ select avg(ctr_total_return) * 1.2
+ from customer_total_return ctr2
+ where ctr1.ctr_store_sk = ctr2.ctr_store_sk)
+and s_store_sk = ctr1.ctr_store_sk
+and s_state = 'TN'
+and ctr1.ctr_customer_sk = c_customer_sk
+order by c_customer_id
+limit 100;
+---- QUERYOPTIONS
+COMPUTE_PROCESSING_COST=True
+SLOT_COUNT_STRATEGY=PLANNER_CPU_ASK
+---- PARALLELPLANS
+Max Per-Host Resource Reservation: Memory=42.58MB Threads=16
+Per-Host Resource Estimates: Memory=189MB
+F10:PLAN FRAGMENT [UNPARTITIONED] hosts=1 instances=1
+| Per-Instance Resources: mem-estimate=4.02MB mem-reservation=4.00MB
thread-reservation=1
+| max-parallelism=1 segment-costs=[104] cpu-comparison-result=16 [max(1
(self) vs 16 (sum children))]
+PLAN-ROOT SINK
+| output exprs: c_customer_id
+| mem-estimate=4.00MB mem-reservation=4.00MB spill-buffer=2.00MB
thread-reservation=0 cost=100
+|
+27:MERGING-EXCHANGE [UNPARTITIONED]
+| order by: c_customer_id ASC
+| limit: 100
+| mem-estimate=16.00KB mem-reservation=0B thread-reservation=0
+| tuple-ids=13 row-size=28B cardinality=100 cost=4
+| in pipelines: 14(GETNEXT)
+|
+F04:PLAN FRAGMENT [HASH(sr_customer_sk)] hosts=3 instances=3 (adjusted from
384)
+Per-Instance Resources: mem-estimate=504.82KB mem-reservation=0B
thread-reservation=1
+max-parallelism=3 segment-costs=[215624, 4] cpu-comparison-result=16 [max(3
(self) vs 16 (sum children))]
+14:TOP-N [LIMIT=100]
+| order by: c_customer_id ASC
+| mem-estimate=2.73KB mem-reservation=0B thread-reservation=0
+| tuple-ids=13 row-size=28B cardinality=100 cost=100
+| in pipelines: 14(GETNEXT), 17(OPEN)
+|
+13:HASH JOIN [LEFT SEMI JOIN, BROADCAST]
+| hash-table-id=00
+| hash predicates: sr_store_sk = ctr2.ctr_store_sk
+| other join predicates: sum(SR_RETURN_AMT) > avg(ctr_total_return) *
CAST(1.2 AS DECIMAL(2,1))
+| mem-estimate=0B mem-reservation=0B spill-buffer=64.00KB thread-reservation=0
+| tuple-ids=2,5,4 row-size=74B cardinality=53.52K cost=107030
+| in pipelines: 17(GETNEXT), 25(OPEN)
+|
+|--F11:PLAN FRAGMENT [HASH(sr_customer_sk)] hosts=3 instances=3
+| | Per-Instance Resources: mem-estimate=2.95MB mem-reservation=2.94MB
thread-reservation=1 runtime-filters-memory=1.00MB
+| | max-parallelism=3 segment-costs=[9] cpu-comparison-result=4 [max(3
(self) vs 4 (sum children))]
+| JOIN BUILD
+| | join-table-id=00 plan-id=01 cohort-id=01
+| | build expressions: ctr2.ctr_store_sk
+| | runtime filters: RF000[bloom] <- ctr2.ctr_store_sk, RF001[min_max] <-
ctr2.ctr_store_sk
+| | mem-estimate=1.94MB mem-reservation=1.94MB spill-buffer=64.00KB
thread-reservation=0 cost=6
+| |
+| 26:EXCHANGE [BROADCAST]
+| | mem-estimate=16.00KB mem-reservation=0B thread-reservation=0
+| | tuple-ids=11 row-size=20B cardinality=6 cost=3
+| | in pipelines: 25(GETNEXT)
+| |
+| F09:PLAN FRAGMENT [HASH(ctr2.ctr_store_sk)] hosts=3 instances=3 (adjusted
from 384)
+| Per-Instance Resources: mem-estimate=10.09MB mem-reservation=1.94MB
thread-reservation=1
+| max-parallelism=3 segment-costs=[13, 1] cpu-comparison-result=4 [max(3
(self) vs 4 (sum children))]
+| 25:AGGREGATE [FINALIZE]
+| | output: avg:merge(ctr_total_return)
+| | group by: ctr2.ctr_store_sk
+| | mem-estimate=10.00MB mem-reservation=1.94MB spill-buffer=64.00KB
thread-reservation=0
+| | tuple-ids=11 row-size=20B cardinality=6 cost=12
+| | in pipelines: 25(GETNEXT), 23(OPEN)
+| |
+| 24:EXCHANGE [HASH(ctr2.ctr_store_sk)]
+| | mem-estimate=16.00KB mem-reservation=0B thread-reservation=0
+| | tuple-ids=10 row-size=20B cardinality=6 cost=1
+| | in pipelines: 23(GETNEXT)
+| |
+| F08:PLAN FRAGMENT [HASH(sr_customer_sk,sr_store_sk)] hosts=3 instances=3
(adjusted from 384)
+| Per-Instance Resources: mem-estimate=20.49MB mem-reservation=3.94MB
thread-reservation=1
+| max-parallelism=3 segment-costs=[162009, 107030, 1] cpu-comparison-result=4
[max(3 (self) vs 4 (sum children))]
+| 10:AGGREGATE [STREAMING]
+| | output: avg(sum(SR_RETURN_AMT))
+| | group by: sr_store_sk
+| | mem-estimate=10.00MB mem-reservation=2.00MB spill-buffer=64.00KB
thread-reservation=0
+| | tuple-ids=10 row-size=20B cardinality=6 cost=107030
+| | in pipelines: 23(GETNEXT)
+| |
+| 23:AGGREGATE [FINALIZE]
+| | output: sum:merge(SR_RETURN_AMT)
+| | group by: sr_customer_sk, sr_store_sk
+| | mem-estimate=10.00MB mem-reservation=1.94MB spill-buffer=64.00KB
thread-reservation=0
+| | tuple-ids=8 row-size=24B cardinality=53.52K cost=160545
+| | in pipelines: 23(GETNEXT), 06(OPEN)
+| |
+| 22:EXCHANGE [HASH(sr_customer_sk,sr_store_sk)]
+| | mem-estimate=502.09KB mem-reservation=0B thread-reservation=0
+| | tuple-ids=8 row-size=24B cardinality=53.52K cost=1464
+| | in pipelines: 06(GETNEXT)
+| |
+| F06:PLAN FRAGMENT [RANDOM] hosts=3 instances=3 (adjusted from 384)
+| Per-Host Shared Resources: mem-estimate=1.00MB mem-reservation=1.00MB
thread-reservation=0 runtime-filters-memory=1.00MB
+| Per-Instance Resources: mem-estimate=26.33MB mem-reservation=2.12MB
thread-reservation=1
+| max-parallelism=3 segment-costs=[18864896, 1464] cpu-comparison-result=4
[max(3 (self) vs 4 (sum children))]
+| 09:AGGREGATE [STREAMING]
+| | output: sum(SR_RETURN_AMT)
+| | group by: sr_customer_sk, sr_store_sk
+| | mem-estimate=10.00MB mem-reservation=2.00MB spill-buffer=64.00KB
thread-reservation=0
+| | tuple-ids=8 row-size=24B cardinality=53.52K cost=160545
+| | in pipelines: 06(GETNEXT)
+| |
+| 08:HASH JOIN [INNER JOIN, BROADCAST]
+| | hash-table-id=01
+| | hash predicates: sr_returned_date_sk = d_date_sk
+| | fk/pk conjuncts: sr_returned_date_sk = d_date_sk
+| | mem-estimate=0B mem-reservation=0B spill-buffer=64.00KB
thread-reservation=0
+| | tuple-ids=6,7 row-size=24B cardinality=53.52K cost=53515
+| | in pipelines: 06(GETNEXT), 07(OPEN)
+| |
+| |--F12:PLAN FRAGMENT [RANDOM] hosts=3 instances=3
+| | | Per-Instance Resources: mem-estimate=2.95MB mem-reservation=2.94MB
thread-reservation=1 runtime-filters-memory=1.00MB
+| | | max-parallelism=3 segment-costs=[388]
+| | JOIN BUILD
+| | | join-table-id=01 plan-id=02 cohort-id=02
+| | | build expressions: d_date_sk
+| | | runtime filters: RF008[bloom] <- d_date_sk
+| | | mem-estimate=1.94MB mem-reservation=1.94MB spill-buffer=64.00KB
thread-reservation=0 cost=373
+| | |
+| | 21:EXCHANGE [BROADCAST]
+| | | mem-estimate=16.00KB mem-reservation=0B thread-reservation=0
+| | | tuple-ids=7 row-size=8B cardinality=373 cost=15
+| | | in pipelines: 07(GETNEXT)
+| | |
+| | F07:PLAN FRAGMENT [RANDOM] hosts=1 instances=1
+| | Per-Instance Resources: mem-estimate=16.05MB mem-reservation=512.00KB
thread-reservation=1
+| | max-parallelism=1 segment-costs=[123625]
+| | 07:SCAN HDFS [tpcds_partitioned_parquet_snap.date_dim, RANDOM]
+| | HDFS partitions=1/1 files=1 size=2.17MB
+| | predicates: d_year = CAST(2000 AS INT)
+| | stored statistics:
+| | table: rows=73.05K size=2.17MB
+| | columns: all
+| | extrapolated-rows=disabled max-scan-range-rows=73.05K
+| | parquet statistics predicates: d_year = CAST(2000 AS INT)
+| | parquet dictionary predicates: d_year = CAST(2000 AS INT)
+| | mem-estimate=16.00MB mem-reservation=512.00KB thread-reservation=0
+| | tuple-ids=7 row-size=8B cardinality=373 cost=123620
+| | in pipelines: 07(GETNEXT)
+| |
+| 06:SCAN HDFS [tpcds_partitioned_parquet_snap.store_returns, RANDOM]
+| HDFS partitions=2004/2004 files=2004 size=33.63MB
+| runtime filters: RF008[bloom] -> sr_returned_date_sk
+| stored statistics:
+| table: rows=287.51K size=33.63MB
+| partitions: 2004/2004 rows=287.51K
+| columns: all
+| extrapolated-rows=disabled max-scan-range-rows=10.01K
est-scan-range=373(filtered from 2004)
+| mem-estimate=16.00MB mem-reservation=128.00KB thread-reservation=0
+| tuple-ids=6 row-size=16B cardinality=53.52K(filtered from 287.51K)
cost=18650836
+| in pipelines: 06(GETNEXT)
+|
+12:HASH JOIN [INNER JOIN, BROADCAST]
+| hash-table-id=02
+| hash predicates: sr_store_sk = s_store_sk
+| fk/pk conjuncts: sr_store_sk = s_store_sk
+| mem-estimate=0B mem-reservation=0B spill-buffer=64.00KB thread-reservation=0
+| tuple-ids=2,5,4 row-size=74B cardinality=53.52K cost=53515
+| in pipelines: 17(GETNEXT), 04(OPEN)
+|
+|--F13:PLAN FRAGMENT [HASH(sr_customer_sk)] hosts=3 instances=3
+| | Per-Instance Resources: mem-estimate=2.95MB mem-reservation=2.94MB
thread-reservation=1 runtime-filters-memory=1.00MB
+| | max-parallelism=3 segment-costs=[15]
+| JOIN BUILD
+| | join-table-id=02 plan-id=03 cohort-id=01
+| | build expressions: s_store_sk
+| | runtime filters: RF002[bloom] <- s_store_sk, RF003[min_max] <- s_store_sk
+| | mem-estimate=1.94MB mem-reservation=1.94MB spill-buffer=64.00KB
thread-reservation=0 cost=12
+| |
+| 20:EXCHANGE [BROADCAST]
+| | mem-estimate=16.00KB mem-reservation=0B thread-reservation=0
+| | tuple-ids=4 row-size=18B cardinality=12 cost=3
+| | in pipelines: 04(GETNEXT)
+| |
+| F05:PLAN FRAGMENT [RANDOM] hosts=1 instances=1
+| Per-Host Shared Resources: mem-estimate=1.00MB mem-reservation=1.00MB
thread-reservation=0 runtime-filters-memory=1.00MB
+| Per-Instance Resources: mem-estimate=16.09MB mem-reservation=16.00KB
thread-reservation=1
+| max-parallelism=1 segment-costs=[50014]
+| 04:SCAN HDFS [tpcds_partitioned_parquet_snap.store, RANDOM]
+| HDFS partitions=1/1 files=1 size=9.81KB
+| predicates: s_state = 'TN'
+| runtime filters: RF001[min_max] ->
tpcds_partitioned_parquet_snap.store.s_store_sk, RF000[bloom] ->
tpcds_partitioned_parquet_snap.store.s_store_sk
+| stored statistics:
+| table: rows=12 size=9.81KB
+| columns: all
+| extrapolated-rows=disabled max-scan-range-rows=12
+| parquet statistics predicates: s_state = 'TN'
+| parquet dictionary predicates: s_state = 'TN'
+| mem-estimate=16.00MB mem-reservation=16.00KB thread-reservation=0
+| tuple-ids=4 row-size=18B cardinality=12 cost=50013
+| in pipelines: 04(GETNEXT)
+|
+11:HASH JOIN [INNER JOIN, PARTITIONED]
+| hash-table-id=03
+| hash predicates: sr_customer_sk = c_customer_sk
+| fk/pk conjuncts: none
+| mem-estimate=0B mem-reservation=0B spill-buffer=256.00KB
thread-reservation=0
+| tuple-ids=2,5 row-size=56B cardinality=53.52K cost=53515
+| in pipelines: 17(GETNEXT), 05(OPEN)
+|
+|--F14:PLAN FRAGMENT [HASH(sr_customer_sk)] hosts=3 instances=3 (adjusted from
384)
+| | Per-Instance Resources: mem-estimate=8.84MB mem-reservation=5.75MB
thread-reservation=1 runtime-filters-memory=1.00MB
+| | max-parallelism=3 segment-costs=[103516]
+| JOIN BUILD
+| | join-table-id=03 plan-id=04 cohort-id=01
+| | build expressions: c_customer_sk
+| | runtime filters: RF004[bloom] <- c_customer_sk, RF005[min_max] <-
c_customer_sk
+| | mem-estimate=4.75MB mem-reservation=4.75MB spill-buffer=256.00KB
thread-reservation=0 cost=100000
+| |
+| 19:EXCHANGE [HASH(c_customer_sk)]
+| | mem-estimate=3.09MB mem-reservation=0B thread-reservation=0
+| | tuple-ids=5 row-size=32B cardinality=100.00K cost=3516
+| | in pipelines: 05(GETNEXT)
+| |
+| F03:PLAN FRAGMENT [RANDOM] hosts=1 instances=1
+| Per-Instance Resources: mem-estimate=16.42MB mem-reservation=2.00MB
thread-reservation=1
+| max-parallelism=1 segment-costs=[56641]
+| 05:SCAN HDFS [tpcds_partitioned_parquet_snap.customer, RANDOM]
+| HDFS partitions=1/1 files=1 size=5.49MB
+| stored statistics:
+| table: rows=100.00K size=5.49MB
+| columns: all
+| extrapolated-rows=disabled max-scan-range-rows=100.00K
+| mem-estimate=16.00MB mem-reservation=2.00MB thread-reservation=0
+| tuple-ids=5 row-size=32B cardinality=100.00K cost=53125
+| in pipelines: 05(GETNEXT)
+|
+18:EXCHANGE [HASH(sr_customer_sk)]
+| mem-estimate=502.09KB mem-reservation=0B thread-reservation=0
+| tuple-ids=2 row-size=24B cardinality=53.52K cost=1464
+| in pipelines: 17(GETNEXT)
+|
+F02:PLAN FRAGMENT [HASH(sr_customer_sk,sr_store_sk)] hosts=3 instances=3
(adjusted from 384)
+Per-Instance Resources: mem-estimate=10.49MB mem-reservation=1.94MB
thread-reservation=1
+max-parallelism=3 segment-costs=[162009, 1464] cpu-comparison-result=4 [max(3
(self) vs 4 (sum children))]
+17:AGGREGATE [FINALIZE]
+| output: sum:merge(SR_RETURN_AMT)
+| group by: sr_customer_sk, sr_store_sk
+| mem-estimate=10.00MB mem-reservation=1.94MB spill-buffer=64.00KB
thread-reservation=0
+| tuple-ids=2 row-size=24B cardinality=53.52K cost=160545
+| in pipelines: 17(GETNEXT), 00(OPEN)
+|
+16:EXCHANGE [HASH(sr_customer_sk,sr_store_sk)]
+| mem-estimate=502.09KB mem-reservation=0B thread-reservation=0
+| tuple-ids=2 row-size=24B cardinality=53.52K cost=1464
+| in pipelines: 00(GETNEXT)
+|
+F00:PLAN FRAGMENT [RANDOM] hosts=3 instances=3 (adjusted from 384)
+Per-Host Shared Resources: mem-estimate=4.00MB mem-reservation=4.00MB
thread-reservation=0 runtime-filters-memory=4.00MB
+Per-Instance Resources: mem-estimate=26.33MB mem-reservation=2.12MB
thread-reservation=1
+max-parallelism=3 segment-costs=[18864896, 1464] cpu-comparison-result=4
[max(3 (self) vs 4 (sum children))]
+03:AGGREGATE [STREAMING]
+| output: sum(SR_RETURN_AMT)
+| group by: sr_customer_sk, sr_store_sk
+| mem-estimate=10.00MB mem-reservation=2.00MB spill-buffer=64.00KB
thread-reservation=0
+| tuple-ids=2 row-size=24B cardinality=53.52K cost=160545
+| in pipelines: 00(GETNEXT)
+|
+02:HASH JOIN [INNER JOIN, BROADCAST]
+| hash-table-id=04
+| hash predicates: sr_returned_date_sk = d_date_sk
+| fk/pk conjuncts: sr_returned_date_sk = d_date_sk
+| mem-estimate=0B mem-reservation=0B spill-buffer=64.00KB thread-reservation=0
+| tuple-ids=0,1 row-size=24B cardinality=53.52K cost=53515
+| in pipelines: 00(GETNEXT), 01(OPEN)
+|
+|--F15:PLAN FRAGMENT [RANDOM] hosts=3 instances=3
+| | Per-Instance Resources: mem-estimate=2.95MB mem-reservation=2.94MB
thread-reservation=1 runtime-filters-memory=1.00MB
+| | max-parallelism=3 segment-costs=[388]
+| JOIN BUILD
+| | join-table-id=04 plan-id=05 cohort-id=01
+| | build expressions: d_date_sk
+| | runtime filters: RF006[bloom] <- d_date_sk
+| | mem-estimate=1.94MB mem-reservation=1.94MB spill-buffer=64.00KB
thread-reservation=0 cost=373
+| |
+| 15:EXCHANGE [BROADCAST]
+| | mem-estimate=16.00KB mem-reservation=0B thread-reservation=0
+| | tuple-ids=1 row-size=8B cardinality=373 cost=15
+| | in pipelines: 01(GETNEXT)
+| |
+| F01:PLAN FRAGMENT [RANDOM] hosts=1 instances=1
+| Per-Instance Resources: mem-estimate=16.05MB mem-reservation=512.00KB
thread-reservation=1
+| max-parallelism=1 segment-costs=[123625]
+| 01:SCAN HDFS [tpcds_partitioned_parquet_snap.date_dim, RANDOM]
+| HDFS partitions=1/1 files=1 size=2.17MB
+| predicates: d_year = CAST(2000 AS INT)
+| stored statistics:
+| table: rows=73.05K size=2.17MB
+| columns: all
+| extrapolated-rows=disabled max-scan-range-rows=73.05K
+| parquet statistics predicates: d_year = CAST(2000 AS INT)
+| parquet dictionary predicates: d_year = CAST(2000 AS INT)
+| mem-estimate=16.00MB mem-reservation=512.00KB thread-reservation=0
+| tuple-ids=1 row-size=8B cardinality=373 cost=123620
+| in pipelines: 01(GETNEXT)
+|
+00:SCAN HDFS [tpcds_partitioned_parquet_snap.store_returns, RANDOM]
+ HDFS partitions=2004/2004 files=2004 size=33.63MB
+ runtime filters: RF001[min_max] ->
tpcds_partitioned_parquet_snap.store_returns.sr_store_sk, RF003[min_max] ->
tpcds_partitioned_parquet_snap.store_returns.sr_store_sk, RF005[min_max] ->
tpcds_partitioned_parquet_snap.store_returns.sr_customer_sk, RF000[bloom] ->
tpcds_partitioned_parquet_snap.store_returns.sr_store_sk, RF002[bloom] ->
tpcds_partitioned_parquet_snap.store_returns.sr_store_sk, RF004[bloom] ->
tpcds_partitioned_parquet_snap.store_returns.sr_customer_sk, RF006[bloom] ->
[...]
+ stored statistics:
+ table: rows=287.51K size=33.63MB
+ partitions: 2004/2004 rows=287.51K
+ columns: all
+ extrapolated-rows=disabled max-scan-range-rows=10.01K
est-scan-range=373(filtered from 2004)
+ mem-estimate=16.00MB mem-reservation=128.00KB thread-reservation=0
+ tuple-ids=0 row-size=16B cardinality=53.52K(filtered from 287.51K)
cost=18650836
+ in pipelines: 00(GETNEXT)
+====
+# QUERY: TPCDS-Q1-CPC-LARGEST-FRAGMENT
+# Expect a total of 3 admission slots given to this query if using
LARGEST_FRAGMENT strategy.
+with customer_total_return as (
+ select sr_customer_sk as ctr_customer_sk,
+ sr_store_sk as ctr_store_sk,
+ sum(SR_RETURN_AMT) as ctr_total_return
+ from tpcds_partitioned_parquet_snap.store_returns,
+ tpcds_partitioned_parquet_snap.date_dim
+ where sr_returned_date_sk = d_date_sk
+ and d_year = 2000
+ group by sr_customer_sk, sr_store_sk
+) select c_customer_id
+from customer_total_return ctr1,
+ tpcds_partitioned_parquet_snap.store,
+ tpcds_partitioned_parquet_snap.customer
+where ctr1.ctr_total_return > (
+ select avg(ctr_total_return) * 1.2
+ from customer_total_return ctr2
+ where ctr1.ctr_store_sk = ctr2.ctr_store_sk)
+and s_store_sk = ctr1.ctr_store_sk
+and s_state = 'TN'
+and ctr1.ctr_customer_sk = c_customer_sk
+order by c_customer_id
+limit 100;
+---- QUERYOPTIONS
+COMPUTE_PROCESSING_COST=True
+SLOT_COUNT_STRATEGY=LARGEST_FRAGMENT
+---- PARALLELPLANS
+Max Per-Host Resource Reservation: Memory=42.58MB Threads=16
+Per-Host Resource Estimates: Memory=189MB
+F10:PLAN FRAGMENT [UNPARTITIONED] hosts=1 instances=1
+| Per-Instance Resources: mem-estimate=4.02MB mem-reservation=4.00MB
thread-reservation=1
+| max-parallelism=1 segment-costs=[104] cpu-comparison-result=16 [max(1
(self) vs 16 (sum children))]
+PLAN-ROOT SINK
+| output exprs: c_customer_id
+| mem-estimate=4.00MB mem-reservation=4.00MB spill-buffer=2.00MB
thread-reservation=0 cost=100
+|
+27:MERGING-EXCHANGE [UNPARTITIONED]
+| order by: c_customer_id ASC
+| limit: 100
+| mem-estimate=16.00KB mem-reservation=0B thread-reservation=0
+| tuple-ids=13 row-size=28B cardinality=100 cost=4
+| in pipelines: 14(GETNEXT)
+|
+F04:PLAN FRAGMENT [HASH(sr_customer_sk)] hosts=3 instances=3 (adjusted from
384)
+Per-Instance Resources: mem-estimate=504.82KB mem-reservation=0B
thread-reservation=1
+max-parallelism=3 segment-costs=[215624, 4] cpu-comparison-result=16 [max(3
(self) vs 16 (sum children))]
+14:TOP-N [LIMIT=100]
+| order by: c_customer_id ASC
+| mem-estimate=2.73KB mem-reservation=0B thread-reservation=0
+| tuple-ids=13 row-size=28B cardinality=100 cost=100
+| in pipelines: 14(GETNEXT), 17(OPEN)
+|
+13:HASH JOIN [LEFT SEMI JOIN, BROADCAST]
+| hash-table-id=00
+| hash predicates: sr_store_sk = ctr2.ctr_store_sk
+| other join predicates: sum(SR_RETURN_AMT) > avg(ctr_total_return) *
CAST(1.2 AS DECIMAL(2,1))
+| mem-estimate=0B mem-reservation=0B spill-buffer=64.00KB thread-reservation=0
+| tuple-ids=2,5,4 row-size=74B cardinality=53.52K cost=107030
+| in pipelines: 17(GETNEXT), 25(OPEN)
+|
+|--F11:PLAN FRAGMENT [HASH(sr_customer_sk)] hosts=3 instances=3
+| | Per-Instance Resources: mem-estimate=2.95MB mem-reservation=2.94MB
thread-reservation=1 runtime-filters-memory=1.00MB
+| | max-parallelism=3 segment-costs=[9] cpu-comparison-result=4 [max(3
(self) vs 4 (sum children))]
+| JOIN BUILD
+| | join-table-id=00 plan-id=01 cohort-id=01
+| | build expressions: ctr2.ctr_store_sk
+| | runtime filters: RF000[bloom] <- ctr2.ctr_store_sk, RF001[min_max] <-
ctr2.ctr_store_sk
+| | mem-estimate=1.94MB mem-reservation=1.94MB spill-buffer=64.00KB
thread-reservation=0 cost=6
+| |
+| 26:EXCHANGE [BROADCAST]
+| | mem-estimate=16.00KB mem-reservation=0B thread-reservation=0
+| | tuple-ids=11 row-size=20B cardinality=6 cost=3
+| | in pipelines: 25(GETNEXT)
+| |
+| F09:PLAN FRAGMENT [HASH(ctr2.ctr_store_sk)] hosts=3 instances=3 (adjusted
from 384)
+| Per-Instance Resources: mem-estimate=10.09MB mem-reservation=1.94MB
thread-reservation=1
+| max-parallelism=3 segment-costs=[13, 1] cpu-comparison-result=4 [max(3
(self) vs 4 (sum children))]
+| 25:AGGREGATE [FINALIZE]
+| | output: avg:merge(ctr_total_return)
+| | group by: ctr2.ctr_store_sk
+| | mem-estimate=10.00MB mem-reservation=1.94MB spill-buffer=64.00KB
thread-reservation=0
+| | tuple-ids=11 row-size=20B cardinality=6 cost=12
+| | in pipelines: 25(GETNEXT), 23(OPEN)
+| |
+| 24:EXCHANGE [HASH(ctr2.ctr_store_sk)]
+| | mem-estimate=16.00KB mem-reservation=0B thread-reservation=0
+| | tuple-ids=10 row-size=20B cardinality=6 cost=1
+| | in pipelines: 23(GETNEXT)
+| |
+| F08:PLAN FRAGMENT [HASH(sr_customer_sk,sr_store_sk)] hosts=3 instances=3
(adjusted from 384)
+| Per-Instance Resources: mem-estimate=20.49MB mem-reservation=3.94MB
thread-reservation=1
+| max-parallelism=3 segment-costs=[162009, 107030, 1] cpu-comparison-result=4
[max(3 (self) vs 4 (sum children))]
+| 10:AGGREGATE [STREAMING]
+| | output: avg(sum(SR_RETURN_AMT))
+| | group by: sr_store_sk
+| | mem-estimate=10.00MB mem-reservation=2.00MB spill-buffer=64.00KB
thread-reservation=0
+| | tuple-ids=10 row-size=20B cardinality=6 cost=107030
+| | in pipelines: 23(GETNEXT)
+| |
+| 23:AGGREGATE [FINALIZE]
+| | output: sum:merge(SR_RETURN_AMT)
+| | group by: sr_customer_sk, sr_store_sk
+| | mem-estimate=10.00MB mem-reservation=1.94MB spill-buffer=64.00KB
thread-reservation=0
+| | tuple-ids=8 row-size=24B cardinality=53.52K cost=160545
+| | in pipelines: 23(GETNEXT), 06(OPEN)
+| |
+| 22:EXCHANGE [HASH(sr_customer_sk,sr_store_sk)]
+| | mem-estimate=502.09KB mem-reservation=0B thread-reservation=0
+| | tuple-ids=8 row-size=24B cardinality=53.52K cost=1464
+| | in pipelines: 06(GETNEXT)
+| |
+| F06:PLAN FRAGMENT [RANDOM] hosts=3 instances=3 (adjusted from 384)
+| Per-Host Shared Resources: mem-estimate=1.00MB mem-reservation=1.00MB
thread-reservation=0 runtime-filters-memory=1.00MB
+| Per-Instance Resources: mem-estimate=26.33MB mem-reservation=2.12MB
thread-reservation=1
+| max-parallelism=3 segment-costs=[18864896, 1464] cpu-comparison-result=4
[max(3 (self) vs 4 (sum children))]
+| 09:AGGREGATE [STREAMING]
+| | output: sum(SR_RETURN_AMT)
+| | group by: sr_customer_sk, sr_store_sk
+| | mem-estimate=10.00MB mem-reservation=2.00MB spill-buffer=64.00KB
thread-reservation=0
+| | tuple-ids=8 row-size=24B cardinality=53.52K cost=160545
+| | in pipelines: 06(GETNEXT)
+| |
+| 08:HASH JOIN [INNER JOIN, BROADCAST]
+| | hash-table-id=01
+| | hash predicates: sr_returned_date_sk = d_date_sk
+| | fk/pk conjuncts: sr_returned_date_sk = d_date_sk
+| | mem-estimate=0B mem-reservation=0B spill-buffer=64.00KB
thread-reservation=0
+| | tuple-ids=6,7 row-size=24B cardinality=53.52K cost=53515
+| | in pipelines: 06(GETNEXT), 07(OPEN)
+| |
+| |--F12:PLAN FRAGMENT [RANDOM] hosts=3 instances=3
+| | | Per-Instance Resources: mem-estimate=2.95MB mem-reservation=2.94MB
thread-reservation=1 runtime-filters-memory=1.00MB
+| | | max-parallelism=3 segment-costs=[388]
+| | JOIN BUILD
+| | | join-table-id=01 plan-id=02 cohort-id=02
+| | | build expressions: d_date_sk
+| | | runtime filters: RF008[bloom] <- d_date_sk
+| | | mem-estimate=1.94MB mem-reservation=1.94MB spill-buffer=64.00KB
thread-reservation=0 cost=373
+| | |
+| | 21:EXCHANGE [BROADCAST]
+| | | mem-estimate=16.00KB mem-reservation=0B thread-reservation=0
+| | | tuple-ids=7 row-size=8B cardinality=373 cost=15
+| | | in pipelines: 07(GETNEXT)
+| | |
+| | F07:PLAN FRAGMENT [RANDOM] hosts=1 instances=1
+| | Per-Instance Resources: mem-estimate=16.05MB mem-reservation=512.00KB
thread-reservation=1
+| | max-parallelism=1 segment-costs=[123625]
+| | 07:SCAN HDFS [tpcds_partitioned_parquet_snap.date_dim, RANDOM]
+| | HDFS partitions=1/1 files=1 size=2.17MB
+| | predicates: d_year = CAST(2000 AS INT)
+| | stored statistics:
+| | table: rows=73.05K size=2.17MB
+| | columns: all
+| | extrapolated-rows=disabled max-scan-range-rows=73.05K
+| | parquet statistics predicates: d_year = CAST(2000 AS INT)
+| | parquet dictionary predicates: d_year = CAST(2000 AS INT)
+| | mem-estimate=16.00MB mem-reservation=512.00KB thread-reservation=0
+| | tuple-ids=7 row-size=8B cardinality=373 cost=123620
+| | in pipelines: 07(GETNEXT)
+| |
+| 06:SCAN HDFS [tpcds_partitioned_parquet_snap.store_returns, RANDOM]
+| HDFS partitions=2004/2004 files=2004 size=33.63MB
+| runtime filters: RF008[bloom] -> sr_returned_date_sk
+| stored statistics:
+| table: rows=287.51K size=33.63MB
+| partitions: 2004/2004 rows=287.51K
+| columns: all
+| extrapolated-rows=disabled max-scan-range-rows=10.01K
est-scan-range=373(filtered from 2004)
+| mem-estimate=16.00MB mem-reservation=128.00KB thread-reservation=0
+| tuple-ids=6 row-size=16B cardinality=53.52K(filtered from 287.51K)
cost=18650836
+| in pipelines: 06(GETNEXT)
+|
+12:HASH JOIN [INNER JOIN, BROADCAST]
+| hash-table-id=02
+| hash predicates: sr_store_sk = s_store_sk
+| fk/pk conjuncts: sr_store_sk = s_store_sk
+| mem-estimate=0B mem-reservation=0B spill-buffer=64.00KB thread-reservation=0
+| tuple-ids=2,5,4 row-size=74B cardinality=53.52K cost=53515
+| in pipelines: 17(GETNEXT), 04(OPEN)
+|
+|--F13:PLAN FRAGMENT [HASH(sr_customer_sk)] hosts=3 instances=3
+| | Per-Instance Resources: mem-estimate=2.95MB mem-reservation=2.94MB
thread-reservation=1 runtime-filters-memory=1.00MB
+| | max-parallelism=3 segment-costs=[15]
+| JOIN BUILD
+| | join-table-id=02 plan-id=03 cohort-id=01
+| | build expressions: s_store_sk
+| | runtime filters: RF002[bloom] <- s_store_sk, RF003[min_max] <- s_store_sk
+| | mem-estimate=1.94MB mem-reservation=1.94MB spill-buffer=64.00KB
thread-reservation=0 cost=12
+| |
+| 20:EXCHANGE [BROADCAST]
+| | mem-estimate=16.00KB mem-reservation=0B thread-reservation=0
+| | tuple-ids=4 row-size=18B cardinality=12 cost=3
+| | in pipelines: 04(GETNEXT)
+| |
+| F05:PLAN FRAGMENT [RANDOM] hosts=1 instances=1
+| Per-Host Shared Resources: mem-estimate=1.00MB mem-reservation=1.00MB
thread-reservation=0 runtime-filters-memory=1.00MB
+| Per-Instance Resources: mem-estimate=16.09MB mem-reservation=16.00KB
thread-reservation=1
+| max-parallelism=1 segment-costs=[50014]
+| 04:SCAN HDFS [tpcds_partitioned_parquet_snap.store, RANDOM]
+| HDFS partitions=1/1 files=1 size=9.81KB
+| predicates: s_state = 'TN'
+| runtime filters: RF001[min_max] ->
tpcds_partitioned_parquet_snap.store.s_store_sk, RF000[bloom] ->
tpcds_partitioned_parquet_snap.store.s_store_sk
+| stored statistics:
+| table: rows=12 size=9.81KB
+| columns: all
+| extrapolated-rows=disabled max-scan-range-rows=12
+| parquet statistics predicates: s_state = 'TN'
+| parquet dictionary predicates: s_state = 'TN'
+| mem-estimate=16.00MB mem-reservation=16.00KB thread-reservation=0
+| tuple-ids=4 row-size=18B cardinality=12 cost=50013
+| in pipelines: 04(GETNEXT)
+|
+11:HASH JOIN [INNER JOIN, PARTITIONED]
+| hash-table-id=03
+| hash predicates: sr_customer_sk = c_customer_sk
+| fk/pk conjuncts: none
+| mem-estimate=0B mem-reservation=0B spill-buffer=256.00KB
thread-reservation=0
+| tuple-ids=2,5 row-size=56B cardinality=53.52K cost=53515
+| in pipelines: 17(GETNEXT), 05(OPEN)
+|
+|--F14:PLAN FRAGMENT [HASH(sr_customer_sk)] hosts=3 instances=3 (adjusted from
384)
+| | Per-Instance Resources: mem-estimate=8.84MB mem-reservation=5.75MB
thread-reservation=1 runtime-filters-memory=1.00MB
+| | max-parallelism=3 segment-costs=[103516]
+| JOIN BUILD
+| | join-table-id=03 plan-id=04 cohort-id=01
+| | build expressions: c_customer_sk
+| | runtime filters: RF004[bloom] <- c_customer_sk, RF005[min_max] <-
c_customer_sk
+| | mem-estimate=4.75MB mem-reservation=4.75MB spill-buffer=256.00KB
thread-reservation=0 cost=100000
+| |
+| 19:EXCHANGE [HASH(c_customer_sk)]
+| | mem-estimate=3.09MB mem-reservation=0B thread-reservation=0
+| | tuple-ids=5 row-size=32B cardinality=100.00K cost=3516
+| | in pipelines: 05(GETNEXT)
+| |
+| F03:PLAN FRAGMENT [RANDOM] hosts=1 instances=1
+| Per-Instance Resources: mem-estimate=16.42MB mem-reservation=2.00MB
thread-reservation=1
+| max-parallelism=1 segment-costs=[56641]
+| 05:SCAN HDFS [tpcds_partitioned_parquet_snap.customer, RANDOM]
+| HDFS partitions=1/1 files=1 size=5.49MB
+| stored statistics:
+| table: rows=100.00K size=5.49MB
+| columns: all
+| extrapolated-rows=disabled max-scan-range-rows=100.00K
+| mem-estimate=16.00MB mem-reservation=2.00MB thread-reservation=0
+| tuple-ids=5 row-size=32B cardinality=100.00K cost=53125
+| in pipelines: 05(GETNEXT)
+|
+18:EXCHANGE [HASH(sr_customer_sk)]
+| mem-estimate=502.09KB mem-reservation=0B thread-reservation=0
+| tuple-ids=2 row-size=24B cardinality=53.52K cost=1464
+| in pipelines: 17(GETNEXT)
+|
+F02:PLAN FRAGMENT [HASH(sr_customer_sk,sr_store_sk)] hosts=3 instances=3
(adjusted from 384)
+Per-Instance Resources: mem-estimate=10.49MB mem-reservation=1.94MB
thread-reservation=1
+max-parallelism=3 segment-costs=[162009, 1464] cpu-comparison-result=4 [max(3
(self) vs 4 (sum children))]
+17:AGGREGATE [FINALIZE]
+| output: sum:merge(SR_RETURN_AMT)
+| group by: sr_customer_sk, sr_store_sk
+| mem-estimate=10.00MB mem-reservation=1.94MB spill-buffer=64.00KB
thread-reservation=0
+| tuple-ids=2 row-size=24B cardinality=53.52K cost=160545
+| in pipelines: 17(GETNEXT), 00(OPEN)
+|
+16:EXCHANGE [HASH(sr_customer_sk,sr_store_sk)]
+| mem-estimate=502.09KB mem-reservation=0B thread-reservation=0
+| tuple-ids=2 row-size=24B cardinality=53.52K cost=1464
+| in pipelines: 00(GETNEXT)
+|
+F00:PLAN FRAGMENT [RANDOM] hosts=3 instances=3 (adjusted from 384)
+Per-Host Shared Resources: mem-estimate=4.00MB mem-reservation=4.00MB
thread-reservation=0 runtime-filters-memory=4.00MB
+Per-Instance Resources: mem-estimate=26.33MB mem-reservation=2.12MB
thread-reservation=1
+max-parallelism=3 segment-costs=[18864896, 1464] cpu-comparison-result=4
[max(3 (self) vs 4 (sum children))]
+03:AGGREGATE [STREAMING]
+| output: sum(SR_RETURN_AMT)
+| group by: sr_customer_sk, sr_store_sk
+| mem-estimate=10.00MB mem-reservation=2.00MB spill-buffer=64.00KB
thread-reservation=0
+| tuple-ids=2 row-size=24B cardinality=53.52K cost=160545
+| in pipelines: 00(GETNEXT)
+|
+02:HASH JOIN [INNER JOIN, BROADCAST]
+| hash-table-id=04
+| hash predicates: sr_returned_date_sk = d_date_sk
+| fk/pk conjuncts: sr_returned_date_sk = d_date_sk
+| mem-estimate=0B mem-reservation=0B spill-buffer=64.00KB thread-reservation=0
+| tuple-ids=0,1 row-size=24B cardinality=53.52K cost=53515
+| in pipelines: 00(GETNEXT), 01(OPEN)
+|
+|--F15:PLAN FRAGMENT [RANDOM] hosts=3 instances=3
+| | Per-Instance Resources: mem-estimate=2.95MB mem-reservation=2.94MB
thread-reservation=1 runtime-filters-memory=1.00MB
+| | max-parallelism=3 segment-costs=[388]
+| JOIN BUILD
+| | join-table-id=04 plan-id=05 cohort-id=01
+| | build expressions: d_date_sk
+| | runtime filters: RF006[bloom] <- d_date_sk
+| | mem-estimate=1.94MB mem-reservation=1.94MB spill-buffer=64.00KB
thread-reservation=0 cost=373
+| |
+| 15:EXCHANGE [BROADCAST]
+| | mem-estimate=16.00KB mem-reservation=0B thread-reservation=0
+| | tuple-ids=1 row-size=8B cardinality=373 cost=15
+| | in pipelines: 01(GETNEXT)
+| |
+| F01:PLAN FRAGMENT [RANDOM] hosts=1 instances=1
+| Per-Instance Resources: mem-estimate=16.05MB mem-reservation=512.00KB
thread-reservation=1
+| max-parallelism=1 segment-costs=[123625]
+| 01:SCAN HDFS [tpcds_partitioned_parquet_snap.date_dim, RANDOM]
+| HDFS partitions=1/1 files=1 size=2.17MB
+| predicates: d_year = CAST(2000 AS INT)
+| stored statistics:
+| table: rows=73.05K size=2.17MB
+| columns: all
+| extrapolated-rows=disabled max-scan-range-rows=73.05K
+| parquet statistics predicates: d_year = CAST(2000 AS INT)
+| parquet dictionary predicates: d_year = CAST(2000 AS INT)
+| mem-estimate=16.00MB mem-reservation=512.00KB thread-reservation=0
+| tuple-ids=1 row-size=8B cardinality=373 cost=123620
+| in pipelines: 01(GETNEXT)
+|
+00:SCAN HDFS [tpcds_partitioned_parquet_snap.store_returns, RANDOM]
+ HDFS partitions=2004/2004 files=2004 size=33.63MB
+ runtime filters: RF001[min_max] ->
tpcds_partitioned_parquet_snap.store_returns.sr_store_sk, RF003[min_max] ->
tpcds_partitioned_parquet_snap.store_returns.sr_store_sk, RF005[min_max] ->
tpcds_partitioned_parquet_snap.store_returns.sr_customer_sk, RF000[bloom] ->
tpcds_partitioned_parquet_snap.store_returns.sr_store_sk, RF002[bloom] ->
tpcds_partitioned_parquet_snap.store_returns.sr_store_sk, RF004[bloom] ->
tpcds_partitioned_parquet_snap.store_returns.sr_customer_sk, RF006[bloom] ->
[...]
+ stored statistics:
+ table: rows=287.51K size=33.63MB
+ partitions: 2004/2004 rows=287.51K
+ columns: all
+ extrapolated-rows=disabled max-scan-range-rows=10.01K
est-scan-range=373(filtered from 2004)
+ mem-estimate=16.00MB mem-reservation=128.00KB thread-reservation=0
+ tuple-ids=0 row-size=16B cardinality=53.52K(filtered from 287.51K)
cost=18650836
+ in pipelines: 00(GETNEXT)
+====
+# QUERY: TPCDS-Q1-NO-CPC
+# Expect a total of 3 admission slots given to this query if
COMPUTE_PROCESSING_COST is disabled.
+# Set MT_DOP=1 to keep the query compiled in multi-threaded parallelism mode.
+with customer_total_return as (
+ select sr_customer_sk as ctr_customer_sk,
+ sr_store_sk as ctr_store_sk,
+ sum(SR_RETURN_AMT) as ctr_total_return
+ from tpcds_partitioned_parquet_snap.store_returns,
+ tpcds_partitioned_parquet_snap.date_dim
+ where sr_returned_date_sk = d_date_sk
+ and d_year = 2000
+ group by sr_customer_sk, sr_store_sk
+) select c_customer_id
+from customer_total_return ctr1,
+ tpcds_partitioned_parquet_snap.store,
+ tpcds_partitioned_parquet_snap.customer
+where ctr1.ctr_total_return > (
+ select avg(ctr_total_return) * 1.2
+ from customer_total_return ctr2
+ where ctr1.ctr_store_sk = ctr2.ctr_store_sk)
+and s_store_sk = ctr1.ctr_store_sk
+and s_state = 'TN'
+and ctr1.ctr_customer_sk = c_customer_sk
+order by c_customer_id
+limit 100;
+---- QUERYOPTIONS
+COMPUTE_PROCESSING_COST=False
+MT_DOP=1
+---- PARALLELPLANS
+Max Per-Host Resource Reservation: Memory=42.58MB Threads=16
+Per-Host Resource Estimates: Memory=189MB
+F10:PLAN FRAGMENT [UNPARTITIONED] hosts=1 instances=1
+| Per-Instance Resources: mem-estimate=4.02MB mem-reservation=4.00MB
thread-reservation=1
+PLAN-ROOT SINK
+| output exprs: c_customer_id
+| mem-estimate=4.00MB mem-reservation=4.00MB spill-buffer=2.00MB
thread-reservation=0
+|
+27:MERGING-EXCHANGE [UNPARTITIONED]
+| order by: c_customer_id ASC
+| limit: 100
+| mem-estimate=16.00KB mem-reservation=0B thread-reservation=0
+| tuple-ids=13 row-size=28B cardinality=100
+| in pipelines: 14(GETNEXT)
+|
+F04:PLAN FRAGMENT [HASH(sr_customer_sk)] hosts=3 instances=3
+Per-Instance Resources: mem-estimate=504.82KB mem-reservation=0B
thread-reservation=1
+14:TOP-N [LIMIT=100]
+| order by: c_customer_id ASC
+| mem-estimate=2.73KB mem-reservation=0B thread-reservation=0
+| tuple-ids=13 row-size=28B cardinality=100
+| in pipelines: 14(GETNEXT), 17(OPEN)
+|
+13:HASH JOIN [LEFT SEMI JOIN, BROADCAST]
+| hash-table-id=00
+| hash predicates: sr_store_sk = ctr2.ctr_store_sk
+| other join predicates: sum(SR_RETURN_AMT) > avg(ctr_total_return) *
CAST(1.2 AS DECIMAL(2,1))
+| mem-estimate=0B mem-reservation=0B spill-buffer=64.00KB thread-reservation=0
+| tuple-ids=2,5,4 row-size=74B cardinality=53.52K
+| in pipelines: 17(GETNEXT), 25(OPEN)
+|
+|--F11:PLAN FRAGMENT [HASH(sr_customer_sk)] hosts=3 instances=3
+| | Per-Instance Resources: mem-estimate=2.95MB mem-reservation=2.94MB
thread-reservation=1 runtime-filters-memory=1.00MB
+| JOIN BUILD
+| | join-table-id=00 plan-id=01 cohort-id=01
+| | build expressions: ctr2.ctr_store_sk
+| | runtime filters: RF000[bloom] <- ctr2.ctr_store_sk, RF001[min_max] <-
ctr2.ctr_store_sk
+| | mem-estimate=1.94MB mem-reservation=1.94MB spill-buffer=64.00KB
thread-reservation=0
+| |
+| 26:EXCHANGE [BROADCAST]
+| | mem-estimate=16.00KB mem-reservation=0B thread-reservation=0
+| | tuple-ids=11 row-size=20B cardinality=6
+| | in pipelines: 25(GETNEXT)
+| |
+| F09:PLAN FRAGMENT [HASH(ctr2.ctr_store_sk)] hosts=3 instances=3
+| Per-Instance Resources: mem-estimate=10.09MB mem-reservation=1.94MB
thread-reservation=1
+| 25:AGGREGATE [FINALIZE]
+| | output: avg:merge(ctr_total_return)
+| | group by: ctr2.ctr_store_sk
+| | mem-estimate=10.00MB mem-reservation=1.94MB spill-buffer=64.00KB
thread-reservation=0
+| | tuple-ids=11 row-size=20B cardinality=6
+| | in pipelines: 25(GETNEXT), 23(OPEN)
+| |
+| 24:EXCHANGE [HASH(ctr2.ctr_store_sk)]
+| | mem-estimate=16.00KB mem-reservation=0B thread-reservation=0
+| | tuple-ids=10 row-size=20B cardinality=6
+| | in pipelines: 23(GETNEXT)
+| |
+| F08:PLAN FRAGMENT [HASH(sr_customer_sk,sr_store_sk)] hosts=3 instances=3
+| Per-Instance Resources: mem-estimate=20.49MB mem-reservation=3.94MB
thread-reservation=1
+| 10:AGGREGATE [STREAMING]
+| | output: avg(sum(SR_RETURN_AMT))
+| | group by: sr_store_sk
+| | mem-estimate=10.00MB mem-reservation=2.00MB spill-buffer=64.00KB
thread-reservation=0
+| | tuple-ids=10 row-size=20B cardinality=6
+| | in pipelines: 23(GETNEXT)
+| |
+| 23:AGGREGATE [FINALIZE]
+| | output: sum:merge(SR_RETURN_AMT)
+| | group by: sr_customer_sk, sr_store_sk
+| | mem-estimate=10.00MB mem-reservation=1.94MB spill-buffer=64.00KB
thread-reservation=0
+| | tuple-ids=8 row-size=24B cardinality=53.52K
+| | in pipelines: 23(GETNEXT), 06(OPEN)
+| |
+| 22:EXCHANGE [HASH(sr_customer_sk,sr_store_sk)]
+| | mem-estimate=502.09KB mem-reservation=0B thread-reservation=0
+| | tuple-ids=8 row-size=24B cardinality=53.52K
+| | in pipelines: 06(GETNEXT)
+| |
+| F06:PLAN FRAGMENT [RANDOM] hosts=3 instances=3
+| Per-Host Shared Resources: mem-estimate=1.00MB mem-reservation=1.00MB
thread-reservation=0 runtime-filters-memory=1.00MB
+| Per-Instance Resources: mem-estimate=26.33MB mem-reservation=2.12MB
thread-reservation=1
+| 09:AGGREGATE [STREAMING]
+| | output: sum(SR_RETURN_AMT)
+| | group by: sr_customer_sk, sr_store_sk
+| | mem-estimate=10.00MB mem-reservation=2.00MB spill-buffer=64.00KB
thread-reservation=0
+| | tuple-ids=8 row-size=24B cardinality=53.52K
+| | in pipelines: 06(GETNEXT)
+| |
+| 08:HASH JOIN [INNER JOIN, BROADCAST]
+| | hash-table-id=01
+| | hash predicates: sr_returned_date_sk = d_date_sk
+| | fk/pk conjuncts: sr_returned_date_sk = d_date_sk
+| | mem-estimate=0B mem-reservation=0B spill-buffer=64.00KB
thread-reservation=0
+| | tuple-ids=6,7 row-size=24B cardinality=53.52K
+| | in pipelines: 06(GETNEXT), 07(OPEN)
+| |
+| |--F12:PLAN FRAGMENT [RANDOM] hosts=3 instances=3
+| | | Per-Instance Resources: mem-estimate=2.95MB mem-reservation=2.94MB
thread-reservation=1 runtime-filters-memory=1.00MB
+| | JOIN BUILD
+| | | join-table-id=01 plan-id=02 cohort-id=02
+| | | build expressions: d_date_sk
+| | | runtime filters: RF008[bloom] <- d_date_sk
+| | | mem-estimate=1.94MB mem-reservation=1.94MB spill-buffer=64.00KB
thread-reservation=0
+| | |
+| | 21:EXCHANGE [BROADCAST]
+| | | mem-estimate=16.00KB mem-reservation=0B thread-reservation=0
+| | | tuple-ids=7 row-size=8B cardinality=373
+| | | in pipelines: 07(GETNEXT)
+| | |
+| | F07:PLAN FRAGMENT [RANDOM] hosts=1 instances=1
+| | Per-Instance Resources: mem-estimate=16.05MB mem-reservation=512.00KB
thread-reservation=1
+| | 07:SCAN HDFS [tpcds_partitioned_parquet_snap.date_dim, RANDOM]
+| | HDFS partitions=1/1 files=1 size=2.17MB
+| | predicates: d_year = CAST(2000 AS INT)
+| | stored statistics:
+| | table: rows=73.05K size=2.17MB
+| | columns: all
+| | extrapolated-rows=disabled max-scan-range-rows=73.05K
+| | parquet statistics predicates: d_year = CAST(2000 AS INT)
+| | parquet dictionary predicates: d_year = CAST(2000 AS INT)
+| | mem-estimate=16.00MB mem-reservation=512.00KB thread-reservation=0
+| | tuple-ids=7 row-size=8B cardinality=373
+| | in pipelines: 07(GETNEXT)
+| |
+| 06:SCAN HDFS [tpcds_partitioned_parquet_snap.store_returns, RANDOM]
+| HDFS partitions=2004/2004 files=2004 size=33.63MB
+| runtime filters: RF008[bloom] -> sr_returned_date_sk
+| stored statistics:
+| table: rows=287.51K size=33.63MB
+| partitions: 2004/2004 rows=287.51K
+| columns: all
+| extrapolated-rows=disabled max-scan-range-rows=10.01K
est-scan-range=373(filtered from 2004)
+| mem-estimate=16.00MB mem-reservation=128.00KB thread-reservation=0
+| tuple-ids=6 row-size=16B cardinality=53.52K(filtered from 287.51K)
+| in pipelines: 06(GETNEXT)
+|
+12:HASH JOIN [INNER JOIN, BROADCAST]
+| hash-table-id=02
+| hash predicates: sr_store_sk = s_store_sk
+| fk/pk conjuncts: sr_store_sk = s_store_sk
+| mem-estimate=0B mem-reservation=0B spill-buffer=64.00KB thread-reservation=0
+| tuple-ids=2,5,4 row-size=74B cardinality=53.52K
+| in pipelines: 17(GETNEXT), 04(OPEN)
+|
+|--F13:PLAN FRAGMENT [HASH(sr_customer_sk)] hosts=3 instances=3
+| | Per-Instance Resources: mem-estimate=2.95MB mem-reservation=2.94MB
thread-reservation=1 runtime-filters-memory=1.00MB
+| JOIN BUILD
+| | join-table-id=02 plan-id=03 cohort-id=01
+| | build expressions: s_store_sk
+| | runtime filters: RF002[bloom] <- s_store_sk, RF003[min_max] <- s_store_sk
+| | mem-estimate=1.94MB mem-reservation=1.94MB spill-buffer=64.00KB
thread-reservation=0
+| |
+| 20:EXCHANGE [BROADCAST]
+| | mem-estimate=16.00KB mem-reservation=0B thread-reservation=0
+| | tuple-ids=4 row-size=18B cardinality=12
+| | in pipelines: 04(GETNEXT)
+| |
+| F05:PLAN FRAGMENT [RANDOM] hosts=1 instances=1
+| Per-Host Shared Resources: mem-estimate=1.00MB mem-reservation=1.00MB
thread-reservation=0 runtime-filters-memory=1.00MB
+| Per-Instance Resources: mem-estimate=16.09MB mem-reservation=16.00KB
thread-reservation=1
+| 04:SCAN HDFS [tpcds_partitioned_parquet_snap.store, RANDOM]
+| HDFS partitions=1/1 files=1 size=9.81KB
+| predicates: s_state = 'TN'
+| runtime filters: RF001[min_max] ->
tpcds_partitioned_parquet_snap.store.s_store_sk, RF000[bloom] ->
tpcds_partitioned_parquet_snap.store.s_store_sk
+| stored statistics:
+| table: rows=12 size=9.81KB
+| columns: all
+| extrapolated-rows=disabled max-scan-range-rows=12
+| parquet statistics predicates: s_state = 'TN'
+| parquet dictionary predicates: s_state = 'TN'
+| mem-estimate=16.00MB mem-reservation=16.00KB thread-reservation=0
+| tuple-ids=4 row-size=18B cardinality=12
+| in pipelines: 04(GETNEXT)
+|
+11:HASH JOIN [INNER JOIN, PARTITIONED]
+| hash-table-id=03
+| hash predicates: sr_customer_sk = c_customer_sk
+| fk/pk conjuncts: none
+| mem-estimate=0B mem-reservation=0B spill-buffer=256.00KB
thread-reservation=0
+| tuple-ids=2,5 row-size=56B cardinality=53.52K
+| in pipelines: 17(GETNEXT), 05(OPEN)
+|
+|--F14:PLAN FRAGMENT [HASH(sr_customer_sk)] hosts=3 instances=3
+| | Per-Instance Resources: mem-estimate=8.84MB mem-reservation=5.75MB
thread-reservation=1 runtime-filters-memory=1.00MB
+| JOIN BUILD
+| | join-table-id=03 plan-id=04 cohort-id=01
+| | build expressions: c_customer_sk
+| | runtime filters: RF004[bloom] <- c_customer_sk, RF005[min_max] <-
c_customer_sk
+| | mem-estimate=4.75MB mem-reservation=4.75MB spill-buffer=256.00KB
thread-reservation=0
+| |
+| 19:EXCHANGE [HASH(c_customer_sk)]
+| | mem-estimate=3.09MB mem-reservation=0B thread-reservation=0
+| | tuple-ids=5 row-size=32B cardinality=100.00K
+| | in pipelines: 05(GETNEXT)
+| |
+| F03:PLAN FRAGMENT [RANDOM] hosts=1 instances=1
+| Per-Instance Resources: mem-estimate=16.42MB mem-reservation=2.00MB
thread-reservation=1
+| 05:SCAN HDFS [tpcds_partitioned_parquet_snap.customer, RANDOM]
+| HDFS partitions=1/1 files=1 size=5.49MB
+| stored statistics:
+| table: rows=100.00K size=5.49MB
+| columns: all
+| extrapolated-rows=disabled max-scan-range-rows=100.00K
+| mem-estimate=16.00MB mem-reservation=2.00MB thread-reservation=0
+| tuple-ids=5 row-size=32B cardinality=100.00K
+| in pipelines: 05(GETNEXT)
+|
+18:EXCHANGE [HASH(sr_customer_sk)]
+| mem-estimate=502.09KB mem-reservation=0B thread-reservation=0
+| tuple-ids=2 row-size=24B cardinality=53.52K
+| in pipelines: 17(GETNEXT)
+|
+F02:PLAN FRAGMENT [HASH(sr_customer_sk,sr_store_sk)] hosts=3 instances=3
+Per-Instance Resources: mem-estimate=10.49MB mem-reservation=1.94MB
thread-reservation=1
+17:AGGREGATE [FINALIZE]
+| output: sum:merge(SR_RETURN_AMT)
+| group by: sr_customer_sk, sr_store_sk
+| mem-estimate=10.00MB mem-reservation=1.94MB spill-buffer=64.00KB
thread-reservation=0
+| tuple-ids=2 row-size=24B cardinality=53.52K
+| in pipelines: 17(GETNEXT), 00(OPEN)
+|
+16:EXCHANGE [HASH(sr_customer_sk,sr_store_sk)]
+| mem-estimate=502.09KB mem-reservation=0B thread-reservation=0
+| tuple-ids=2 row-size=24B cardinality=53.52K
+| in pipelines: 00(GETNEXT)
+|
+F00:PLAN FRAGMENT [RANDOM] hosts=3 instances=3
+Per-Host Shared Resources: mem-estimate=4.00MB mem-reservation=4.00MB
thread-reservation=0 runtime-filters-memory=4.00MB
+Per-Instance Resources: mem-estimate=26.33MB mem-reservation=2.12MB
thread-reservation=1
+03:AGGREGATE [STREAMING]
+| output: sum(SR_RETURN_AMT)
+| group by: sr_customer_sk, sr_store_sk
+| mem-estimate=10.00MB mem-reservation=2.00MB spill-buffer=64.00KB
thread-reservation=0
+| tuple-ids=2 row-size=24B cardinality=53.52K
+| in pipelines: 00(GETNEXT)
+|
+02:HASH JOIN [INNER JOIN, BROADCAST]
+| hash-table-id=04
+| hash predicates: sr_returned_date_sk = d_date_sk
+| fk/pk conjuncts: sr_returned_date_sk = d_date_sk
+| mem-estimate=0B mem-reservation=0B spill-buffer=64.00KB thread-reservation=0
+| tuple-ids=0,1 row-size=24B cardinality=53.52K
+| in pipelines: 00(GETNEXT), 01(OPEN)
+|
+|--F15:PLAN FRAGMENT [RANDOM] hosts=3 instances=3
+| | Per-Instance Resources: mem-estimate=2.95MB mem-reservation=2.94MB
thread-reservation=1 runtime-filters-memory=1.00MB
+| JOIN BUILD
+| | join-table-id=04 plan-id=05 cohort-id=01
+| | build expressions: d_date_sk
+| | runtime filters: RF006[bloom] <- d_date_sk
+| | mem-estimate=1.94MB mem-reservation=1.94MB spill-buffer=64.00KB
thread-reservation=0
+| |
+| 15:EXCHANGE [BROADCAST]
+| | mem-estimate=16.00KB mem-reservation=0B thread-reservation=0
+| | tuple-ids=1 row-size=8B cardinality=373
+| | in pipelines: 01(GETNEXT)
+| |
+| F01:PLAN FRAGMENT [RANDOM] hosts=1 instances=1
+| Per-Instance Resources: mem-estimate=16.05MB mem-reservation=512.00KB
thread-reservation=1
+| 01:SCAN HDFS [tpcds_partitioned_parquet_snap.date_dim, RANDOM]
+| HDFS partitions=1/1 files=1 size=2.17MB
+| predicates: d_year = CAST(2000 AS INT)
+| stored statistics:
+| table: rows=73.05K size=2.17MB
+| columns: all
+| extrapolated-rows=disabled max-scan-range-rows=73.05K
+| parquet statistics predicates: d_year = CAST(2000 AS INT)
+| parquet dictionary predicates: d_year = CAST(2000 AS INT)
+| mem-estimate=16.00MB mem-reservation=512.00KB thread-reservation=0
+| tuple-ids=1 row-size=8B cardinality=373
+| in pipelines: 01(GETNEXT)
+|
+00:SCAN HDFS [tpcds_partitioned_parquet_snap.store_returns, RANDOM]
+ HDFS partitions=2004/2004 files=2004 size=33.63MB
+ runtime filters: RF001[min_max] ->
tpcds_partitioned_parquet_snap.store_returns.sr_store_sk, RF003[min_max] ->
tpcds_partitioned_parquet_snap.store_returns.sr_store_sk, RF005[min_max] ->
tpcds_partitioned_parquet_snap.store_returns.sr_customer_sk, RF000[bloom] ->
tpcds_partitioned_parquet_snap.store_returns.sr_store_sk, RF002[bloom] ->
tpcds_partitioned_parquet_snap.store_returns.sr_store_sk, RF004[bloom] ->
tpcds_partitioned_parquet_snap.store_returns.sr_customer_sk, RF006[bloom] ->
[...]
+ stored statistics:
+ table: rows=287.51K size=33.63MB
+ partitions: 2004/2004 rows=287.51K
+ columns: all
+ extrapolated-rows=disabled max-scan-range-rows=10.01K
est-scan-range=373(filtered from 2004)
+ mem-estimate=16.00MB mem-reservation=128.00KB thread-reservation=0
+ tuple-ids=0 row-size=16B cardinality=53.52K(filtered from 287.51K)
+ in pipelines: 00(GETNEXT)
+====
diff --git
a/testdata/workloads/functional-query/queries/QueryTest/processing-cost-admission-slots.test
b/testdata/workloads/functional-query/queries/QueryTest/processing-cost-admission-slots.test
new file mode 100644
index 000000000..118df2588
--- /dev/null
+++
b/testdata/workloads/functional-query/queries/QueryTest/processing-cost-admission-slots.test
@@ -0,0 +1,110 @@
+====
+---- QUERY: TPCDS-Q1-CPC-PLANNER-CPU-ASK
+-- These EE tests should match the test cases in
+--
functional-planner/queries/PlannerTest/processing-cost-plan-admission-slots.test
+-- Any modification here should be applied there as well.
+--
+-- Expect a total of 16 admission slots given to this query if using
PLANNER_CPU_ASK strategy.
+-- TODO: Figure out how to enable QUERYOPTIONS section for EE tests. Currently,
it only
+-- works for FE Planner tests via TestFileParser.java.
+SET COMPUTE_PROCESSING_COST=True;
+SET SLOT_COUNT_STRATEGY=PLANNER_CPU_ASK;
+with customer_total_return as (
+ select sr_customer_sk as ctr_customer_sk,
+ sr_store_sk as ctr_store_sk,
+ sum(SR_RETURN_AMT) as ctr_total_return
+ from tpcds_partitioned_parquet_snap.store_returns,
+ tpcds_partitioned_parquet_snap.date_dim
+ where sr_returned_date_sk = d_date_sk
+ and d_year = 2000
+ group by sr_customer_sk, sr_store_sk
+) select c_customer_id
+from customer_total_return ctr1,
+ tpcds_partitioned_parquet_snap.store,
+ tpcds_partitioned_parquet_snap.customer
+where ctr1.ctr_total_return > (
+ select avg(ctr_total_return) * 1.2
+ from customer_total_return ctr2
+ where ctr1.ctr_store_sk = ctr2.ctr_store_sk)
+and s_store_sk = ctr1.ctr_store_sk
+and s_state = 'TN'
+and ctr1.ctr_customer_sk = c_customer_sk
+order by c_customer_id
+limit 100;
+---- RUNTIME_PROFILE
+aggregation(SUM, AdmissionSlots): 16
+row_regex: .* AvgAdmissionSlotsPerExecutor: 6 .*
+row_regex: .* Executor group 2 \(large\):.*
+row_regex: .* CpuAsk: 16 .*
+row_regex: .* EffectiveParallelism: 16 .*
+====
+---- QUERY: TPCDS-Q1-CPC-LARGEST-FRAGMENT
+-- Expect a total of 3 admission slots given to this query if using
LARGEST_FRAGMENT strategy.
+-- AvgAdmissionSlotsPerExecutor counter should not exist in query profile.
+SET COMPUTE_PROCESSING_COST=True;
+SET SLOT_COUNT_STRATEGY=LARGEST_FRAGMENT;
+with customer_total_return as (
+ select sr_customer_sk as ctr_customer_sk,
+ sr_store_sk as ctr_store_sk,
+ sum(SR_RETURN_AMT) as ctr_total_return
+ from tpcds_partitioned_parquet_snap.store_returns,
+ tpcds_partitioned_parquet_snap.date_dim
+ where sr_returned_date_sk = d_date_sk
+ and d_year = 2000
+ group by sr_customer_sk, sr_store_sk
+) select c_customer_id
+from customer_total_return ctr1,
+ tpcds_partitioned_parquet_snap.store,
+ tpcds_partitioned_parquet_snap.customer
+where ctr1.ctr_total_return > (
+ select avg(ctr_total_return) * 1.2
+ from customer_total_return ctr2
+ where ctr1.ctr_store_sk = ctr2.ctr_store_sk)
+and s_store_sk = ctr1.ctr_store_sk
+and s_state = 'TN'
+and ctr1.ctr_customer_sk = c_customer_sk
+order by c_customer_id
+limit 100;
+---- RUNTIME_PROFILE
+aggregation(SUM, AdmissionSlots): 3
+!row_regex: .* AvgAdmissionSlotsPerExecutor: .*
+row_regex: .* Executor group 2 \(large\):.*
+row_regex: .* CpuAsk: 16 .*
+row_regex: .* EffectiveParallelism: 16 .*
+====
+---- QUERY: TPCDS-Q1-NO-CPC
+-- Expect a total of 3 admission slots given to this query if
COMPUTE_PROCESSING_COST is disabled.
+-- Set MT_DOP=1 to keep the query compiled in multi-threaded parallelism mode.
+-- Counters related to COMPUTE_PROCESSING_COST options should not exist in
profile.
+-- It should still go to large executor group due to min memory requirement.
+SET COMPUTE_PROCESSING_COST=False;
+SET MT_DOP=1;
+with customer_total_return as (
+ select sr_customer_sk as ctr_customer_sk,
+ sr_store_sk as ctr_store_sk,
+ sum(SR_RETURN_AMT) as ctr_total_return
+ from tpcds_partitioned_parquet_snap.store_returns,
+ tpcds_partitioned_parquet_snap.date_dim
+ where sr_returned_date_sk = d_date_sk
+ and d_year = 2000
+ group by sr_customer_sk, sr_store_sk
+) select c_customer_id
+from customer_total_return ctr1,
+ tpcds_partitioned_parquet_snap.store,
+ tpcds_partitioned_parquet_snap.customer
+where ctr1.ctr_total_return > (
+ select avg(ctr_total_return) * 1.2
+ from customer_total_return ctr2
+ where ctr1.ctr_store_sk = ctr2.ctr_store_sk)
+and s_store_sk = ctr1.ctr_store_sk
+and s_state = 'TN'
+and ctr1.ctr_customer_sk = c_customer_sk
+order by c_customer_id
+limit 100;
+---- RUNTIME_PROFILE
+aggregation(SUM, AdmissionSlots): 3
+!row_regex: .* AvgAdmissionSlotsPerExecutor: .*
+row_regex: .* Executor group 2 \(large\):.*
+!row_regex: .* CpuAsk: .*
+!row_regex: .* EffectiveParallelism: .*
+====
diff --git a/tests/custom_cluster/test_executor_groups.py
b/tests/custom_cluster/test_executor_groups.py
index e7a9e382c..ce12e0322 100644
--- a/tests/custom_cluster/test_executor_groups.py
+++ b/tests/custom_cluster/test_executor_groups.py
@@ -42,6 +42,32 @@ CPU_TEST_QUERY = "select * from tpcds_parquet.store_sales
where ss_item_sk = 1 l
GROUPING_TEST_QUERY = ("select ss_item_sk from tpcds_parquet.store_sales"
" group by (ss_item_sk) order by ss_item_sk limit 10")
+# TPC-DS Q1 to test slightly more complex query.
+TPCDS_Q1 = """
+with customer_total_return as (
+ select sr_customer_sk as ctr_customer_sk,
+ sr_store_sk as ctr_store_sk,
+ sum(SR_RETURN_AMT) as ctr_total_return
+ from tpcds_partitioned_parquet_snap.store_returns,
+ tpcds_partitioned_parquet_snap.date_dim
+ where sr_returned_date_sk = d_date_sk
+ and d_year = 2000
+ group by sr_customer_sk, sr_store_sk
+) select c_customer_id
+from customer_total_return ctr1,
+ tpcds_partitioned_parquet_snap.store,
+ tpcds_partitioned_parquet_snap.customer
+where ctr1.ctr_total_return > (
+ select avg(ctr_total_return) * 1.2
+ from customer_total_return ctr2
+ where ctr1.ctr_store_sk = ctr2.ctr_store_sk)
+and s_store_sk = ctr1.ctr_store_sk
+and s_state = 'TN'
+and ctr1.ctr_customer_sk = c_customer_sk
+order by c_customer_id
+limit 100
+"""
+
DEFAULT_RESOURCE_POOL = "default-pool"
@@ -851,14 +877,14 @@ class TestExecutorGroups(CustomClusterTestSuite):
# Create fresh client
self.create_impala_clients()
- # Add an exec group with 8 admission slots and 1 executors.
- self._add_executor_group("group", 1, admission_control_slots=8,
+    # Add an exec group with 4 admission slots and 1 executor.
+    self._add_executor_group("group", 1, admission_control_slots=4,
resource_pool="root.tiny",
extra_args="-mem_limit=2g")
# Add an exec group with 8 admission slots and 2 executors.
self._add_executor_group("group", 2, admission_control_slots=8,
resource_pool="root.small",
extra_args="-mem_limit=2g")
- # Add another exec group with 8 admission slots and 3 executors.
- self._add_executor_group("group", 3, admission_control_slots=8,
+ # Add another exec group with 64 admission slots and 3 executors.
+ self._add_executor_group("group", 3, admission_control_slots=64,
resource_pool="root.large",
extra_args="-mem_limit=2g")
assert self._get_num_executor_groups(only_healthy=True) == 3
assert self._get_num_executor_groups(only_healthy=True,
@@ -909,32 +935,38 @@ class TestExecutorGroups(CustomClusterTestSuite):
self._setup_three_exec_group_cluster(coordinator_test_args)
self.client.clear_configuration()
+ # The default query options for this test.
+    # Some test cases will change these options during the test, but they should
eventually
+    # be restored to these default values.
+ self._set_query_options({
+ 'COMPUTE_PROCESSING_COST': 'true',
+ 'SLOT_COUNT_STRATEGY': 'PLANNER_CPU_ASK'})
+
# Expect to run the query on the small group by default.
- self._set_query_options({'COMPUTE_PROCESSING_COST': 'true'})
self._run_query_and_verify_profile(CPU_TEST_QUERY,
- ["Executor Group: root.small-group", "EffectiveParallelism: 11",
- "ExecutorGroupsConsidered: 2"])
+ ["Executor Group: root.small-group", "EffectiveParallelism: 10",
+ "ExecutorGroupsConsidered: 2", "AvgAdmissionSlotsPerExecutor: 5"])
# Test disabling COMPUTE_PROCESING_COST. This will produce non-MT plan.
self._set_query_options({'COMPUTE_PROCESSING_COST': 'false'})
self._run_query_and_verify_profile(CPU_TEST_QUERY,
["Executor Group: root.large-group", "ExecutorGroupsConsidered: 3",
"Verdict: Match"],
- ["EffectiveParallelism:", "CpuAsk:"])
+ ["EffectiveParallelism:", "CpuAsk:", "AvgAdmissionSlotsPerExecutor:"])
# Test COMPUTE_PROCESING_COST=false and MT_DOP=2.
self._set_query_options({'MT_DOP': '2'})
self._run_query_and_verify_profile(CPU_TEST_QUERY,
["Executor Group: root.tiny-group", "ExecutorGroupsConsidered: 1",
"Verdict: Match"],
- ["EffectiveParallelism:", "CpuAsk:"])
+ ["EffectiveParallelism:", "CpuAsk:", "AvgAdmissionSlotsPerExecutor:"])
# Test COMPUTE_PROCESING_COST=true and MT_DOP=2.
# COMPUTE_PROCESING_COST should override MT_DOP.
self._set_query_options({'COMPUTE_PROCESSING_COST': 'true'})
self._run_query_and_verify_profile(CPU_TEST_QUERY,
- ["Executor Group: root.small-group", "EffectiveParallelism: 11",
- "ExecutorGroupsConsidered: 2"])
+ ["Executor Group: root.small-group", "EffectiveParallelism: 10",
+ "ExecutorGroupsConsidered: 2", "AvgAdmissionSlotsPerExecutor: 5"])
# Unset MT_DOP
self._set_query_options({'MT_DOP': '0'})
@@ -947,7 +979,7 @@ class TestExecutorGroups(CustomClusterTestSuite):
"from tpcds_parquet.store_sales where ss_sold_date_sk < 2452184"
).format(unique_database, "store_sales_subset"),
["Executor Group: root.small", "ExecutorGroupsConsidered: 2",
- "Verdict: Match", "CpuAsk: 10"])
+ "Verdict: Match", "CpuAsk: 10", "AvgAdmissionSlotsPerExecutor: 5"])
compute_stats_query = ("compute stats {0}.{1}").format(
unique_database, "store_sales_subset")
@@ -961,6 +993,7 @@ class TestExecutorGroups(CustomClusterTestSuite):
["ExecutorGroupsConsidered: 1",
"Verdict: Assign to first group because query is not auto-scalable"],
["Query Options (set by configuration): REQUEST_POOL=",
+ "EffectiveParallelism:", "CpuAsk:", "AvgAdmissionSlotsPerExecutor:",
"Executor Group:"])
self._verify_total_admitted_queries("root.small", 4)
self._verify_total_admitted_queries("root.large", 2)
@@ -972,7 +1005,7 @@ class TestExecutorGroups(CustomClusterTestSuite):
["Query Options (set by configuration): REQUEST_POOL=root.small",
"ExecutorGroupsConsidered: 1",
"Verdict: Assign to first group because query is not auto-scalable"],
- ["Executor Group:"])
+ ["EffectiveParallelism:", "CpuAsk:", "AvgAdmissionSlotsPerExecutor:"])
self._verify_total_admitted_queries("root.small", 6)
self.client.clear_configuration()
@@ -983,7 +1016,7 @@ class TestExecutorGroups(CustomClusterTestSuite):
["Query Options (set by configuration): REQUEST_POOL=root.large",
"ExecutorGroupsConsidered: 1",
"Verdict: Assign to first group because query is not auto-scalable"],
- ["Executor Group:"])
+ ["EffectiveParallelism:", "CpuAsk:", "AvgAdmissionSlotsPerExecutor:"])
self._verify_total_admitted_queries("root.large", 4)
# Test that REQUEST_POOL will override executor group selection
@@ -992,7 +1025,8 @@ class TestExecutorGroups(CustomClusterTestSuite):
"Executor Group: root.large-group",
("Verdict: query option REQUEST_POOL=root.large is set. "
"Memory and cpu limit checking is skipped."),
- "EffectiveParallelism: 13", "ExecutorGroupsConsidered: 1"])
+ "EffectiveParallelism: 12", "ExecutorGroupsConsidered: 1",
+ "AvgAdmissionSlotsPerExecutor: 4"])
# Test setting REQUEST_POOL=root.large and disabling
COMPUTE_PROCESSING_COST
self._set_query_options({'COMPUTE_PROCESSING_COST': 'false'})
@@ -1002,7 +1036,7 @@ class TestExecutorGroups(CustomClusterTestSuite):
("Verdict: query option REQUEST_POOL=root.large is set. "
"Memory and cpu limit checking is skipped."),
"ExecutorGroupsConsidered: 1"],
- ["EffectiveParallelism:", "CpuAsk:"])
+ ["EffectiveParallelism:", "CpuAsk:", "AvgAdmissionSlotsPerExecutor:"])
# Unset REQUEST_POOL and restore COMPUTE_PROCESSING_COST.
self._set_query_options({
@@ -1010,23 +1044,22 @@ class TestExecutorGroups(CustomClusterTestSuite):
'COMPUTE_PROCESSING_COST': 'true'})
# Test that empty REQUEST_POOL should have no impact.
- self.client.set_configuration({'REQUEST_POOL': ''})
self._run_query_and_verify_profile(CPU_TEST_QUERY,
- ["Executor Group: root.small-group", "ExecutorGroupsConsidered: 2",
- "Verdict: Match"],
+ ["Executor Group: root.small-group", "EffectiveParallelism: 10",
+ "ExecutorGroupsConsidered: 2", "AvgAdmissionSlotsPerExecutor: 5"],
["Query Options (set by configuration): REQUEST_POOL="])
- self.client.clear_configuration()
# Test that GROUPING_TEST_QUERY will get assigned to the large group.
self._run_query_and_verify_profile(GROUPING_TEST_QUERY,
["Executor Group: root.large-group", "ExecutorGroupsConsidered: 3",
- "Verdict: Match", "CpuAsk: 12"])
+ "Verdict: Match", "CpuAsk: 12", "AvgAdmissionSlotsPerExecutor: 4"])
# ENABLE_REPLAN=false should force query to run in first group (tiny).
self._set_query_options({'ENABLE_REPLAN': 'false'})
self._run_query_and_verify_profile(TEST_QUERY,
["Executor Group: root.tiny-group", "ExecutorGroupsConsidered: 1",
- "Verdict: Assign to first group because query option
ENABLE_REPLAN=false"])
+ "Verdict: Assign to first group because query option
ENABLE_REPLAN=false"],
+ ["EffectiveParallelism:", "CpuAsk:", "AvgAdmissionSlotsPerExecutor:"])
# Unset ENABLE_REPLAN.
self._set_query_options({'ENABLE_REPLAN': ''})
@@ -1035,37 +1068,38 @@ class TestExecutorGroups(CustomClusterTestSuite):
self._run_query_and_verify_profile("SELECT 1",
["Executor Group: empty group (using coordinator only)",
"ExecutorGroupsConsidered: 1",
- "Verdict: Assign to first group because the number of nodes is 1"])
+ "Verdict: Assign to first group because the number of nodes is 1"],
+ ["EffectiveParallelism:", "CpuAsk:", "AvgAdmissionSlotsPerExecutor:"])
# CREATE/DROP database should work and assigned to tiny group.
self._run_query_and_verify_profile(
"CREATE DATABASE test_non_scalable_query;",
["ExecutorGroupsConsidered: 1",
"Verdict: Assign to first group because query is not auto-scalable"],
- ["Executor Group:"])
+ ["EffectiveParallelism:", "CpuAsk:", "AvgAdmissionSlotsPerExecutor:"])
self._run_query_and_verify_profile(
"DROP DATABASE test_non_scalable_query;",
["ExecutorGroupsConsidered: 1",
"Verdict: Assign to first group because query is not auto-scalable"],
- ["Executor Group:"])
+ ["EffectiveParallelism:", "CpuAsk:", "AvgAdmissionSlotsPerExecutor:"])
# Test combination of PROCESSING_COST_MIN_THREADS and
MAX_FRAGMENT_INSTANCES_PER_NODE.
self._set_query_options({'MAX_FRAGMENT_INSTANCES_PER_NODE': '3'})
self._run_query_and_verify_profile(GROUPING_TEST_QUERY,
["Executor Group: root.large-group", "EffectiveParallelism: 9",
- "ExecutorGroupsConsidered: 3"])
+ "ExecutorGroupsConsidered: 3", "AvgAdmissionSlotsPerExecutor: 3"])
self._set_query_options({'MAX_FRAGMENT_INSTANCES_PER_NODE': '4'})
self._run_query_and_verify_profile(GROUPING_TEST_QUERY,
["Executor Group: root.large-group", "EffectiveParallelism: 12",
- "ExecutorGroupsConsidered: 3"])
+ "ExecutorGroupsConsidered: 3", "AvgAdmissionSlotsPerExecutor: 4"])
self._set_query_options({'PROCESSING_COST_MIN_THREADS': '2'})
self._run_query_and_verify_profile(GROUPING_TEST_QUERY,
["Executor Group: root.large-group", "EffectiveParallelism: 12",
- "ExecutorGroupsConsidered: 3"])
+ "ExecutorGroupsConsidered: 3", "AvgAdmissionSlotsPerExecutor: 4"])
self._set_query_options({'MAX_FRAGMENT_INSTANCES_PER_NODE': '2'})
self._run_query_and_verify_profile(GROUPING_TEST_QUERY,
["Executor Group: root.small-group", "EffectiveParallelism: 4",
- "ExecutorGroupsConsidered: 2"])
+ "ExecutorGroupsConsidered: 2", "AvgAdmissionSlotsPerExecutor: 2"])
self._set_query_options({'MAX_FRAGMENT_INSTANCES_PER_NODE': '1'})
result = self.execute_query_expect_failure(self.client, CPU_TEST_QUERY)
status = (r"PROCESSING_COST_MIN_THREADS \(2\) can not be larger than "
@@ -1081,13 +1115,13 @@ class TestExecutorGroups(CustomClusterTestSuite):
self._run_query_and_verify_profile(
"SELECT count(*) FROM tpcds_parquet.store_sales",
["Executor Group: root.small-group", "EffectiveParallelism: 10",
- "ExecutorGroupsConsidered: 2"])
+ "ExecutorGroupsConsidered: 2", "AvgAdmissionSlotsPerExecutor: 5"])
# Test optimized count star query with 383 scan ranges assign to tiny
group.
self._run_query_and_verify_profile(
"SELECT count(*) FROM tpcds_parquet.store_sales WHERE ss_sold_date_sk <
2451200",
["Executor Group: root.tiny-group", "EffectiveParallelism: 2",
- "ExecutorGroupsConsidered: 1"])
+ "ExecutorGroupsConsidered: 1", "AvgAdmissionSlotsPerExecutor: 2"])
# Test optimized count star query with 1 scan range detected as trivial
query
# and assign to tiny group.
@@ -1095,20 +1129,21 @@ class TestExecutorGroups(CustomClusterTestSuite):
"SELECT count(*) FROM tpcds_parquet.date_dim",
["Executor Group: empty group (using coordinator only)",
"ExecutorGroupsConsidered: 1",
- "Verdict: Assign to first group because the number of nodes is 1"])
+ "Verdict: Assign to first group because the number of nodes is 1"],
+ ["EffectiveParallelism:", "CpuAsk:", "AvgAdmissionSlotsPerExecutor:"])
# Test unoptimized count star query assign to small group.
self._run_query_and_verify_profile(
("SELECT count(*) FROM tpcds_parquet.store_sales "
"WHERE ss_ext_discount_amt != 0.3857"),
["Executor Group: root.small-group", "EffectiveParallelism: 10",
- "ExecutorGroupsConsidered: 2"])
+ "ExecutorGroupsConsidered: 2", "AvgAdmissionSlotsPerExecutor: 5"])
# Test zero slot scan query assign to small group.
self._run_query_and_verify_profile(
"SELECT count(ss_sold_date_sk) FROM tpcds_parquet.store_sales",
["Executor Group: root.small-group", "EffectiveParallelism: 10",
- "ExecutorGroupsConsidered: 2"])
+ "ExecutorGroupsConsidered: 2", "AvgAdmissionSlotsPerExecutor: 5"])
# END testing count queries
# BEGIN testing insert + MAX_FS_WRITER
@@ -1119,7 +1154,7 @@ class TestExecutorGroups(CustomClusterTestSuite):
"select id, year from functional_parquet.alltypes"
).format(unique_database, "test_ctas1"),
["Executor Group: root.tiny-group", "ExecutorGroupsConsidered: 1",
- "Verdict: Match", "CpuAsk: 1"])
+ "Verdict: Match", "CpuAsk: 1", "AvgAdmissionSlotsPerExecutor: 1"])
self.__verify_fs_writers(result, 1, [0, 1])
# Test unpartitioned insert, small scan, no MAX_FS_WRITER, with limit.
@@ -1129,7 +1164,7 @@ class TestExecutorGroups(CustomClusterTestSuite):
"select id, year from functional_parquet.alltypes limit 100000"
).format(unique_database, "test_ctas2"),
["Executor Group: root.tiny-group", "ExecutorGroupsConsidered: 1",
- "Verdict: Match", "CpuAsk: 2"])
+ "Verdict: Match", "CpuAsk: 2", "AvgAdmissionSlotsPerExecutor: 2"])
self.__verify_fs_writers(result, 1, [0, 2])
# Test partitioned insert, small scan, no MAX_FS_WRITER.
@@ -1139,7 +1174,7 @@ class TestExecutorGroups(CustomClusterTestSuite):
"select id, year from functional_parquet.alltypes"
).format(unique_database, "test_ctas3"),
["Executor Group: root.tiny-group", "ExecutorGroupsConsidered: 1",
- "Verdict: Match", "CpuAsk: 1"])
+ "Verdict: Match", "CpuAsk: 1", "AvgAdmissionSlotsPerExecutor: 1"])
self.__verify_fs_writers(result, 1, [0, 1])
# Test unpartitioned insert, large scan, no MAX_FS_WRITER.
@@ -1148,7 +1183,7 @@ class TestExecutorGroups(CustomClusterTestSuite):
"select ss_item_sk, ss_ticket_number, ss_store_sk "
"from tpcds_parquet.store_sales").format(unique_database, "test_ctas4"),
["Executor Group: root.large-group", "ExecutorGroupsConsidered: 3",
- "Verdict: Match", "CpuAsk: 13"])
+ "Verdict: Match", "CpuAsk: 13", "AvgAdmissionSlotsPerExecutor: 5"])
self.__verify_fs_writers(result, 1, [0, 4, 4, 5])
# Test partitioned insert, large scan, no MAX_FS_WRITER.
@@ -1157,7 +1192,7 @@ class TestExecutorGroups(CustomClusterTestSuite):
"select ss_item_sk, ss_ticket_number, ss_store_sk "
"from tpcds_parquet.store_sales").format(unique_database, "test_ctas5"),
["Executor Group: root.large-group", "ExecutorGroupsConsidered: 3",
- "Verdict: Match", "CpuAsk: 15"])
+ "Verdict: Match", "CpuAsk: 15", "AvgAdmissionSlotsPerExecutor: 5"])
self.__verify_fs_writers(result, 3, [0, 5, 5, 5])
# Test partitioned insert, large scan, high MAX_FS_WRITER.
@@ -1167,7 +1202,7 @@ class TestExecutorGroups(CustomClusterTestSuite):
"select ss_item_sk, ss_ticket_number, ss_store_sk "
"from tpcds_parquet.store_sales").format(unique_database, "test_ctas6"),
["Executor Group: root.large-group", "ExecutorGroupsConsidered: 3",
- "Verdict: Match", "CpuAsk: 15"])
+ "Verdict: Match", "CpuAsk: 15", "AvgAdmissionSlotsPerExecutor: 5"])
self.__verify_fs_writers(result, 3, [0, 5, 5, 5])
# Test partitioned insert, large scan, low MAX_FS_WRITER.
@@ -1177,7 +1212,7 @@ class TestExecutorGroups(CustomClusterTestSuite):
"select ss_item_sk, ss_ticket_number, ss_store_sk "
"from tpcds_parquet.store_sales").format(unique_database, "test_ctas7"),
["Executor Group: root.large-group", "ExecutorGroupsConsidered: 3",
- "Verdict: Match", "CpuAsk: 14"])
+ "Verdict: Match", "CpuAsk: 14", "AvgAdmissionSlotsPerExecutor: 5"])
self.__verify_fs_writers(result, 2, [0, 4, 5, 5])
# Test that non-CTAS unpartitioned insert works. MAX_FS_WRITER=2.
@@ -1186,7 +1221,7 @@ class TestExecutorGroups(CustomClusterTestSuite):
"select ss_item_sk, ss_ticket_number, ss_store_sk "
"from tpcds_parquet.store_sales").format(unique_database, "test_ctas4"),
["Executor Group: root.large-group", "ExecutorGroupsConsidered: 3",
- "Verdict: Match", "CpuAsk: 13"])
+ "Verdict: Match", "CpuAsk: 13", "AvgAdmissionSlotsPerExecutor: 5"])
self.__verify_fs_writers(result, 1, [0, 4, 4, 5])
# Test that non-CTAS partitioned insert works. MAX_FS_WRITER=2.
@@ -1196,20 +1231,48 @@ class TestExecutorGroups(CustomClusterTestSuite):
"select ss_item_sk, ss_ticket_number, ss_store_sk "
"from tpcds_parquet.store_sales").format(unique_database, "test_ctas7"),
["Executor Group: root.large-group", "ExecutorGroupsConsidered: 3",
- "Verdict: Match", "CpuAsk: 14"])
+ "Verdict: Match", "CpuAsk: 14", "AvgAdmissionSlotsPerExecutor: 5"])
self.__verify_fs_writers(result, 2, [0, 4, 5, 5])
# Unset MAX_FS_WRITERS.
self._set_query_options({'MAX_FS_WRITERS': ''})
# END testing insert + MAX_FS_WRITER
+ # BEGIN test slot count strategy
+    # Unset SLOT_COUNT_STRATEGY to use default strategy, which is max # of instances
+    # of any fragment on that backend.
+ # TPCDS_Q1 at root.large_group will have following CoreCount trace:
+    # CoreCount={total=16 trace=F15:3+F01:1+F14:3+F03:1+F13:3+F05:1+F12:3+F07:1},
+ # coresRequired=16
+ self._set_query_options({'SLOT_COUNT_STRATEGY': ''})
+ result = self._run_query_and_verify_profile(TPCDS_Q1,
+ ["Executor Group: root.large-group", "ExecutorGroupsConsidered: 3",
+ "Verdict: Match", "CpuAsk: 16",
+ "AdmissionSlots: 1" # coordinator and executors all have 1 slot
+ ],
+ ["AvgAdmissionSlotsPerExecutor:", "AdmissionSlots: 6"])
+
+ # Test with SLOT_COUNT_STRATEGY='PLANNER_CPU_ASK'.
+ self._set_query_options({'SLOT_COUNT_STRATEGY': 'PLANNER_CPU_ASK'})
+ result = self._run_query_and_verify_profile(TPCDS_Q1,
+ ["Executor Group: root.large-group", "ExecutorGroupsConsidered: 3",
+ "Verdict: Match", "CpuAsk: 16", "AvgAdmissionSlotsPerExecutor: 6",
+ # coordinator has 1 slot
+ "AdmissionSlots: 1",
+        # 1 executor has F15:1+F01:1+F14:1+F03:1+F13:1+F05:1+F12:1+F07:1 = 8 slots
+ "AdmissionSlots: 8",
+ # 2 executors have F15:1+F14:1+F13:1+F12:1 = 4 slots
+ "AdmissionSlots: 4"
+ ])
+ # END test slot count strategy
+
# Check resource pools on the Web queries site and admission site
self._verify_query_num_for_resource_pool("root.small", 10)
self._verify_query_num_for_resource_pool("root.tiny", 5)
- self._verify_query_num_for_resource_pool("root.large", 12)
+ self._verify_query_num_for_resource_pool("root.large", 14)
self._verify_total_admitted_queries("root.small", 11)
self._verify_total_admitted_queries("root.tiny", 8)
- self._verify_total_admitted_queries("root.large", 16)
+ self._verify_total_admitted_queries("root.large", 18)
@pytest.mark.execute_serially
def test_query_cpu_count_divisor_two(self):
@@ -1217,29 +1280,35 @@ class TestExecutorGroups(CustomClusterTestSuite):
# But the CpuAsk is around half of EffectiveParallelism.
coordinator_test_args = "-query_cpu_count_divisor=2 "
self._setup_three_exec_group_cluster(coordinator_test_args)
- self._set_query_options({'COMPUTE_PROCESSING_COST': 'true'})
+ self._set_query_options({
+ 'COMPUTE_PROCESSING_COST': 'true',
+ 'SLOT_COUNT_STRATEGY': 'PLANNER_CPU_ASK'})
self._run_query_and_verify_profile(CPU_TEST_QUERY,
["Executor Group: root.small-group",
- "CpuAsk: 6", "EffectiveParallelism: 11",
- "CpuCountDivisor: 2", "ExecutorGroupsConsidered: 2"])
+ "CpuAsk: 5", "EffectiveParallelism: 10",
+ "CpuCountDivisor: 2", "ExecutorGroupsConsidered: 2",
+ "AvgAdmissionSlotsPerExecutor: 5"])
# Test that QUERY_CPU_COUNT_DIVISOR option can override
# query_cpu_count_divisor flag.
self._set_query_options({'QUERY_CPU_COUNT_DIVISOR': '1.0'})
self._run_query_and_verify_profile(CPU_TEST_QUERY,
["Executor Group: root.small-group",
- "CpuAsk: 11", "EffectiveParallelism: 11",
- "CpuCountDivisor: 1", "ExecutorGroupsConsidered: 2"])
+ "CpuAsk: 10", "EffectiveParallelism: 10",
+ "CpuCountDivisor: 1", "ExecutorGroupsConsidered: 2",
+ "AvgAdmissionSlotsPerExecutor: 5"])
self._set_query_options({'QUERY_CPU_COUNT_DIVISOR': '0.5'})
self._run_query_and_verify_profile(CPU_TEST_QUERY,
["Executor Group: root.large-group",
- "CpuAsk: 22", "EffectiveParallelism: 11",
- "CpuCountDivisor: 0.5", "ExecutorGroupsConsidered: 3"])
+ "CpuAsk: 24", "EffectiveParallelism: 10",
+ "CpuCountDivisor: 0.5", "ExecutorGroupsConsidered: 3",
+ "AvgAdmissionSlotsPerExecutor: 4"])
self._set_query_options({'QUERY_CPU_COUNT_DIVISOR': '2.0'})
self._run_query_and_verify_profile(CPU_TEST_QUERY,
["Executor Group: root.small-group",
- "CpuAsk: 6", "EffectiveParallelism: 11",
- "CpuCountDivisor: 2", "ExecutorGroupsConsidered: 2"])
+ "CpuAsk: 5", "EffectiveParallelism: 10",
+ "CpuCountDivisor: 2", "ExecutorGroupsConsidered: 2",
+ "AvgAdmissionSlotsPerExecutor: 5"])
# Check resource pools on the Web queries site and admission site
self._verify_query_num_for_resource_pool("root.small", 3)
@@ -1255,11 +1324,12 @@ class TestExecutorGroups(CustomClusterTestSuite):
self._setup_three_exec_group_cluster(coordinator_test_args)
self._set_query_options({
'COMPUTE_PROCESSING_COST': 'true',
+ 'SLOT_COUNT_STRATEGY': 'PLANNER_CPU_ASK',
'MAX_FRAGMENT_INSTANCES_PER_NODE': '1'})
self._run_query_and_verify_profile(CPU_TEST_QUERY,
- ["Executor Group: root.large-group", "EffectiveParallelism: 4",
- "ExecutorGroupsConsidered: 3", "CpuAsk: 134",
- "Verdict: Match"])
+ ["Executor Group: root.large-group", "EffectiveParallelism: 3",
+ "ExecutorGroupsConsidered: 3", "CpuAsk: 100",
+ "Verdict: Match", "AvgAdmissionSlotsPerExecutor: 1"])
# Unset MAX_FRAGMENT_INSTANCES_PER_NODE.
self._set_query_options({'MAX_FRAGMENT_INSTANCES_PER_NODE': ''})
@@ -1267,9 +1337,10 @@ class TestExecutorGroups(CustomClusterTestSuite):
# Expect that a query still admitted to last group even if
# its resource requirement exceed the limit on that last executor group.
self._run_query_and_verify_profile(CPU_TEST_QUERY,
- ["Executor Group: root.large-group", "EffectiveParallelism: 16",
+ ["Executor Group: root.large-group", "EffectiveParallelism: 15",
"ExecutorGroupsConsidered: 3", "CpuAsk: 534",
-        "Verdict: no executor group set fit. Admit to last executor group set."])
+        "Verdict: no executor group set fit. Admit to last executor group set.",
+ "AvgAdmissionSlotsPerExecutor: 5"])
# Check resource pools on the Web queries site and admission site
self._verify_query_num_for_resource_pool("root.large", 2)
@@ -1282,10 +1353,12 @@ class TestExecutorGroups(CustomClusterTestSuite):
coordinator_test_args = ("-query_cpu_count_divisor=0.03 "
"-skip_resource_checking_on_last_executor_group_set=false ")
self._setup_three_exec_group_cluster(coordinator_test_args)
- self._set_query_options({'COMPUTE_PROCESSING_COST': 'true'})
+ self._set_query_options({
+ 'COMPUTE_PROCESSING_COST': 'true',
+ 'SLOT_COUNT_STRATEGY': 'PLANNER_CPU_ASK'})
result = self.execute_query_expect_failure(self.client, CPU_TEST_QUERY)
assert ("AnalysisException: The query does not fit largest executor group
sets. "
- "Reason: not enough cpu cores (require=434, max=192).") in str(result)
+ "Reason: not enough cpu cores (require=400, max=192).") in str(result)
@pytest.mark.execute_serially
def test_min_processing_per_thread_small(self):
@@ -1294,24 +1367,29 @@ class TestExecutorGroups(CustomClusterTestSuite):
self._setup_three_exec_group_cluster(coordinator_test_args)
# Test that GROUPING_TEST_QUERY will get assigned to the large group.
- self._set_query_options({'COMPUTE_PROCESSING_COST': 'true'})
+ self._set_query_options({
+ 'COMPUTE_PROCESSING_COST': 'true',
+ 'SLOT_COUNT_STRATEGY': 'PLANNER_CPU_ASK'})
self._run_query_and_verify_profile(GROUPING_TEST_QUERY,
["Executor Group: root.large-group", "ExecutorGroupsConsidered: 3",
- "Verdict: Match", "CpuAsk: 15"])
+ "Verdict: Match", "CpuAsk: 15",
+ "AvgAdmissionSlotsPerExecutor: 5"])
# Test that high_scan_cost_query will get assigned to the large group.
high_scan_cost_query = ("SELECT ss_item_sk FROM tpcds_parquet.store_sales "
"WHERE ss_item_sk < 1000000 GROUP BY ss_item_sk LIMIT 10")
self._run_query_and_verify_profile(high_scan_cost_query,
["Executor Group: root.large-group", "ExecutorGroupsConsidered: 3",
- "Verdict: Match", "CpuAsk: 18"])
+ "Verdict: Match", "CpuAsk: 18",
+ "AvgAdmissionSlotsPerExecutor: 6"])
# Test that high_scan_cost_query will get assigned to the small group
# if MAX_FRAGMENT_INSTANCES_PER_NODE is limited to 1.
self._set_query_options({'MAX_FRAGMENT_INSTANCES_PER_NODE': '1'})
self._run_query_and_verify_profile(high_scan_cost_query,
["Executor Group: root.tiny-group", "ExecutorGroupsConsidered: 1",
- "Verdict: Match", "CpuAsk: 1"])
+ "Verdict: Match", "CpuAsk: 1",
+ "AvgAdmissionSlotsPerExecutor: 1"])
# Check resource pools on the Web queries site and admission site
self._verify_query_num_for_resource_pool("root.tiny", 1)
diff --git a/tests/query_test/test_processing_cost.py b/tests/query_test/test_processing_cost.py
new file mode 100644
index 000000000..d2e942a79
--- /dev/null
+++ b/tests/query_test/test_processing_cost.py
@@ -0,0 +1,42 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+# Functional tests running the TPCH workload.
+from __future__ import absolute_import, division, print_function
+
+from tests.common.impala_test_suite import ImpalaTestSuite
+from tests.common.test_dimensions import (
+ create_parquet_dimension,
+ create_single_exec_option_dimension
+)
+
+
+class TestProcessingCost(ImpalaTestSuite):
+ """Test processing cost in non-dedicated coordinator environment."""
+
+ @classmethod
+ def get_workload(self):
+ return 'functional-query'
+
+ @classmethod
+ def add_test_dimensions(cls):
+ super(TestProcessingCost, cls).add_test_dimensions()
+ cls.ImpalaTestMatrix.add_dimension(create_single_exec_option_dimension())
+
cls.ImpalaTestMatrix.add_dimension(create_parquet_dimension(cls.get_workload()))
+
+ def test_admission_slots(self, vector):
+ self.run_test_case('QueryTest/processing-cost-admission-slots', vector)
diff --git a/tests/query_test/test_tpcds_queries.py b/tests/query_test/test_tpcds_queries.py
index c8496f35f..2dbb896e6 100644
--- a/tests/query_test/test_tpcds_queries.py
+++ b/tests/query_test/test_tpcds_queries.py
@@ -24,6 +24,7 @@ from copy import deepcopy
from tests.common.impala_test_suite import ImpalaTestSuite
from tests.common.skip import SkipIfDockerizedCluster
from tests.common.test_dimensions import (
+ add_mandatory_exec_option,
create_single_exec_option_dimension,
is_supported_insert_format)
@@ -39,7 +40,7 @@ class TestTpcdsQuery(ImpalaTestSuite):
v.get_value('table_format').file_format not in ['rc', 'hbase', 'kudu']
and
v.get_value('table_format').compression_codec in ['none', 'snap'] and
v.get_value('table_format').compression_type != 'record')
- cls.ImpalaTestMatrix.add_mandatory_exec_option('decimal_v2', 0)
+ add_mandatory_exec_option(cls, 'decimal_v2', 0)
if cls.exploration_strategy() != 'exhaustive':
# Cut down on the execution time for these tests in core by running only
@@ -754,8 +755,9 @@ class TestTpcdsQueryWithProcessingCost(TestTpcdsQuery):
@classmethod
def add_test_dimensions(cls):
super(TestTpcdsQueryWithProcessingCost, cls).add_test_dimensions()
- cls.ImpalaTestMatrix.add_mandatory_exec_option('compute_processing_cost',
1)
-
cls.ImpalaTestMatrix.add_mandatory_exec_option('max_fragment_instances_per_node',
4)
+ add_mandatory_exec_option(cls, 'compute_processing_cost', 1)
+ add_mandatory_exec_option(cls, 'max_fragment_instances_per_node', 4)
+ add_mandatory_exec_option(cls, 'slot_count_strategy', 'planner_cpu_ask')
def test_tpcds_q51a(self, vector):
"""Reduce max_fragment_instances_per_node to lower memory requirement."""