This is an automated email from the ASF dual-hosted git repository. stigahuang pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/impala.git
commit dbddb0844713677cd5165c55fe21ef46238d3e24 Author: Riza Suminto <[email protected]> AuthorDate: Wed May 10 15:59:15 2023 -0700 IMPALA-12120: Limit output writer parallelism based on write volume The new processing cost-based planner changes (IMPALA-11604, IMPALA-12091) will impact output writer parallelism for insert queries, with the potential for more small files if the processing cost-based planning results in too many writer fragments. It can further exacerbate a problem introduced by MT_DOP (see IMPALA-8125). The MAX_FS_WRITERS query option can help mitigate this. But even without the MAX_FS_WRITERS set, the default output writer parallelism should avoid creating excessive writer parallelism for partitioned and unpartitioned inserts. This patch implements such a limit when using the cost-based planner. It limits the number of writer fragments such that each writer fragment writes at least 256MB of rows. This patch also allows CTAS (a kind of DDL query) to be eligible for auto-scaling. This patch also remove comments about NUM_SCANNER_THREADS added by IMPALA-12029, since it does not applies anymore after IMPALA-12091. Testing: - Add test cases in test_query_cpu_count_divisor_default - Add test_processing_cost_writer_limit in test_insert.py - Pass test_insert.py::TestInsertHdfsWriterLimit - Pass test_executor_groups.py Change-Id: I289c6ffcd6d7b225179cc9fb2f926390325a27e0 Reviewed-on: http://gerrit.cloudera.org:8080/19880 Reviewed-by: Impala Public Jenkins <[email protected]> Tested-by: Impala Public Jenkins <[email protected]> --- be/src/scheduling/scheduler.cc | 6 +- common/thrift/ImpalaService.thrift | 6 - .../apache/impala/planner/DistributedPlanner.java | 112 +++++++++++++---- .../org/apache/impala/planner/HdfsTableSink.java | 5 + .../org/apache/impala/planner/PlanFragment.java | 12 +- .../java/org/apache/impala/planner/ScanNode.java | 10 ++ .../java/org/apache/impala/service/Frontend.java | 9 +- tests/custom_cluster/test_executor_groups.py | 136 +++++++++++++++++++-- tests/query_test/test_insert.py | 56 ++++++--- 9 files changed, 287 insertions(+), 65 deletions(-) diff --git a/be/src/scheduling/scheduler.cc b/be/src/scheduling/scheduler.cc index 57a080815..64e8f9ce5 100644 --- a/be/src/scheduling/scheduler.cc +++ b/be/src/scheduling/scheduler.cc @@ -620,10 +620,10 @@ void Scheduler::CreateInputCollocatedInstances( int per_fragment_instance_idx = 0; int max_instances = input_fragment_state.instance_states.size(); - if (IsExceedMaxFsWriters(fragment_state, &input_fragment_state, state)) { - max_instances = state->query_options().max_fs_writers; - } else if (fragment.effective_instance_count > 0) { + if (fragment.effective_instance_count > 0) { max_instances = fragment.effective_instance_count; + } else if (IsExceedMaxFsWriters(fragment_state, &input_fragment_state, state)) { + max_instances = state->query_options().max_fs_writers; } if (max_instances != input_fragment_state.instance_states.size()) { diff --git a/common/thrift/ImpalaService.thrift b/common/thrift/ImpalaService.thrift index 07d69ab89..e501d9cc2 100644 --- a/common/thrift/ImpalaService.thrift +++ b/common/thrift/ImpalaService.thrift @@ -74,12 +74,6 @@ enum TImpalaQueryOptions { MAX_IO_BUFFERS = 7 // Removed // Number of scanner threads. - // EXPERIMENTAL: if COMPUTE_PROCESSING_COST=true, this query option will be used to - // cap scan node cost to: - // (num_executor * NUM_SCANNER_THREADS * min_processing_per_thread) - // if the original scan cost exceed that value during the first round of planning. - // NUM_SCANNER_THREADS will be ignored once MT_DOP is restored in the second round of - // planning. NUM_SCANNER_THREADS = 8 ALLOW_UNSUPPORTED_FORMATS = 9 // Removed diff --git a/fe/src/main/java/org/apache/impala/planner/DistributedPlanner.java b/fe/src/main/java/org/apache/impala/planner/DistributedPlanner.java index b6559f44b..a6845bb15 100644 --- a/fe/src/main/java/org/apache/impala/planner/DistributedPlanner.java +++ b/fe/src/main/java/org/apache/impala/planner/DistributedPlanner.java @@ -40,6 +40,7 @@ import org.slf4j.LoggerFactory; import com.google.common.base.Preconditions; import com.google.common.base.Predicates; import com.google.common.collect.Lists; +import com.google.common.math.IntMath; /** * The distributed planner is responsible for creating an executable, distributed plan @@ -171,31 +172,29 @@ public class DistributedPlanner { * or -1 if any of them doesn't have a distinct value estimate. */ private long getNumDistinctValues(List<Expr> exprs) { - long result = 1; - for (Expr expr: exprs) { - result *= expr.getNumDistinctValues(); - if (result < 0) return -1; - } - return result; + Preconditions.checkNotNull(exprs); + return exprs.isEmpty() ? 1 : Expr.getNumDistinctValues(exprs); } /** * Decides whether to repartition the output of 'inputFragment' before feeding * its data into the table sink of the given 'insertStmt'. The decision obeys - * the shuffle/noshuffle plan hints if present unless MAX_FS_WRITERS query - * option is used where the noshuffle hint is ignored. The decision is based on - * a number of factors including, whether the target table is partitioned or - * unpartitioned, the input fragment and the target table's partition - * expressions, expected number of output partitions, num of nodes on which the - * input partition will run, whether MAX_FS_WRITERS query option is used. If - * this functions ends up creating a new fragment, appends that to 'fragments'. + * the shuffle/noshuffle plan hints if present unless MAX_FS_WRITERS + * or COMPUTE_PROCESSING_COST query option is used where the noshuffle hint is + * ignored. The decision is based on a number of factors including, whether the target + * table is partitioned or unpartitioned, the input fragment and the target table's + * partition expressions, expected number of output partitions, num of nodes on which + * the input partition will run, whether MAX_FS_WRITERS or COMPUTE_PROCESSING_COST + * query option is used. If this functions ends up creating a new fragment, appends + * that to 'fragments'. */ public PlanFragment createInsertFragment( PlanFragment inputFragment, InsertStmt insertStmt, Analyzer analyzer, List<PlanFragment> fragments) throws ImpalaException { + boolean isComputeCost = ProcessingCost.isComputeCost(analyzer.getQueryOptions()); boolean enforce_hdfs_writer_limit = insertStmt.getTargetTable() instanceof FeFsTable - && analyzer.getQueryOptions().getMax_fs_writers() > 0; + && (analyzer.getQueryOptions().getMax_fs_writers() > 0 || isComputeCost); if (insertStmt.hasNoShuffleHint() && !enforce_hdfs_writer_limit) return inputFragment; @@ -218,11 +217,71 @@ public class DistributedPlanner { // We also consider fragments containing union nodes along with scan fragments // (leaf fragments) since they are either a part of those scan fragments or are // co-located with them to maintain parallelism. - List<ScanNode> hdfsScanORUnionNodes = Lists.newArrayList(); - inputFragment.collectPlanNodes(Predicates.instanceOf(HdfsScanNode.class), - hdfsScanORUnionNodes); - inputFragment.collectPlanNodes(Predicates.instanceOf(UnionNode.class), - hdfsScanORUnionNodes); + List<HdfsScanNode> hdfsScanNodes = Lists.newArrayList(); + inputFragment.collectPlanNodes( + Predicates.instanceOf(HdfsScanNode.class), hdfsScanNodes); + List<UnionNode> unionNodes = Lists.newArrayList(); + inputFragment.collectPlanNodes(Predicates.instanceOf(UnionNode.class), unionNodes); + boolean hasHdfsScanORUnion = !hdfsScanNodes.isEmpty() || !unionNodes.isEmpty(); + + int expectedNumInputInstance = inputFragment.getNumInstances(); + if (enforce_hdfs_writer_limit && isComputeCost) { + // Default to minParallelism * numNodes if cardinality or average row size is + // unknown. + int costBasedMaxWriter = IntMath.saturatedMultiply( + inputFragment.getNumNodes(), analyzer.getMinParallelismPerNode()); + + PlanNode root = inputFragment.getPlanRoot(); + if (root.getCardinality() > -1 && root.getAvgRowSize() > -1) { + // Both cardinality and avg row size is known. + // Estimate such that each writer will work on at least MIN_WRITE_BYTES of rows. + // However, if this is a partitioned insert, the output volume will be divided + // into several partitions. In that case, consider totalNumPartitions so that: + // total num writers is close to totalNumPartitions. + int totalNumPartitions = (int) Math.min( + Integer.MAX_VALUE, Math.max(1, getNumDistinctValues(partitionExprs))); + int minNumWriter = Math.min(totalNumPartitions, inputFragment.getNumNodes()); + int maxNumWriter = Math.min(totalNumPartitions, + IntMath.saturatedMultiply( + inputFragment.getNumNodes(), analyzer.getMaxParallelismPerNode())); + costBasedMaxWriter = (int) Math.round( + Math.ceil((root.getAvgRowSize() / HdfsTableSink.MIN_WRITE_BYTES) + * root.getCardinality())); + costBasedMaxWriter = + Math.min(maxNumWriter, Math.max(minNumWriter, costBasedMaxWriter)); + } + + if (maxHdfsWriters > 0) { + // Pick min between MAX_FS_WRITER option and costBasedMaxWriter. + maxHdfsWriters = Math.min(maxHdfsWriters, costBasedMaxWriter); + } else { + // User does not set MAX_FS_WRITER option. + maxHdfsWriters = costBasedMaxWriter; + } + Preconditions.checkState(maxHdfsWriters > 0); + insertStmt.setMaxTableSinks(maxHdfsWriters); + // At this point, parallelism of writer fragment is fixed and will not be adjusted + // by costing phase. + + if (!hdfsScanNodes.isEmpty() && fragments.size() == 1) { + // If input fragment have HdfsScanNode and input fragment is the only fragment in + // the plan, check for opportunity to collocate scan nodes and table sink. + // Since the actual costing phase only happens later after distributed plan + // created, this code redundantly compute the scan cost ahead of costing phase + // to help estimate the scan parallelism. + int maxScanThread = 1; + for (HdfsScanNode scanNode : hdfsScanNodes) { + long totalScanRange = scanNode.getEffectiveNumScanRanges(); + ProcessingCost scanCost = scanNode.computeScanProcessingCost( + analyzer.getQueryOptions(), totalScanRange); + maxScanThread = Math.max( + maxScanThread, scanCost.getNumInstanceMax(inputFragment.getNumNodes())); + } + // Override expectedNumInputInstance so that collocation may happen + // (case 3 in branch below). + expectedNumInputInstance = maxScanThread; + } + } // Make a cost-based decision only if no user hint was supplied. if (!insertStmt.hasShuffleHint()) { @@ -232,8 +291,8 @@ public class DistributedPlanner { // TODO: make a more sophisticated decision here for partitioned tables and when // we have info about tablet locations. if (partitionExprs.isEmpty()) return inputFragment; - } else if (!enforce_hdfs_writer_limit || hdfsScanORUnionNodes.size() == 0 - || inputFragment.getNumInstances() <= maxHdfsWriters) { + } else if (!enforce_hdfs_writer_limit || !hasHdfsScanORUnion + || (expectedNumInputInstance <= maxHdfsWriters)) { // Only consider skipping the addition of an exchange node if // 1. The hdfs writer limit does not apply // 2. Writer limit applies and there are no hdfs scan or union nodes. In this @@ -242,10 +301,13 @@ public class DistributedPlanner { // of instances are already under the writer limit. // Basically covering all cases where we don't mind restricting the parallelism // of their instances. - int input_instances = inputFragment.getNumInstances(); - if (enforce_hdfs_writer_limit && hdfsScanORUnionNodes.size() == 0) { + Preconditions.checkState( + expectedNumInputInstance <= inputFragment.getNumInstances()); + int input_instances = expectedNumInputInstance; + if (enforce_hdfs_writer_limit && !hasHdfsScanORUnion) { // For an internal fragment we enforce an upper limit based on the - // MAX_FS_WRITER query option. + // resulting maxHdfsWriters. + Preconditions.checkState(maxHdfsWriters > 0); input_instances = Math.min(input_instances, maxHdfsWriters); } // If the existing partition exprs are a subset of the table partition exprs, @@ -265,7 +327,7 @@ public class DistributedPlanner { // size in the particular file format of the output table/partition. // We should always know on how many nodes our input is running. long numPartitions = getNumDistinctValues(partitionExprs); - Preconditions.checkState(inputFragment.getNumInstances() != -1); + Preconditions.checkState(expectedNumInputInstance != -1); if (numPartitions > 0 && numPartitions <= input_instances) { return inputFragment; } diff --git a/fe/src/main/java/org/apache/impala/planner/HdfsTableSink.java b/fe/src/main/java/org/apache/impala/planner/HdfsTableSink.java index d93c96bab..594455596 100644 --- a/fe/src/main/java/org/apache/impala/planner/HdfsTableSink.java +++ b/fe/src/main/java/org/apache/impala/planner/HdfsTableSink.java @@ -66,6 +66,11 @@ public class HdfsTableSink extends TableSink { public static final long PARQUET_BLOOM_FILTER_MAX_BYTES = 128 * 1024 * 1024; public static final long PARQUET_BLOOM_FILTER_MIN_BYTES = 64; + // Minimum total writes in bytes that individual HdfsTableSink should aim. + // Used to estimate parallelism of writer fragment in DistributedPlanner.java. + // This is set to match with HDFS_BLOCK_SIZE in hdfs-parquet-table-writer.h. + public static final int MIN_WRITE_BYTES = 256 * 1024 * 1024; + // Default number of partitions used for computeResourceProfile() in the absence of // column stats. protected final long DEFAULT_NUM_PARTITIONS = 10; diff --git a/fe/src/main/java/org/apache/impala/planner/PlanFragment.java b/fe/src/main/java/org/apache/impala/planner/PlanFragment.java index e7614e7a0..33206e500 100644 --- a/fe/src/main/java/org/apache/impala/planner/PlanFragment.java +++ b/fe/src/main/java/org/apache/impala/planner/PlanFragment.java @@ -1041,7 +1041,9 @@ public class PlanFragment extends TreeNode<PlanFragment> { */ private boolean adjustToMaxParallelism(int minThreadPerNode, int maxThreadPerNode, int parentParallelism, int nodeStepCount) { + int maxThreadAllowed = IntMath.saturatedMultiply(maxThreadPerNode, getNumNodes()); boolean canTryLower = true; + // Compute maximum allowed parallelism. int maxParallelism = getNumInstances(); if (isFixedParallelism_) { @@ -1084,14 +1086,14 @@ public class PlanFragment extends TreeNode<PlanFragment> { // This is an interior fragment or fragment with single scan node. // We calculate maxParallelism, minParallelism, and costBasedMaxParallelism across // all executors in the selected executor group. - maxParallelism = IntMath.saturatedMultiply(maxThreadPerNode, getNumNodes()); + maxParallelism = maxThreadAllowed; // Bound maxParallelism by ScanNode's effective scan range count if this fragment // has ScanNode. List<ScanNode> scanNodes = Lists.newArrayList(); collectPlanNodes(Predicates.instanceOf(ScanNode.class), scanNodes); if (!scanNodes.isEmpty()) { - Preconditions.checkState(scanNodes.size() <= 1); + Preconditions.checkState(scanNodes.size() == 1); ScanNode scanNode = scanNodes.get(0); int maxScannerThreads = scanNode.getMaxScannerThreads(nodeStepCount); if (nodeStepCount == getNumNodes()) { @@ -1139,6 +1141,12 @@ public class PlanFragment extends TreeNode<PlanFragment> { } } + // Validate that maxParallelism does not exceed maxThreadAllowed. + // maxParallelism can be lower than minThreadPerNode, ie., in the case of plan root + // sink (only 1 per query) or scan with very few scan ranges, so this does not + // validate against minThreadPerNode. + Preconditions.checkState(maxParallelism <= maxThreadAllowed); + // Initialize this fragment's parallelism to the maxParallelism. setAdjustedInstanceCount(maxParallelism); return canTryLower; diff --git a/fe/src/main/java/org/apache/impala/planner/ScanNode.java b/fe/src/main/java/org/apache/impala/planner/ScanNode.java index 9f82a02d3..353beaa4e 100644 --- a/fe/src/main/java/org/apache/impala/planner/ScanNode.java +++ b/fe/src/main/java/org/apache/impala/planner/ScanNode.java @@ -342,6 +342,13 @@ abstract public class ScanNode extends PlanNode { return maxScannerThreads; } + /** + * Given effectiveScanRangeCount, compute processing cost of this scan node. + * <p> + * This method does not mutate any state of the scan node object, including + * the processingCost_ field. Caller must set processingCost_ themself with + * the return value of this method. + */ protected ProcessingCost computeScanProcessingCost( TQueryOptions queryOptions, long effectiveScanRangeCount) { ProcessingCost cardinalityBasedCost = ProcessingCost.basicCost(getDisplayLabel(), @@ -413,6 +420,9 @@ abstract public class ScanNode extends PlanNode { public ExprSubstitutionMap getOptimizedAggSmap() { return optimizedAggSmap_; } + /** + * Return maximum number of scanner thread, rounded up to next multiple of numNodes. + */ protected int getMaxScannerThreads(int numNodes) { return processingCost_.getNumInstanceMax(numNodes); } diff --git a/fe/src/main/java/org/apache/impala/service/Frontend.java b/fe/src/main/java/org/apache/impala/service/Frontend.java index ab38311fa..248018130 100644 --- a/fe/src/main/java/org/apache/impala/service/Frontend.java +++ b/fe/src/main/java/org/apache/impala/service/Frontend.java @@ -1967,8 +1967,11 @@ public class Frontend { // Only the following types of statements are considered auto scalable since each // can be planned by the distributed planner utilizing the number of executors in // an executor group as input. - public static boolean canStmtBeAutoScaled(TStmtType type) { - return type == TStmtType.EXPLAIN || type == TStmtType.QUERY || type == TStmtType.DML; + public static boolean canStmtBeAutoScaled(TExecRequest req) { + return req.stmt_type == TStmtType.EXPLAIN || req.stmt_type == TStmtType.QUERY + || req.stmt_type == TStmtType.DML + || (req.stmt_type == TStmtType.DDL && req.query_exec_request != null + && req.query_exec_request.stmt_type == TStmtType.DML /* CTAS */); } private static int expectedNumExecutor(TExecutorGroupSet execGroupSet) { @@ -2097,7 +2100,7 @@ public class Frontend { } else if (!enable_replan) { reason = "query option ENABLE_REPLAN=false"; notScalable = true; - } else if (!Frontend.canStmtBeAutoScaled(req.stmt_type)) { + } else if (!Frontend.canStmtBeAutoScaled(req)) { reason = "query is not auto-scalable"; notScalable = true; } diff --git a/tests/custom_cluster/test_executor_groups.py b/tests/custom_cluster/test_executor_groups.py index 978c8b624..d919ee8e0 100644 --- a/tests/custom_cluster/test_executor_groups.py +++ b/tests/custom_cluster/test_executor_groups.py @@ -20,6 +20,7 @@ from __future__ import absolute_import, division, print_function from builtins import range from tests.common.custom_cluster_test_suite import CustomClusterTestSuite +from tests.common.parametrize import UniqueDatabase from tests.util.concurrent_workload import ConcurrentWorkload import json @@ -35,7 +36,7 @@ LOG = logging.getLogger("test_auto_scaling") TEST_QUERY = "select count(*) from functional.alltypes where month + random() < 3" # A query to test CPU requirement. Estimated memory per host is 37MB. -CPU_TEST_QUERY = "select * from tpcds_parquet.store_sales where ss_item_sk = 1 limit 50;" +CPU_TEST_QUERY = "select * from tpcds_parquet.store_sales where ss_item_sk = 1 limit 50" # A query with full table scan characteristics. GROUPING_TEST_QUERY = ("select ss_item_sk from tpcds_parquet.store_sales" @@ -874,7 +875,7 @@ class TestExecutorGroups(CustomClusterTestSuite): def _set_query_options(self, query_options): """Set query options""" for k, v in query_options.items(): - self.execute_query_expect_success(self.client, "SET {}='{}';".format(k, v)) + self.execute_query_expect_success(self.client, "SET {}='{}'".format(k, v)) def _run_query_and_verify_profile(self, query, expected_strings_in_profile, not_expected_in_profile=[]): @@ -886,9 +887,26 @@ class TestExecutorGroups(CustomClusterTestSuite): assert expected_profile in str(result.runtime_profile) for not_expected in not_expected_in_profile: assert not_expected not in str(result.runtime_profile) - + return result + + def __verify_fs_writers(self, result, expected_num_writers, + expected_instances_per_host): + assert 'HDFS WRITER' in result.exec_summary[0]['operator'], result.runtime_profile + num_writers = int(result.exec_summary[0]['num_instances']) + assert (num_writers == expected_num_writers), result.runtime_profile + num_hosts = len(expected_instances_per_host) + regex = (r'Per Host Number of Fragment Instances:' + + (num_hosts * r'.*?\((.*?)\)') + r'.*?\n') + matches = re.findall(regex, result.runtime_profile) + assert len(matches) == 1 and len(matches[0]) == num_hosts, result.runtime_profile + num_instances_per_host = [int(i) for i in matches[0]] + num_instances_per_host.sort() + expected_instances_per_host.sort() + assert num_instances_per_host == expected_instances_per_host, result.runtime_profile + + @UniqueDatabase.parametrize(sync_ddl=True) @pytest.mark.execute_serially - def test_query_cpu_count_divisor_default(self): + def test_query_cpu_count_divisor_default(self, unique_database): # Expect to run the query on the small group by default. coordinator_test_args = "" self._setup_three_exec_group_cluster(coordinator_test_args) @@ -1039,14 +1057,105 @@ class TestExecutorGroups(CustomClusterTestSuite): 'PROCESSING_COST_MIN_THREADS': '', 'MAX_FRAGMENT_INSTANCES_PER_NODE': ''}) + # BEGIN testing insert + MAX_FS_WRITER + # Test unpartitioned insert, small scan, no MAX_FS_WRITER. + # Scanner and writer will collocate since num scanner equals to num writer (1). + result = self._run_query_and_verify_profile( + ("create table {0}.{1} as " + "select id, year from functional_parquet.alltypes" + ).format(unique_database, "test_ctas1"), + ["Executor Group: root.tiny-group", "ExecutorGroupsConsidered: 1", + "Verdict: Match", "CpuAsk: 1"]) + self.__verify_fs_writers(result, 1, [0, 1]) + + # Test unpartitioned insert, small scan, no MAX_FS_WRITER, with limit. + # The limit will cause an exchange node insertion between scanner and writer. + result = self._run_query_and_verify_profile( + ("create table {0}.{1} as " + "select id, year from functional_parquet.alltypes limit 100000" + ).format(unique_database, "test_ctas2"), + ["Executor Group: root.tiny-group", "ExecutorGroupsConsidered: 1", + "Verdict: Match", "CpuAsk: 2"]) + self.__verify_fs_writers(result, 1, [0, 2]) + + # Test partitioned insert, small scan, no MAX_FS_WRITER. + # Scanner and writer will collocate since num scanner equals to num writer (1). + result = self._run_query_and_verify_profile( + ("create table {0}.{1} partitioned by (year) as " + "select id, year from functional_parquet.alltypes" + ).format(unique_database, "test_ctas3"), + ["Executor Group: root.tiny-group", "ExecutorGroupsConsidered: 1", + "Verdict: Match", "CpuAsk: 1"]) + self.__verify_fs_writers(result, 1, [0, 1]) + + # Test unpartitioned insert, large scan, no MAX_FS_WRITER. + result = self._run_query_and_verify_profile( + ("create table {0}.{1} as " + "select ss_item_sk, ss_ticket_number, ss_store_sk " + "from tpcds_parquet.store_sales").format(unique_database, "test_ctas4"), + ["Executor Group: root.large-group", "ExecutorGroupsConsidered: 3", + "Verdict: Match", "CpuAsk: 13"]) + self.__verify_fs_writers(result, 1, [0, 4, 4, 5]) + + # Test partitioned insert, large scan, no MAX_FS_WRITER. + result = self._run_query_and_verify_profile( + ("create table {0}.{1} partitioned by (ss_store_sk) as " + "select ss_item_sk, ss_ticket_number, ss_store_sk " + "from tpcds_parquet.store_sales").format(unique_database, "test_ctas5"), + ["Executor Group: root.large-group", "ExecutorGroupsConsidered: 3", + "Verdict: Match", "CpuAsk: 15"]) + self.__verify_fs_writers(result, 3, [0, 5, 5, 5]) + + # Test partitioned insert, large scan, high MAX_FS_WRITER. + self._set_query_options({'MAX_FS_WRITERS': '30'}) + result = self._run_query_and_verify_profile( + ("create table {0}.{1} partitioned by (ss_store_sk) as " + "select ss_item_sk, ss_ticket_number, ss_store_sk " + "from tpcds_parquet.store_sales").format(unique_database, "test_ctas6"), + ["Executor Group: root.large-group", "ExecutorGroupsConsidered: 3", + "Verdict: Match", "CpuAsk: 15"]) + self.__verify_fs_writers(result, 3, [0, 5, 5, 5]) + + # Test partitioned insert, large scan, low MAX_FS_WRITER. + self._set_query_options({'MAX_FS_WRITERS': '2'}) + result = self._run_query_and_verify_profile( + ("create table {0}.{1} partitioned by (ss_store_sk) as " + "select ss_item_sk, ss_ticket_number, ss_store_sk " + "from tpcds_parquet.store_sales").format(unique_database, "test_ctas7"), + ["Executor Group: root.large-group", "ExecutorGroupsConsidered: 3", + "Verdict: Match", "CpuAsk: 14"]) + self.__verify_fs_writers(result, 2, [0, 4, 5, 5]) + + # Test that non-CTAS unpartitioned insert works. MAX_FS_WRITER=2. + result = self._run_query_and_verify_profile( + ("insert overwrite {0}.{1} " + "select ss_item_sk, ss_ticket_number, ss_store_sk " + "from tpcds_parquet.store_sales").format(unique_database, "test_ctas4"), + ["Executor Group: root.large-group", "ExecutorGroupsConsidered: 3", + "Verdict: Match", "CpuAsk: 13"]) + self.__verify_fs_writers(result, 1, [0, 4, 4, 5]) + + # Test that non-CTAS partitioned insert works. MAX_FS_WRITER=2. + result = self._run_query_and_verify_profile( + ("insert overwrite {0}.{1} (ss_item_sk, ss_ticket_number) " + "partition (ss_store_sk) " + "select ss_item_sk, ss_ticket_number, ss_store_sk " + "from tpcds_parquet.store_sales").format(unique_database, "test_ctas7"), + ["Executor Group: root.large-group", "ExecutorGroupsConsidered: 3", + "Verdict: Match", "CpuAsk: 14"]) + self.__verify_fs_writers(result, 2, [0, 4, 5, 5]) + + # Unset MAX_FS_WRITERS. + self._set_query_options({'MAX_FS_WRITERS': ''}) + # END testing insert + MAX_FS_WRITER + # Check resource pools on the Web queries site and admission site self._verify_query_num_for_resource_pool("root.small", 4) self._verify_query_num_for_resource_pool("root.tiny", 4) - self._verify_query_num_for_resource_pool("root.large", 10) + self._verify_query_num_for_resource_pool("root.large", 12) self._verify_total_admitted_queries("root.small", 4) - self._verify_total_admitted_queries("root.tiny", 3) - self._verify_total_admitted_queries("root.large", 10) - self.client.close() + self._verify_total_admitted_queries("root.tiny", 6) + self._verify_total_admitted_queries("root.large", 16) @pytest.mark.execute_serially def test_query_cpu_count_divisor_two(self): @@ -1059,10 +1168,10 @@ class TestExecutorGroups(CustomClusterTestSuite): ["Executor Group: root.small-group", "CpuAsk: 6", "EffectiveParallelism: 11", "ExecutorGroupsConsidered: 2"]) + # Check resource pools on the Web queries site and admission site self._verify_query_num_for_resource_pool("root.small", 1) self._verify_total_admitted_queries("root.small", 1) - self.client.close() @pytest.mark.execute_serially def test_query_cpu_count_divisor_fraction(self): @@ -1087,10 +1196,10 @@ class TestExecutorGroups(CustomClusterTestSuite): ["Executor Group: root.large-group", "EffectiveParallelism: 16", "ExecutorGroupsConsidered: 3", "CpuAsk: 534", "Verdict: no executor group set fit. Admit to last executor group set."]) + # Check resource pools on the Web queries site and admission site self._verify_query_num_for_resource_pool("root.large", 2) self._verify_total_admitted_queries("root.large", 2) - self.client.close() @pytest.mark.execute_serially def test_no_skip_resource_checking(self): @@ -1103,7 +1212,6 @@ class TestExecutorGroups(CustomClusterTestSuite): result = self.execute_query_expect_failure(self.client, CPU_TEST_QUERY) assert ("AnalysisException: The query does not fit largest executor group sets. " "Reason: not enough cpu cores (require=434, max=192).") in str(result) - self.client.close() @pytest.mark.execute_serially def test_min_processing_per_thread_small(self): @@ -1131,7 +1239,11 @@ class TestExecutorGroups(CustomClusterTestSuite): ["Executor Group: root.tiny-group", "ExecutorGroupsConsidered: 1", "Verdict: Match", "CpuAsk: 1"]) - self.client.close() + # Check resource pools on the Web queries site and admission site + self._verify_query_num_for_resource_pool("root.tiny", 1) + self._verify_query_num_for_resource_pool("root.large", 2) + self._verify_total_admitted_queries("root.tiny", 1) + self._verify_total_admitted_queries("root.large", 2) @pytest.mark.execute_serially def test_per_exec_group_set_metrics(self): diff --git a/tests/query_test/test_insert.py b/tests/query_test/test_insert.py index 587ceff08..b48e6ed09 100644 --- a/tests/query_test/test_insert.py +++ b/tests/query_test/test_insert.py @@ -416,28 +416,55 @@ class TestInsertHdfsWriterLimit(ImpalaTestSuite): @UniqueDatabase.parametrize(sync_ddl=True) @SkipIfNotHdfsMinicluster.tuned_for_minicluster def test_processing_cost_writer_limit(self, unique_database): + """Test both scenario of partitioned and unpartitioned insert. + All of the unpartitioned testscases will result in one instance writer because the + output volume is less than 256MB. Partitoned insert will result in 1 writer per node + unless max_fs_writers is set lower than num nodes.""" # Root internal (non-leaf) fragment. query = "create table {0}.test1 as select int_col from " \ "functional_parquet.alltypes".format(unique_database) - self.__run_insert_and_verify_instances(query, max_fs_writers=11, mt_dop=0, - expected_num_instances_per_host=[4, 5, 5], - processing_cost_min_threads=10) + self.__run_insert_and_verify_instances(query, max_fs_writers=2, + expected_num_instances_per_host=[1, 1, 2], + processing_cost_min_threads=1) # Root coordinator fragment. query = "create table {0}.test2 as select int_col from " \ "functional_parquet.alltypes limit 100000".format(unique_database) - self.__run_insert_and_verify_instances(query, max_fs_writers=2, mt_dop=0, + self.__run_insert_and_verify_instances(query, max_fs_writers=2, expected_num_instances_per_host=[1, 1, 2], - processing_cost_min_threads=10) - # Root scan fragment. Instance count within limit. + processing_cost_min_threads=1) + # Root internal (non-leaf) fragment. Instance count within limit. query = "create table {0}.test3 as select int_col from " \ "functional_parquet.alltypes".format(unique_database) - self.__run_insert_and_verify_instances(query, max_fs_writers=30, mt_dop=0, - expected_num_instances_per_host=[8, 8, 8], - processing_cost_min_threads=10) + self.__run_insert_and_verify_instances(query, max_fs_writers=30, + expected_num_instances_per_host=[1, 1, 2], + processing_cost_min_threads=1) + # Root internal (non-leaf) fragment. No max_fs_writers. + # Scan node and writer sink should always be in separate fragment with cost-based + # scaling. + query = "create table {0}.test4 as select int_col from " \ + "functional_parquet.alltypes".format(unique_database) + self.__run_insert_and_verify_instances(query, max_fs_writers=0, + expected_num_instances_per_host=[1, 1, 2], + processing_cost_min_threads=1) + # Partitioned insert with 6 distinct partition values. + # Should create at least 1 writer per node. + query = "create table {0}.test5 partitioned by (ss_store_sk) as " \ + "select ss_item_sk, ss_ticket_number, ss_store_sk " \ + "from tpcds_parquet.store_sales".format(unique_database) + self.__run_insert_and_verify_instances(query, max_fs_writers=0, + expected_num_instances_per_host=[5, 5, 5], + processing_cost_min_threads=1) + # Partitioned insert can still be limited by max_fs_writers option. + query = "create table {0}.test6 partitioned by (ss_store_sk) as " \ + "select ss_item_sk, ss_ticket_number, ss_store_sk " \ + "from tpcds_parquet.store_sales".format(unique_database) + self.__run_insert_and_verify_instances(query, max_fs_writers=2, + expected_num_instances_per_host=[4, 5, 5], + processing_cost_min_threads=1) - def __run_insert_and_verify_instances(self, query, max_fs_writers, mt_dop, - expected_num_instances_per_host, - processing_cost_min_threads=0): + def __run_insert_and_verify_instances(self, query, max_fs_writers=0, mt_dop=0, + processing_cost_min_threads=0, + expected_num_instances_per_host=[]): self.client.set_configuration_option("max_fs_writers", max_fs_writers) self.client.set_configuration_option("mt_dop", mt_dop) if processing_cost_min_threads > 0: @@ -451,8 +478,9 @@ class TestInsertHdfsWriterLimit(ImpalaTestSuite): 3) result = self.client.execute(query) assert 'HDFS WRITER' in result.exec_summary[0]['operator'], result.runtime_profile - assert int(result.exec_summary[0]['num_instances']) <= int( - max_fs_writers), result.runtime_profile + if (max_fs_writers > 0): + num_writers = int(result.exec_summary[0]['num_instances']) + assert (num_writers <= max_fs_writers), result.runtime_profile regex = r'Per Host Number of Fragment Instances' \ r':.*?\((.*?)\).*?\((.*?)\).*?\((.*?)\).*?\n' matches = re.findall(regex, result.runtime_profile)
