IMPALA-7351: Add estimates to Exchange node. Added rough estimates for the exchange node and a justification of the method in the inline comments.
Testing: Updated Planner tests. Change-Id: I5b577f9511abc48b992e814d50bba4959f23f7fd Reviewed-on: http://gerrit.cloudera.org:8080/11692 Reviewed-by: Impala Public Jenkins <[email protected]> Tested-by: Impala Public Jenkins <[email protected]> Project: http://git-wip-us.apache.org/repos/asf/impala/repo Commit: http://git-wip-us.apache.org/repos/asf/impala/commit/8cbec20e Tree: http://git-wip-us.apache.org/repos/asf/impala/tree/8cbec20e Diff: http://git-wip-us.apache.org/repos/asf/impala/diff/8cbec20e Branch: refs/heads/master Commit: 8cbec20ea4f1399fce14018d30050d4b7ee501bc Parents: 1d72c75 Author: Bikramjeet Vig <[email protected]> Authored: Thu Oct 11 12:16:11 2018 -0700 Committer: Impala Public Jenkins <[email protected]> Committed: Thu Oct 25 21:17:12 2018 +0000 ---------------------------------------------------------------------- be/src/util/backend-gflag-util.cc | 3 + common/thrift/BackendGflags.thrift | 2 + .../org/apache/impala/planner/ExchangeNode.java | 95 +- .../org/apache/impala/planner/PlanNode.java | 9 + .../queries/PlannerTest/kudu-selectivity.test | 16 +- .../queries/PlannerTest/max-row-size.test | 104 +- .../PlannerTest/resource-requirements.test | 972 +++++++++---------- .../PlannerTest/spillable-buffer-sizing.test | 214 ++-- .../queries/PlannerTest/tpcds-all.test | 60 +- .../queries/PlannerTest/tpch-all.test | 64 +- .../queries/PlannerTest/tpch-nested.test | 172 ++-- .../queries/QueryTest/explain-level2.test | 18 +- .../queries/QueryTest/explain-level3.test | 22 +- 13 files changed, 920 insertions(+), 831 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/impala/blob/8cbec20e/be/src/util/backend-gflag-util.cc ---------------------------------------------------------------------- diff --git a/be/src/util/backend-gflag-util.cc b/be/src/util/backend-gflag-util.cc index 3760c84..9f4f2d0 100644 --- a/be/src/util/backend-gflag-util.cc +++ b/be/src/util/backend-gflag-util.cc @@ 
-67,6 +67,7 @@ DECLARE_int64(kudu_scanner_thread_estimated_bytes_per_column); DECLARE_int64(kudu_scanner_thread_max_estimated_bytes); DECLARE_int32(catalog_max_parallel_partial_fetch_rpc); DECLARE_int64(catalog_partial_fetch_rpc_queue_timeout_s); +DECLARE_int64(exchg_node_buffer_size_bytes); namespace impala { @@ -132,6 +133,8 @@ Status GetThriftBackendGflags(JNIEnv* jni_env, jbyteArray* cfg_bytes) { FLAGS_catalog_max_parallel_partial_fetch_rpc); cfg.__set_catalog_partial_fetch_rpc_queue_timeout_s( FLAGS_catalog_partial_fetch_rpc_queue_timeout_s); + cfg.__set_exchg_node_buffer_size_bytes( + FLAGS_exchg_node_buffer_size_bytes); RETURN_IF_ERROR(SerializeThriftMsg(jni_env, &cfg, cfg_bytes)); return Status::OK(); } http://git-wip-us.apache.org/repos/asf/impala/blob/8cbec20e/common/thrift/BackendGflags.thrift ---------------------------------------------------------------------- diff --git a/common/thrift/BackendGflags.thrift b/common/thrift/BackendGflags.thrift index 531fc2c..dff4b8c 100644 --- a/common/thrift/BackendGflags.thrift +++ b/common/thrift/BackendGflags.thrift @@ -107,4 +107,6 @@ struct TBackendGflags { 40: required i32 catalog_max_parallel_partial_fetch_rpc 41: required i64 catalog_partial_fetch_rpc_queue_timeout_s + + 42: required i64 exchg_node_buffer_size_bytes } http://git-wip-us.apache.org/repos/asf/impala/blob/8cbec20e/fe/src/main/java/org/apache/impala/planner/ExchangeNode.java ---------------------------------------------------------------------- diff --git a/fe/src/main/java/org/apache/impala/planner/ExchangeNode.java b/fe/src/main/java/org/apache/impala/planner/ExchangeNode.java index 05184f2..356ae6b 100644 --- a/fe/src/main/java/org/apache/impala/planner/ExchangeNode.java +++ b/fe/src/main/java/org/apache/impala/planner/ExchangeNode.java @@ -22,12 +22,14 @@ import org.apache.impala.analysis.Expr; import org.apache.impala.analysis.SortInfo; import org.apache.impala.analysis.TupleId; import org.apache.impala.common.ImpalaException; +import 
org.apache.impala.service.BackendConfig; import org.apache.impala.thrift.TExchangeNode; import org.apache.impala.thrift.TExplainLevel; import org.apache.impala.thrift.TPlanNode; import org.apache.impala.thrift.TPlanNodeType; import org.apache.impala.thrift.TQueryOptions; import org.apache.impala.thrift.TSortInfo; + import com.google.common.base.Preconditions; /** @@ -49,6 +51,9 @@ public class ExchangeNode extends PlanNode { // update this constant as well. private static final double PER_TUPLE_SERIALIZATION_OVERHEAD = 4.0; + // Empirically derived minimum estimate (in bytes) for the exchange node. + private static final int MIN_ESTIMATE_BYTES = 16 * 1024; + // The parameters based on which sorted input streams are merged by this // exchange node. Null if this exchange does not merge sorted streams private SortInfo mergeInfo_; @@ -57,6 +62,21 @@ public class ExchangeNode extends PlanNode { // only if mergeInfo_ is non-null, i.e. this is a merging exchange node. private long offset_; + private boolean isMergingExchange() { + return mergeInfo_ != null; + } + + private boolean isBroadcastExchange() { + // If the output of the sink is not partitioned but the target fragment is + // partitioned, then the data exchange is broadcast. 
+ Preconditions.checkState(!children_.isEmpty()); + DataSink sink = getChild(0).getFragment().getSink(); + if (sink == null) return false; + Preconditions.checkState(sink instanceof DataStreamSink); + DataStreamSink streamSink = (DataStreamSink) sink; + return !streamSink.getOutputPartition().isPartitioned() && fragment_.isPartitioned(); + } + public ExchangeNode(PlanNodeId id, PlanNode input) { super(id, "EXCHANGE"); offset_ = 0; @@ -136,7 +156,7 @@ public class ExchangeNode extends PlanNode { output.append(detailPrefix + "offset: ").append(offset_).append("\n"); } - if (mergeInfo_ != null && detailLevel.ordinal() > TExplainLevel.MINIMAL.ordinal()) { + if (isMergingExchange() && detailLevel.ordinal() > TExplainLevel.MINIMAL.ordinal()) { output.append(detailPrefix + "order by: "); for (int i = 0; i < mergeInfo_.getSortExprs().size(); ++i) { if (i > 0) output.append(", "); @@ -160,14 +180,11 @@ public class ExchangeNode extends PlanNode { Preconditions.checkState(!children_.isEmpty()); DataSink sink = getChild(0).getFragment().getSink(); if (sink == null) return ""; - Preconditions.checkState(sink instanceof DataStreamSink); - DataStreamSink streamSink = (DataStreamSink) sink; - if (!streamSink.getOutputPartition().isPartitioned() && - fragment_.isPartitioned()) { - // If the output of the sink is not partitioned but the target fragment is - // partitioned, then the data exchange is broadcast. 
+ if (isBroadcastExchange()) { return "BROADCAST"; } else { + Preconditions.checkState(sink instanceof DataStreamSink); + DataStreamSink streamSink = (DataStreamSink) sink; return streamSink.getOutputPartition().getExplainString(); } } @@ -183,8 +200,66 @@ public class ExchangeNode extends PlanNode { @Override public void computeNodeResourceProfile(TQueryOptions queryOptions) { - // TODO: add an estimate - nodeResourceProfile_ = ResourceProfile.noReservation(0); + // For non-merging exchanges, one row batch queue is maintained for row + // batches from all sender fragment instances. For merging exchange, one + // queue is created for the batches from each distinct sender. There is a + // soft limit on every row batch queue of FLAGS_exchg_node_buffer_size_bytes + // (default 10MB). There is also a deferred rpc queue which queues at max + // one rpc payload (containing the rowbatch) per sender in-case the row + // batch queue hits the previously mentioned soft limit. Actual memory used + // depends on the row size (that can vary a lot due to presence of var len + // strings) and on the rate at which rows are received and consumed from the + // exchange node which in turn depends on the complexity of the query and + // the system load. This makes it difficult to accurately estimate the + // memory usage at runtime. The following estimates assume that memory usage will + // lean towards the soft limits. 
+ Preconditions.checkState(!children_.isEmpty()); + Preconditions.checkNotNull(children_.get(0).getFragment()); + int numSenders = children_.get(0).getFragment().getNumInstances(queryOptions.mt_dop); + long estimatedTotalQueueByteSize = estimateTotalQueueByteSize(numSenders); + long estimatedDeferredRPCQueueSize = estimateDeferredRPCQueueSize(queryOptions, + numSenders); + long estimatedMem = Math.max( + checkedAdd(estimatedTotalQueueByteSize, estimatedDeferredRPCQueueSize), + MIN_ESTIMATE_BYTES); + nodeResourceProfile_ = ResourceProfile.noReservation(estimatedMem); + } + + // Returns the estimated size of the deferred batch queue (in bytes) by + // assuming that at least one row batch rpc payload per sender is queued. + private long estimateDeferredRPCQueueSize(TQueryOptions queryOptions, + int numSenders) { + long rowBatchSize = queryOptions.isSetBatch_size() && queryOptions.batch_size > 0 + ? queryOptions.batch_size : DEFAULT_ROWBATCH_SIZE; + // Set an upper limit based on estimated cardinality. + if (getCardinality() > 0) rowBatchSize = Math.min(rowBatchSize, getCardinality()); + long avgRowBatchByteSize = Math.min( + (long) Math.ceil(rowBatchSize * getAvgSerializedRowSize(this)), + ROWBATCH_MAX_MEM_USAGE); + long deferredBatchQueueSize = avgRowBatchByteSize * numSenders; + return deferredBatchQueueSize; + } + + // Returns the total estimated size (in bytes) of the row batch queues by + // assuming enough batches can be queued such that it hits the row batch + // queue's soft mem limit. + private long estimateTotalQueueByteSize(int numSenders) { + int numQueues = isMergingExchange() ? numSenders : 1; + long maxQueueByteSize = BackendConfig.INSTANCE.getBackendCfg(). + exchg_node_buffer_size_bytes; + // TODO: Should we set a better default size here? This might be alot for + // queries without stats. + long estimatedTotalQueueByteSize = numQueues * maxQueueByteSize; + // Set an upper limit based on estimated cardinality. 
+ if (hasValidStats()) { + long totalBytesToReceive = (long) Math.ceil(getAvgRowSize() * getCardinality()); + // Assuming no skew in distribution during data shuffling. + long bytesToReceivePerExchNode = isBroadcastExchange() ? totalBytesToReceive + : totalBytesToReceive / getNumNodes(); + estimatedTotalQueueByteSize = Math.min(bytesToReceivePerExchNode, + estimatedTotalQueueByteSize); + } + return estimatedTotalQueueByteSize; } @Override @@ -202,7 +277,7 @@ public class ExchangeNode extends PlanNode { msg.exchange_node.addToInput_row_tuples(tid.asInt()); } - if (mergeInfo_ != null) { + if (isMergingExchange()) { TSortInfo sortInfo = new TSortInfo( Expr.treesToThrift(mergeInfo_.getSortExprs()), mergeInfo_.getIsAscOrder(), mergeInfo_.getNullsFirst()); http://git-wip-us.apache.org/repos/asf/impala/blob/8cbec20e/fe/src/main/java/org/apache/impala/planner/PlanNode.java ---------------------------------------------------------------------- diff --git a/fe/src/main/java/org/apache/impala/planner/PlanNode.java b/fe/src/main/java/org/apache/impala/planner/PlanNode.java index 927cc18..744aa09 100644 --- a/fe/src/main/java/org/apache/impala/planner/PlanNode.java +++ b/fe/src/main/java/org/apache/impala/planner/PlanNode.java @@ -68,6 +68,15 @@ import com.google.common.math.LongMath; abstract public class PlanNode extends TreeNode<PlanNode> { private final static Logger LOG = LoggerFactory.getLogger(PlanNode.class); + // The default row batch size used if the BATCH_SIZE query option is not set + // or is less than 1. Must be in sync with QueryState::DEFAULT_BATCH_SIZE. + protected final static int DEFAULT_ROWBATCH_SIZE = 1024; + + // Max memory that a row batch can accumulate before it is considered at capacity. + // This is a soft capacity: row batches may exceed the capacity, preferably only by a + // row's worth of data. Must be in sync with RowBatch::AT_CAPACITY_MEM_USAGE. 
+ protected final static int ROWBATCH_MAX_MEM_USAGE = 8 * 1024 * 1024; + // String used for this node in getExplainString(). protected String displayName_; http://git-wip-us.apache.org/repos/asf/impala/blob/8cbec20e/testdata/workloads/functional-planner/queries/PlannerTest/kudu-selectivity.test ---------------------------------------------------------------------- diff --git a/testdata/workloads/functional-planner/queries/PlannerTest/kudu-selectivity.test b/testdata/workloads/functional-planner/queries/PlannerTest/kudu-selectivity.test index 8a59968..6ae50f3 100644 --- a/testdata/workloads/functional-planner/queries/PlannerTest/kudu-selectivity.test +++ b/testdata/workloads/functional-planner/queries/PlannerTest/kudu-selectivity.test @@ -12,12 +12,12 @@ Per-Host Resources: mem-estimate=1.88MB mem-reservation=0B thread-reservation=2 in pipelines: 00(GETNEXT) ---- DISTRIBUTEDPLAN F01:PLAN FRAGMENT [UNPARTITIONED] hosts=1 instances=1 -Per-Host Resources: mem-estimate=0B mem-reservation=0B thread-reservation=1 +Per-Host Resources: mem-estimate=16.00KB mem-reservation=0B thread-reservation=1 PLAN-ROOT SINK | mem-estimate=0B mem-reservation=0B thread-reservation=0 | 01:EXCHANGE [UNPARTITIONED] - mem-estimate=0B mem-reservation=0B thread-reservation=0 + mem-estimate=16.00KB mem-reservation=0B thread-reservation=0 tuple-ids=0 row-size=124B cardinality=1 in pipelines: 00(GETNEXT) @@ -47,12 +47,12 @@ Per-Host Resources: mem-estimate=3.75MB mem-reservation=0B thread-reservation=2 in pipelines: 00(GETNEXT) ---- DISTRIBUTEDPLAN F01:PLAN FRAGMENT [UNPARTITIONED] hosts=1 instances=1 -Per-Host Resources: mem-estimate=0B mem-reservation=0B thread-reservation=1 +Per-Host Resources: mem-estimate=16.00KB mem-reservation=0B thread-reservation=1 PLAN-ROOT SINK | mem-estimate=0B mem-reservation=0B thread-reservation=0 | 01:EXCHANGE [UNPARTITIONED] - mem-estimate=0B mem-reservation=0B thread-reservation=0 + mem-estimate=16.00KB mem-reservation=0B thread-reservation=0 tuple-ids=0 
row-size=124B cardinality=1 in pipelines: 00(GETNEXT) @@ -81,12 +81,12 @@ Per-Host Resources: mem-estimate=3.75MB mem-reservation=0B thread-reservation=2 in pipelines: 00(GETNEXT) ---- DISTRIBUTEDPLAN F01:PLAN FRAGMENT [UNPARTITIONED] hosts=1 instances=1 -Per-Host Resources: mem-estimate=0B mem-reservation=0B thread-reservation=1 +Per-Host Resources: mem-estimate=517.89KB mem-reservation=0B thread-reservation=1 PLAN-ROOT SINK | mem-estimate=0B mem-reservation=0B thread-reservation=0 | 01:EXCHANGE [UNPARTITIONED] - mem-estimate=0B mem-reservation=0B thread-reservation=0 + mem-estimate=517.89KB mem-reservation=0B thread-reservation=0 tuple-ids=0 row-size=124B cardinality=3317 in pipelines: 00(GETNEXT) @@ -114,12 +114,12 @@ Per-Host Resources: mem-estimate=3.75MB mem-reservation=0B thread-reservation=2 in pipelines: 00(GETNEXT) ---- DISTRIBUTEDPLAN F01:PLAN FRAGMENT [UNPARTITIONED] hosts=1 instances=1 -Per-Host Resources: mem-estimate=0B mem-reservation=0B thread-reservation=1 +Per-Host Resources: mem-estimate=16.00KB mem-reservation=0B thread-reservation=1 PLAN-ROOT SINK | mem-estimate=0B mem-reservation=0B thread-reservation=0 | 01:EXCHANGE [UNPARTITIONED] - mem-estimate=0B mem-reservation=0B thread-reservation=0 + mem-estimate=16.00KB mem-reservation=0B thread-reservation=0 tuple-ids=0 row-size=124B cardinality=3 in pipelines: 00(GETNEXT) http://git-wip-us.apache.org/repos/asf/impala/blob/8cbec20e/testdata/workloads/functional-planner/queries/PlannerTest/max-row-size.test ---------------------------------------------------------------------- diff --git a/testdata/workloads/functional-planner/queries/PlannerTest/max-row-size.test b/testdata/workloads/functional-planner/queries/PlannerTest/max-row-size.test index f82f1be..e1d8119 100644 --- a/testdata/workloads/functional-planner/queries/PlannerTest/max-row-size.test +++ b/testdata/workloads/functional-planner/queries/PlannerTest/max-row-size.test @@ -5,20 +5,20 @@ from tpch_parquet.customer inner join 
tpch_parquet.nation on c_nationkey = n_nationkey ---- DISTRIBUTEDPLAN Max Per-Host Resource Reservation: Memory=33.97MB Threads=5 -Per-Host Resource Estimates: Memory=58MB +Per-Host Resource Estimates: Memory=68MB F02:PLAN FRAGMENT [UNPARTITIONED] hosts=1 instances=1 -| Per-Host Resources: mem-estimate=0B mem-reservation=0B thread-reservation=1 +| Per-Host Resources: mem-estimate=10.35MB mem-reservation=0B thread-reservation=1 PLAN-ROOT SINK | mem-estimate=0B mem-reservation=0B thread-reservation=0 | 04:EXCHANGE [UNPARTITIONED] -| mem-estimate=0B mem-reservation=0B thread-reservation=0 +| mem-estimate=10.35MB mem-reservation=0B thread-reservation=0 | tuple-ids=0,1 row-size=355B cardinality=150000 | in pipelines: 00(GETNEXT) | F00:PLAN FRAGMENT [RANDOM] hosts=1 instances=1 -Per-Host Resources: mem-estimate=41.94MB mem-reservation=33.94MB thread-reservation=2 runtime-filters-memory=1.00MB +Per-Host Resources: mem-estimate=41.95MB mem-reservation=33.94MB thread-reservation=2 runtime-filters-memory=1.00MB 02:HASH JOIN [INNER JOIN, BROADCAST] | hash predicates: c_nationkey = n_nationkey | fk/pk conjuncts: c_nationkey = n_nationkey @@ -28,7 +28,7 @@ Per-Host Resources: mem-estimate=41.94MB mem-reservation=33.94MB thread-reservat | in pipelines: 00(GETNEXT), 01(OPEN) | |--03:EXCHANGE [BROADCAST] -| | mem-estimate=0B mem-reservation=0B thread-reservation=0 +| | mem-estimate=16.00KB mem-reservation=0B thread-reservation=0 | | tuple-ids=1 row-size=117B cardinality=25 | | in pipelines: 01(GETNEXT) | | @@ -62,20 +62,20 @@ from tpch_parquet.lineitem left join tpch_parquet.orders on l_orderkey = o_orderkey ---- DISTRIBUTEDPLAN Max Per-Host Resource Reservation: Memory=110.00MB Threads=5 -Per-Host Resource Estimates: Memory=420MB +Per-Host Resource Estimates: Memory=442MB F02:PLAN FRAGMENT [UNPARTITIONED] hosts=1 instances=1 -| Per-Host Resources: mem-estimate=0B mem-reservation=0B thread-reservation=1 +| Per-Host Resources: mem-estimate=11.35MB mem-reservation=0B 
thread-reservation=1 PLAN-ROOT SINK | mem-estimate=0B mem-reservation=0B thread-reservation=0 | 04:EXCHANGE [UNPARTITIONED] -| mem-estimate=0B mem-reservation=0B thread-reservation=0 +| mem-estimate=11.35MB mem-reservation=0B thread-reservation=0 | tuple-ids=0,1N row-size=454B cardinality=6001215 | in pipelines: 00(GETNEXT) | F00:PLAN FRAGMENT [RANDOM] hosts=3 instances=3 -Per-Host Resources: mem-estimate=380.41MB mem-reservation=86.00MB thread-reservation=2 +Per-Host Resources: mem-estimate=390.80MB mem-reservation=86.00MB thread-reservation=2 02:HASH JOIN [LEFT OUTER JOIN, BROADCAST] | hash predicates: l_orderkey = o_orderkey | fk/pk conjuncts: l_orderkey = o_orderkey @@ -84,7 +84,7 @@ Per-Host Resources: mem-estimate=380.41MB mem-reservation=86.00MB thread-reserva | in pipelines: 00(GETNEXT), 01(OPEN) | |--03:EXCHANGE [BROADCAST] -| | mem-estimate=0B mem-reservation=0B thread-reservation=0 +| | mem-estimate=10.38MB mem-reservation=0B thread-reservation=0 | | tuple-ids=1 row-size=191B cardinality=1500000 | | in pipelines: 01(GETNEXT) | | @@ -101,11 +101,11 @@ Per-Host Resources: mem-estimate=380.41MB mem-reservation=86.00MB thread-reserva | in pipelines: 01(GETNEXT) | 00:SCAN HDFS [tpch_parquet.lineitem, RANDOM] - partitions=1/1 files=3 size=193.71MB + partitions=1/1 files=3 size=193.72MB stored statistics: - table: rows=6001215 size=193.71MB + table: rows=6001215 size=193.72MB columns: all - extrapolated-rows=disabled max-scan-range-rows=2141802 + extrapolated-rows=disabled max-scan-range-rows=2141530 mem-estimate=80.00MB mem-reservation=40.00MB thread-reservation=1 tuple-ids=0 row-size=263B cardinality=6001215 in pipelines: 00(GETNEXT) @@ -116,20 +116,20 @@ select * from tpch_parquet.lineitem where l_orderkey not in (select o_orderkey from tpch_parquet.orders) ---- DISTRIBUTEDPLAN Max Per-Host Resource Reservation: Memory=78.00MB Threads=5 -Per-Host Resource Estimates: Memory=154MB +Per-Host Resource Estimates: Memory=175MB F02:PLAN FRAGMENT [UNPARTITIONED] 
hosts=1 instances=1 -| Per-Host Resources: mem-estimate=0B mem-reservation=0B thread-reservation=1 +| Per-Host Resources: mem-estimate=10.78MB mem-reservation=0B thread-reservation=1 PLAN-ROOT SINK | mem-estimate=0B mem-reservation=0B thread-reservation=0 | 04:EXCHANGE [UNPARTITIONED] -| mem-estimate=0B mem-reservation=0B thread-reservation=0 +| mem-estimate=10.78MB mem-reservation=0B thread-reservation=0 | tuple-ids=0 row-size=263B cardinality=6001215 | in pipelines: 00(GETNEXT) | F00:PLAN FRAGMENT [RANDOM] hosts=3 instances=3 -Per-Host Resources: mem-estimate=114.00MB mem-reservation=74.00MB thread-reservation=2 +Per-Host Resources: mem-estimate=124.02MB mem-reservation=74.00MB thread-reservation=2 02:HASH JOIN [NULL AWARE LEFT ANTI JOIN, BROADCAST] | hash predicates: l_orderkey = o_orderkey | mem-estimate=34.00MB mem-reservation=34.00MB spill-buffer=1.00MB thread-reservation=0 @@ -137,7 +137,7 @@ Per-Host Resources: mem-estimate=114.00MB mem-reservation=74.00MB thread-reserva | in pipelines: 00(GETNEXT), 01(OPEN) | |--03:EXCHANGE [BROADCAST] -| | mem-estimate=0B mem-reservation=0B thread-reservation=0 +| | mem-estimate=10.02MB mem-reservation=0B thread-reservation=0 | | tuple-ids=1 row-size=8B cardinality=1500000 | | in pipelines: 01(GETNEXT) | | @@ -154,11 +154,11 @@ Per-Host Resources: mem-estimate=114.00MB mem-reservation=74.00MB thread-reserva | in pipelines: 01(GETNEXT) | 00:SCAN HDFS [tpch_parquet.lineitem, RANDOM] - partitions=1/1 files=3 size=193.71MB + partitions=1/1 files=3 size=193.72MB stored statistics: - table: rows=6001215 size=193.71MB + table: rows=6001215 size=193.72MB columns: all - extrapolated-rows=disabled max-scan-range-rows=2141802 + extrapolated-rows=disabled max-scan-range-rows=2141530 mem-estimate=80.00MB mem-reservation=40.00MB thread-reservation=1 tuple-ids=0 row-size=263B cardinality=6001215 in pipelines: 00(GETNEXT) @@ -172,20 +172,20 @@ group by 1, 2 having count(*) = 1 ---- DISTRIBUTEDPLAN Max Per-Host Resource Reservation: 
Memory=125.00MB Threads=7 -Per-Host Resource Estimates: Memory=253MB +Per-Host Resource Estimates: Memory=293MB F04:PLAN FRAGMENT [UNPARTITIONED] hosts=1 instances=1 -| Per-Host Resources: mem-estimate=0B mem-reservation=0B thread-reservation=1 +| Per-Host Resources: mem-estimate=10.11MB mem-reservation=0B thread-reservation=1 PLAN-ROOT SINK | mem-estimate=0B mem-reservation=0B thread-reservation=0 | 08:EXCHANGE [UNPARTITIONED] -| mem-estimate=0B mem-reservation=0B thread-reservation=0 +| mem-estimate=10.11MB mem-reservation=0B thread-reservation=0 | tuple-ids=2 row-size=33B cardinality=4690314 | in pipelines: 07(GETNEXT) | F03:PLAN FRAGMENT [HASH(l_orderkey,o_orderstatus)] hosts=3 instances=3 -Per-Host Resources: mem-estimate=46.00MB mem-reservation=46.00MB thread-reservation=1 +Per-Host Resources: mem-estimate=56.11MB mem-reservation=46.00MB thread-reservation=1 07:AGGREGATE [FINALIZE] | output: count:merge(*) | group by: l_orderkey, o_orderstatus @@ -195,12 +195,12 @@ Per-Host Resources: mem-estimate=46.00MB mem-reservation=46.00MB thread-reservat | in pipelines: 07(GETNEXT), 00(OPEN) | 06:EXCHANGE [HASH(l_orderkey,o_orderstatus)] -| mem-estimate=0B mem-reservation=0B thread-reservation=0 +| mem-estimate=10.11MB mem-reservation=0B thread-reservation=0 | tuple-ids=2 row-size=33B cardinality=4690314 | in pipelines: 00(GETNEXT) | F02:PLAN FRAGMENT [HASH(l_orderkey)] hosts=3 instances=3 -Per-Host Resources: mem-estimate=86.12MB mem-reservation=66.00MB thread-reservation=1 runtime-filters-memory=1.00MB +Per-Host Resources: mem-estimate=106.22MB mem-reservation=66.00MB thread-reservation=1 runtime-filters-memory=1.00MB 03:AGGREGATE [STREAMING] | output: count(*) | group by: l_orderkey, o_orderstatus @@ -217,7 +217,7 @@ Per-Host Resources: mem-estimate=86.12MB mem-reservation=66.00MB thread-reservat | in pipelines: 00(GETNEXT), 01(OPEN) | |--05:EXCHANGE [HASH(o_orderkey)] -| | mem-estimate=0B mem-reservation=0B thread-reservation=0 +| | mem-estimate=10.06MB 
mem-reservation=0B thread-reservation=0 | | tuple-ids=1 row-size=25B cardinality=1500000 | | in pipelines: 01(GETNEXT) | | @@ -234,19 +234,19 @@ Per-Host Resources: mem-estimate=86.12MB mem-reservation=66.00MB thread-reservat | in pipelines: 01(GETNEXT) | 04:EXCHANGE [HASH(l_orderkey)] -| mem-estimate=0B mem-reservation=0B thread-reservation=0 +| mem-estimate=10.04MB mem-reservation=0B thread-reservation=0 | tuple-ids=0 row-size=8B cardinality=6001215 | in pipelines: 00(GETNEXT) | F00:PLAN FRAGMENT [RANDOM] hosts=3 instances=3 Per-Host Resources: mem-estimate=81.00MB mem-reservation=5.00MB thread-reservation=2 runtime-filters-memory=1.00MB 00:SCAN HDFS [tpch_parquet.lineitem, RANDOM] - partitions=1/1 files=3 size=193.71MB + partitions=1/1 files=3 size=193.72MB runtime filters: RF000[bloom] -> l_orderkey stored statistics: - table: rows=6001215 size=193.71MB + table: rows=6001215 size=193.72MB columns: all - extrapolated-rows=disabled max-scan-range-rows=2141802 + extrapolated-rows=disabled max-scan-range-rows=2141530 mem-estimate=80.00MB mem-reservation=4.00MB thread-reservation=1 tuple-ids=0 row-size=8B cardinality=6001215 in pipelines: 00(GETNEXT) @@ -257,20 +257,20 @@ select distinct * from tpch_parquet.lineitem ---- DISTRIBUTEDPLAN Max Per-Host Resource Reservation: Memory=120.00MB Threads=4 -Per-Host Resource Estimates: Memory=3.31GB +Per-Host Resource Estimates: Memory=3.33GB F02:PLAN FRAGMENT [UNPARTITIONED] hosts=1 instances=1 -| Per-Host Resources: mem-estimate=0B mem-reservation=0B thread-reservation=1 +| Per-Host Resources: mem-estimate=10.78MB mem-reservation=0B thread-reservation=1 PLAN-ROOT SINK | mem-estimate=0B mem-reservation=0B thread-reservation=0 | 04:EXCHANGE [UNPARTITIONED] -| mem-estimate=0B mem-reservation=0B thread-reservation=0 +| mem-estimate=10.78MB mem-reservation=0B thread-reservation=0 | tuple-ids=1 row-size=263B cardinality=6001215 | in pipelines: 03(GETNEXT) | F01:PLAN FRAGMENT 
[HASH(tpch_parquet.lineitem.l_orderkey,tpch_parquet.lineitem.l_partkey,tpch_parquet.lineitem.l_suppkey,tpch_parquet.lineitem.l_linenumber,tpch_parquet.lineitem.l_quantity,tpch_parquet.lineitem.l_extendedprice,tpch_parquet.lineitem.l_discount,tpch_parquet.lineitem.l_tax,tpch_parquet.lineitem.l_returnflag,tpch_parquet.lineitem.l_linestatus,tpch_parquet.lineitem.l_shipdate,tpch_parquet.lineitem.l_commitdate,tpch_parquet.lineitem.l_receiptdate,tpch_parquet.lineitem.l_shipinstruct,tpch_parquet.lineitem.l_shipmode,tpch_parquet.lineitem.l_comment)] hosts=3 instances=3 -Per-Host Resources: mem-estimate=1.62GB mem-reservation=46.00MB thread-reservation=1 +Per-Host Resources: mem-estimate=1.63GB mem-reservation=46.00MB thread-reservation=1 03:AGGREGATE [FINALIZE] | group by: tpch_parquet.lineitem.l_orderkey, tpch_parquet.lineitem.l_partkey, tpch_parquet.lineitem.l_suppkey, tpch_parquet.lineitem.l_linenumber, tpch_parquet.lineitem.l_quantity, tpch_parquet.lineitem.l_extendedprice, tpch_parquet.lineitem.l_discount, tpch_parquet.lineitem.l_tax, tpch_parquet.lineitem.l_returnflag, tpch_parquet.lineitem.l_linestatus, tpch_parquet.lineitem.l_shipdate, tpch_parquet.lineitem.l_commitdate, tpch_parquet.lineitem.l_receiptdate, tpch_parquet.lineitem.l_shipinstruct, tpch_parquet.lineitem.l_shipmode, tpch_parquet.lineitem.l_comment | mem-estimate=1.62GB mem-reservation=46.00MB spill-buffer=2.00MB thread-reservation=0 @@ -278,7 +278,7 @@ Per-Host Resources: mem-estimate=1.62GB mem-reservation=46.00MB thread-reservati | in pipelines: 03(GETNEXT), 00(OPEN) | 02:EXCHANGE 
[HASH(tpch_parquet.lineitem.l_orderkey,tpch_parquet.lineitem.l_partkey,tpch_parquet.lineitem.l_suppkey,tpch_parquet.lineitem.l_linenumber,tpch_parquet.lineitem.l_quantity,tpch_parquet.lineitem.l_extendedprice,tpch_parquet.lineitem.l_discount,tpch_parquet.lineitem.l_tax,tpch_parquet.lineitem.l_returnflag,tpch_parquet.lineitem.l_linestatus,tpch_parquet.lineitem.l_shipdate,tpch_parquet.lineitem.l_commitdate,tpch_parquet.lineitem.l_receiptdate,tpch_parquet.lineitem.l_shipinstruct,tpch_parquet.lineitem.l_shipmode,tpch_parquet.lineitem.l_comment)] -| mem-estimate=0B mem-reservation=0B thread-reservation=0 +| mem-estimate=10.78MB mem-reservation=0B thread-reservation=0 | tuple-ids=1 row-size=263B cardinality=6001215 | in pipelines: 00(GETNEXT) | @@ -291,11 +291,11 @@ Per-Host Resources: mem-estimate=1.69GB mem-reservation=74.00MB thread-reservati | in pipelines: 00(GETNEXT) | 00:SCAN HDFS [tpch_parquet.lineitem, RANDOM] - partitions=1/1 files=3 size=193.71MB + partitions=1/1 files=3 size=193.72MB stored statistics: - table: rows=6001215 size=193.71MB + table: rows=6001215 size=193.72MB columns: all - extrapolated-rows=disabled max-scan-range-rows=2141802 + extrapolated-rows=disabled max-scan-range-rows=2141530 mem-estimate=80.00MB mem-reservation=40.00MB thread-reservation=1 tuple-ids=0 row-size=263B cardinality=6001215 in pipelines: 00(GETNEXT) @@ -307,20 +307,20 @@ from tpch_parquet.lineitem group by 1, 2 ---- DISTRIBUTEDPLAN Max Per-Host Resource Reservation: Memory=98.00MB Threads=4 -Per-Host Resource Estimates: Memory=483MB +Per-Host Resource Estimates: Memory=503MB F02:PLAN FRAGMENT [UNPARTITIONED] hosts=1 instances=1 -| Per-Host Resources: mem-estimate=0B mem-reservation=0B thread-reservation=1 +| Per-Host Resources: mem-estimate=10.11MB mem-reservation=0B thread-reservation=1 PLAN-ROOT SINK | mem-estimate=0B mem-reservation=0B thread-reservation=0 | 04:EXCHANGE [UNPARTITIONED] -| mem-estimate=0B mem-reservation=0B thread-reservation=0 +| mem-estimate=10.11MB 
mem-reservation=0B thread-reservation=0 | tuple-ids=1 row-size=32B cardinality=6001215 | in pipelines: 03(GETNEXT) | F01:PLAN FRAGMENT [HASH(l_orderkey,l_partkey)] hosts=3 instances=3 -Per-Host Resources: mem-estimate=201.46MB mem-reservation=48.00MB thread-reservation=1 +Per-Host Resources: mem-estimate=211.56MB mem-reservation=48.00MB thread-reservation=1 03:AGGREGATE [FINALIZE] | output: group_concat:merge(l_linestatus, ',') | group by: l_orderkey, l_partkey @@ -329,7 +329,7 @@ Per-Host Resources: mem-estimate=201.46MB mem-reservation=48.00MB thread-reserva | in pipelines: 03(GETNEXT), 00(OPEN) | 02:EXCHANGE [HASH(l_orderkey,l_partkey)] -| mem-estimate=0B mem-reservation=0B thread-reservation=0 +| mem-estimate=10.11MB mem-reservation=0B thread-reservation=0 | tuple-ids=1 row-size=32B cardinality=6001215 | in pipelines: 00(GETNEXT) | @@ -343,11 +343,11 @@ Per-Host Resources: mem-estimate=281.46MB mem-reservation=50.00MB thread-reserva | in pipelines: 00(GETNEXT) | 00:SCAN HDFS [tpch_parquet.lineitem, RANDOM] - partitions=1/1 files=3 size=193.71MB + partitions=1/1 files=3 size=193.72MB stored statistics: - table: rows=6001215 size=193.71MB + table: rows=6001215 size=193.72MB columns: all - extrapolated-rows=disabled max-scan-range-rows=2141802 + extrapolated-rows=disabled max-scan-range-rows=2141530 mem-estimate=80.00MB mem-reservation=16.00MB thread-reservation=1 tuple-ids=0 row-size=33B cardinality=6001215 in pipelines: 00(GETNEXT) @@ -362,17 +362,17 @@ Per-Host Resource Estimates: Memory=56MB Codegen disabled by planner F02:PLAN FRAGMENT [UNPARTITIONED] hosts=1 instances=1 -| Per-Host Resources: mem-estimate=0B mem-reservation=0B thread-reservation=1 +| Per-Host Resources: mem-estimate=56.26KB mem-reservation=0B thread-reservation=1 PLAN-ROOT SINK | mem-estimate=0B mem-reservation=0B thread-reservation=0 | 04:EXCHANGE [UNPARTITIONED] -| mem-estimate=0B mem-reservation=0B thread-reservation=0 +| mem-estimate=56.26KB mem-reservation=0B thread-reservation=0 | 
tuple-ids=3,2 row-size=6B cardinality=7300 | in pipelines: 01(GETNEXT) | F01:PLAN FRAGMENT [HASH(int_col)] hosts=3 instances=3 -Per-Host Resources: mem-estimate=40.00MB mem-reservation=40.00MB thread-reservation=1 +Per-Host Resources: mem-estimate=40.04MB mem-reservation=40.00MB thread-reservation=1 02:ANALYTIC | functions: max(tinyint_col) | partition by: int_col @@ -387,7 +387,7 @@ Per-Host Resources: mem-estimate=40.00MB mem-reservation=40.00MB thread-reservat | in pipelines: 01(GETNEXT), 00(OPEN) | 03:EXCHANGE [HASH(int_col)] -| mem-estimate=0B mem-reservation=0B thread-reservation=0 +| mem-estimate=38.88KB mem-reservation=0B thread-reservation=0 | tuple-ids=0 row-size=5B cardinality=7300 | in pipelines: 00(GETNEXT) |
