This is an automated email from the ASF dual-hosted git repository.
jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 0f92742b45 Multi stage metrics (#13035)
0f92742b45 is described below
commit 0f92742b45aa9ac57439b68a63e4a9ef8d9d9d8e
Author: Gonzalo Ortiz Jaureguizar <[email protected]>
AuthorDate: Fri Jun 7 09:37:34 2024 +0200
Multi stage metrics (#13035)
---
.../apache/pinot/common/metrics/ServerMeter.java | 41 ++++++++++++-
.../apache/pinot/common/metrics/ServerTimer.java | 31 +++++++++-
.../runtime/operator/MailboxSendOperator.java | 23 ++++++++
.../query/runtime/operator/MultiStageOperator.java | 69 ++++++++++++++++++++++
.../query/runtime/plan/MultiStageQueryStats.java | 24 ++++----
5 files changed, 176 insertions(+), 12 deletions(-)
diff --git
a/pinot-common/src/main/java/org/apache/pinot/common/metrics/ServerMeter.java
b/pinot-common/src/main/java/org/apache/pinot/common/metrics/ServerMeter.java
index 516584950d..49d1c0c7e9 100644
---
a/pinot-common/src/main/java/org/apache/pinot/common/metrics/ServerMeter.java
+++
b/pinot-common/src/main/java/org/apache/pinot/common/metrics/ServerMeter.java
@@ -118,7 +118,46 @@ public enum ServerMeter implements AbstractMetrics.Meter {
LARGE_QUERY_RESPONSES_SENT("largeResponses", false),
TOTAL_THREAD_CPU_TIME_MILLIS("millis", false),
LARGE_QUERY_RESPONSE_SIZE_EXCEPTIONS("exceptions", false),
- STREAM_DATA_LOSS("streamDataLoss", false);
+ STREAM_DATA_LOSS("streamDataLoss", false),
+
+ // Multi-stage query engine meters (reported by the stage 1 MailboxSendOperator from the EOS block stats)
+ /**
+ * Number of times the max number of rows in the hash table has been reached.
+ * It is increased by at most one per join operator in a query, regardless of how many workers run it.
+ * That means that if a stage has 10 workers and all of them reach the
limit, this will be increased by 1.
+ * But if a single query has 2 different join operators and each one reaches
the limit, this will be increased by 2.
+ */
+ HASH_JOIN_TIMES_MAX_ROWS_REACHED("times", true),
+ /**
+ * Number of times the max number of groups has been reached.
+ * It is increased by at most one per aggregate operator in a query, regardless of how many workers run it.
+ * That means that if a stage has 10 workers and all of them reach the
limit, this will be increased by 1.
+ * But if a single query has 2 different aggregate operators and each one
reaches the limit, this will be increased
+ * by 2.
+ */
+ AGGREGATE_TIMES_NUM_GROUPS_LIMIT_REACHED("times", true),
+ /**
+ * The number of blocks that have been sent to the next stage without being
serialized.
+ * This is the sum of all blocks sent by all workers in the stage. It is taken from the receiving mailbox operators' stats.
+ */
+ MULTI_STAGE_IN_MEMORY_MESSAGES("messages", true),
+ /**
+ * The number of blocks that have been sent to the next stage in serialized
format.
+ * This is the sum of all blocks sent by all workers in the stage. It is taken from the receiving mailbox operators' stats.
+ */
+ MULTI_STAGE_RAW_MESSAGES("messages", true),
+ /**
+ * The number of bytes that have been sent to the next stage in serialized
format.
+ * This is the sum of all bytes sent by all workers in the stage. Bytes are counted on the receiving side, as deserialized bytes.
+ */
+ MULTI_STAGE_RAW_BYTES("bytes", true),
+ /**
+ * Number of times the max number of rows in a window operator has been reached.
+ * It is increased by at most one per window operator in a query, regardless of how many workers run it.
+ * That means that if a stage has 10 workers and all of them reach the
limit, this will be increased by 1.
+ * But if a single query has 2 different window operators and each one
reaches the limit, this will be increased by 2.
+ */
+ WINDOW_TIMES_MAX_ROWS_REACHED("times", true);
private final String _meterName;
private final String _unit;
diff --git
a/pinot-common/src/main/java/org/apache/pinot/common/metrics/ServerTimer.java
b/pinot-common/src/main/java/org/apache/pinot/common/metrics/ServerTimer.java
index 31ee69428e..79e1eff8e0 100644
---
a/pinot-common/src/main/java/org/apache/pinot/common/metrics/ServerTimer.java
+++
b/pinot-common/src/main/java/org/apache/pinot/common/metrics/ServerTimer.java
@@ -54,7 +54,36 @@ public enum ServerTimer implements AbstractMetrics.Timer {
UPSERT_REMOVE_EXPIRED_PRIMARY_KEYS_TIME_MS("milliseconds", false,
"Total time taken to delete expired primary keys based on metadataTTL or
deletedKeysTTL"),
GRPC_QUERY_EXECUTION_MS("milliseconds", false, "Total execution time of a
successful query over gRPC"),
- UPSERT_SNAPSHOT_TIME_MS("milliseconds", false, "Total time taken to take
upsert table snapshot");
+ UPSERT_SNAPSHOT_TIME_MS("milliseconds", false, "Total time taken to take
upsert table snapshot"),
+
+ // Multi-stage query engine timers. NOTE(review): unlike the entries above, these use unit "millis" (not "milliseconds") and have no description; confirm this is intended.
+ /**
+ * Time spent building the hash table for the join.
+ * This is the sum of all time spent by all workers in the stage.
+ */
+ HASH_JOIN_BUILD_TABLE_CPU_TIME_MS("millis", true),
+ /**
+ * Time spent serializing blocks into bytes to be sent to the next stage.
+ * This is the sum of all time spent by all workers in the stage.
+ */
+ MULTI_STAGE_SERIALIZATION_CPU_TIME_MS("millis", true),
+ /**
+ * Time spent deserializing bytes into blocks to be processed by the stage.
+ * This is the sum of all time spent by all workers in the stage.
+ */
+ MULTI_STAGE_DESERIALIZATION_CPU_TIME_MS("millis", true),
+ /**
+ * Time the receive mailbox spends waiting for its parent operator to consume
the data. NOTE(review): the name says CPU_TIME but this is waiting time; confirm how it is measured.
+ * Remember that each stage may have several workers and each one will have
a receive mailbox for each worker it is
+ * reading from. This is the sum of all time waiting.
+ */
+ RECEIVE_DOWNSTREAM_WAIT_CPU_TIME_MS("millis", true),
+ /**
+ * Time the receive mailbox spends waiting for the child operator to
produce the data. NOTE(review): the name says CPU_TIME but this is waiting time; confirm how it is measured.
+ * Remember that each stage may have several workers and each one will have
a receive mailbox for each worker it is
+ * reading from. This is the sum of all time waiting.
+ */
+ RECEIVE_UPSTREAM_WAIT_CPU_TIME_MS("millis", true);
private final String _timerName;
private final boolean _global;
diff --git
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MailboxSendOperator.java
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MailboxSendOperator.java
index fce214e7aa..1d638585ed 100644
---
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MailboxSendOperator.java
+++
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MailboxSendOperator.java
@@ -30,6 +30,7 @@ import javax.annotation.Nullable;
import org.apache.calcite.rel.RelDistribution;
import org.apache.calcite.rel.RelFieldCollation;
import org.apache.pinot.common.datatable.StatMap;
+import org.apache.pinot.common.metrics.ServerMetrics;
import org.apache.pinot.query.mailbox.MailboxService;
import org.apache.pinot.query.mailbox.SendingMailbox;
import org.apache.pinot.query.planner.logical.RexExpression;
@@ -146,6 +147,11 @@ public class MailboxSendOperator extends
MultiStageOperator {
updateEosBlock(block, _statMap);
// no need to check early terminate signal b/c the current block is
already EOS
sendTransferableBlock(block);
+ // After sending its own stats, the sending operator of stage 1
has the complete view of all stats (presumably stage 1 is the last stage before the broker; confirm).
+ // Therefore this is the only place where metrics aggregated over the whole query
(e.g. time spent) can be updated.
+ if (_context.getStageId() == 1) {
+ updateMetrics(block);
+ }
} else {
if (sendTransferableBlock(block)) {
earlyTerminate();
@@ -196,6 +202,23 @@ public class MailboxSendOperator extends
MultiStageOperator {
_exchange.cancel(t);
}
+ private void updateMetrics(TransferableBlock block) { // Feeds every operator's stats (closed stages, then the current stage) into ServerMetrics.
+ ServerMetrics serverMetrics = ServerMetrics.get();
+ MultiStageQueryStats queryStats = block.getQueryStats();
+ if (queryStats == null) {
+ LOGGER.info("Query stats not found in the EOS block."); // No server metrics are recorded for this query in this case.
+ } else {
+ for (MultiStageQueryStats.StageStats.Closed closed :
queryStats.getClosedStats()) {
+ closed.forEach((type, stats) -> {
+ type.updateServerMetrics(stats, serverMetrics);
+ });
+ }
+ queryStats.getCurrentStats().forEach((type, stats) -> {
+ type.updateServerMetrics(stats, serverMetrics);
+ });
+ }
+ }
+
public enum StatKey implements StatMap.Key {
EXECUTION_TIME_MS(StatMap.Type.LONG) {
@Override
diff --git
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MultiStageOperator.java
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MultiStageOperator.java
index 6d7ea779cc..50e68a47a6 100644
---
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MultiStageOperator.java
+++
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MultiStageOperator.java
@@ -24,6 +24,9 @@ import com.google.common.base.Stopwatch;
import java.util.List;
import java.util.concurrent.TimeUnit;
import org.apache.pinot.common.datatable.StatMap;
+import org.apache.pinot.common.metrics.ServerMeter;
+import org.apache.pinot.common.metrics.ServerMetrics;
+import org.apache.pinot.common.metrics.ServerTimer;
import org.apache.pinot.common.response.broker.BrokerResponseNativeV2;
import org.apache.pinot.core.common.Operator;
import org.apache.pinot.query.runtime.blocks.TransferableBlock;
@@ -174,6 +177,17 @@ public abstract class MultiStageOperator
response.mergeNumGroupsLimitReached(stats.getBoolean(AggregateOperator.StatKey.NUM_GROUPS_LIMIT_REACHED));
response.mergeMaxRowsInOperator(stats.getLong(AggregateOperator.StatKey.EMITTED_ROWS));
}
+
+ @Override
+ public void updateServerMetrics(StatMap<?> map, ServerMetrics
serverMetrics) { // Counts aggregate operators that hit the num-groups limit.
+ super.updateServerMetrics(map, serverMetrics);
+ @SuppressWarnings("unchecked")
+ StatMap<AggregateOperator.StatKey> stats =
(StatMap<AggregateOperator.StatKey>) map;
+ boolean limitReached =
stats.getBoolean(AggregateOperator.StatKey.NUM_GROUPS_LIMIT_REACHED);
+ if (limitReached) { // +1 per stats map; presumably already merged across workers, so at most 1 per operator
+
serverMetrics.addMeteredGlobalValue(ServerMeter.AGGREGATE_TIMES_NUM_GROUPS_LIMIT_REACHED,
1);
+ }
+ }
},
FILTER(FilterOperator.StatKey.class) {
@Override
@@ -191,6 +205,19 @@ public abstract class MultiStageOperator
response.mergeMaxRowsInOperator(stats.getLong(HashJoinOperator.StatKey.EMITTED_ROWS));
response.mergeMaxRowsInJoinReached(stats.getBoolean(HashJoinOperator.StatKey.MAX_ROWS_IN_JOIN_REACHED));
}
+
+ @Override
+ public void updateServerMetrics(StatMap<?> map, ServerMetrics
serverMetrics) { // Counts joins that hit the row limit and accumulates hash table build time.
+ super.updateServerMetrics(map, serverMetrics);
+ @SuppressWarnings("unchecked")
+ StatMap<HashJoinOperator.StatKey> stats =
(StatMap<HashJoinOperator.StatKey>) map;
+ boolean maxRowsInJoinReached =
stats.getBoolean(HashJoinOperator.StatKey.MAX_ROWS_IN_JOIN_REACHED);
+ if (maxRowsInJoinReached) { // +1 per stats map; presumably already merged across workers, so at most 1 per operator
+
serverMetrics.addMeteredGlobalValue(ServerMeter.HASH_JOIN_TIMES_MAX_ROWS_REACHED,
1);
+ }
+
serverMetrics.addTimedValue(ServerTimer.HASH_JOIN_BUILD_TABLE_CPU_TIME_MS,
+
stats.getLong(HashJoinOperator.StatKey.TIME_BUILDING_HASH_TABLE_MS),
TimeUnit.MILLISECONDS); // build time as recorded in TIME_BUILDING_HASH_TABLE_MS
+ }
},
INTERSECT(SetOperator.StatKey.class) {
@Override
@@ -228,6 +255,27 @@ public abstract class MultiStageOperator
StatMap<BaseMailboxReceiveOperator.StatKey> stats =
(StatMap<BaseMailboxReceiveOperator.StatKey>) map;
response.mergeMaxRowsInOperator(stats.getLong(BaseMailboxReceiveOperator.StatKey.EMITTED_ROWS));
}
+
+ @Override
+ public void updateServerMetrics(StatMap<?> map, ServerMetrics
serverMetrics) { // Publishes message counts, bytes, deserialization time and wait times seen by the receiver.
+ super.updateServerMetrics(map, serverMetrics);
+ @SuppressWarnings("unchecked")
+ StatMap<BaseMailboxReceiveOperator.StatKey> stats =
(StatMap<BaseMailboxReceiveOperator.StatKey>) map;
+
+
serverMetrics.addMeteredGlobalValue(ServerMeter.MULTI_STAGE_IN_MEMORY_MESSAGES,
+
stats.getInt(BaseMailboxReceiveOperator.StatKey.IN_MEMORY_MESSAGES));
+
serverMetrics.addMeteredGlobalValue(ServerMeter.MULTI_STAGE_RAW_MESSAGES,
+ stats.getInt(BaseMailboxReceiveOperator.StatKey.RAW_MESSAGES));
+ serverMetrics.addMeteredGlobalValue(ServerMeter.MULTI_STAGE_RAW_BYTES,
+
stats.getLong(BaseMailboxReceiveOperator.StatKey.DESERIALIZED_BYTES)); // raw bytes are counted on the receiving side
+
+
serverMetrics.addTimedValue(ServerTimer.MULTI_STAGE_DESERIALIZATION_CPU_TIME_MS,
+
stats.getLong(BaseMailboxReceiveOperator.StatKey.DESERIALIZATION_TIME_MS),
TimeUnit.MILLISECONDS);
+
serverMetrics.addTimedValue(ServerTimer.RECEIVE_DOWNSTREAM_WAIT_CPU_TIME_MS,
+
stats.getLong(BaseMailboxReceiveOperator.StatKey.DOWNSTREAM_WAIT_MS),
TimeUnit.MILLISECONDS);
+
serverMetrics.addTimedValue(ServerTimer.RECEIVE_UPSTREAM_WAIT_CPU_TIME_MS,
+
stats.getLong(BaseMailboxReceiveOperator.StatKey.UPSTREAM_WAIT_MS),
TimeUnit.MILLISECONDS);
+ }
},
MAILBOX_SEND(MailboxSendOperator.StatKey.class) {
@Override
@@ -236,6 +284,14 @@ public abstract class MultiStageOperator
StatMap<MailboxSendOperator.StatKey> stats =
(StatMap<MailboxSendOperator.StatKey>) map;
response.mergeMaxRowsInOperator(stats.getLong(MailboxSendOperator.StatKey.EMITTED_ROWS));
}
+
+ @Override
+ public void updateServerMetrics(StatMap<?> map, ServerMetrics
serverMetrics) { // NOTE(review): unlike AGGREGATE/HASH_JOIN/MAILBOX_RECEIVE this skips super.updateServerMetrics (a no-op today)
+ @SuppressWarnings("unchecked")
+ StatMap<MailboxSendOperator.StatKey> stats =
(StatMap<MailboxSendOperator.StatKey>) map;
+
serverMetrics.addTimedValue(ServerTimer.MULTI_STAGE_SERIALIZATION_CPU_TIME_MS,
+ stats.getLong(MailboxSendOperator.StatKey.SERIALIZATION_TIME_MS),
TimeUnit.MILLISECONDS);
+ }
},
MINUS(SetOperator.StatKey.class) {
@Override
@@ -286,6 +342,15 @@ public abstract class MultiStageOperator
response.mergeMaxRowsInWindowReached(
stats.getBoolean(WindowAggregateOperator.StatKey.MAX_ROWS_IN_WINDOW_REACHED));
}
+
+ @Override
+ public void updateServerMetrics(StatMap<?> map, ServerMetrics
serverMetrics) { // NOTE(review): unlike AGGREGATE/HASH_JOIN/MAILBOX_RECEIVE this skips super.updateServerMetrics (a no-op today)
+ @SuppressWarnings("unchecked")
+ StatMap<WindowAggregateOperator.StatKey> stats =
(StatMap<WindowAggregateOperator.StatKey>) map;
+ if
(stats.getBoolean(WindowAggregateOperator.StatKey.MAX_ROWS_IN_WINDOW_REACHED)) {
+
serverMetrics.addMeteredGlobalValue(ServerMeter.WINDOW_TIMES_MAX_ROWS_REACHED,
1);
+ }
+ }
},;
private final Class _statKeyClass;
@@ -311,5 +376,9 @@ public abstract class MultiStageOperator
* (compatible with {@link #getStatKeyClass()}). This is a way to avoid
casting in the caller.
*/
public abstract void mergeInto(BrokerResponseNativeV2 response, StatMap<?>
map);
+
+ public void updateServerMetrics(StatMap<?> map, ServerMetrics
serverMetrics) { // Hook that pushes this operator type's stats (map) into server-level metrics.
+ // Do nothing by default; types that publish server-level metrics override this method.
+ }
}
}
diff --git
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/MultiStageQueryStats.java
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/MultiStageQueryStats.java
index 712a0ab43a..b87b2a4085 100644
---
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/MultiStageQueryStats.java
+++
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/MultiStageQueryStats.java
@@ -284,14 +284,18 @@ public class MultiStageQueryStats {
myStats.merge(dis);
}
} catch (IOException ex) {
- LOGGER.warn("Error deserializing stats on stage {}. Considering the
new stats empty", i, ex);
+ LOGGER.warn("Error deserializing stats on stage " + i + ".
Considering the new stats empty", ex); // NOTE(review): the replaced SLF4J form with a {} placeholder already logged ex as the throwable and avoided eager string concatenation
} catch (IllegalArgumentException | IllegalStateException ex) {
- LOGGER.warn("Error merging stats on stage {}. Ignoring the new
stats", i, ex);
+ LOGGER.warn("Error merging stats on stage " + i + ". Ignoring the
new stats", ex); // NOTE(review): same as above; the {} placeholder form also logs ex as the throwable
}
}
}
}
+ public List<StageStats.Closed> getClosedStats() { // Read-only view of _closedStats; callers cannot modify it.
+ return Collections.unmodifiableList(_closedStats);
+ }
+
public JsonNode asJson() {
ObjectNode node = JsonUtils.newObjectNode();
node.put("stage", _currentStageId);
@@ -418,6 +422,14 @@ public class MultiStageQueryStats {
return _operatorStats.size() - 1;
}
+ public void forEach(BiConsumer<MultiStageOperator.Type, StatMap<?>>
consumer) { // Visits (type, stats) pairs in parallel; assumes _operatorTypes and _operatorStats have the same size.
+ Iterator<MultiStageOperator.Type> typeIterator =
_operatorTypes.iterator();
+ Iterator<StatMap<?>> statIterator = _operatorStats.iterator();
+ while (typeIterator.hasNext()) {
+ consumer.accept(typeIterator.next(), statIterator.next());
+ }
+ }
+
public JsonNode asJson() {
ArrayNode json = JsonUtils.newArrayNode();
@@ -539,14 +551,6 @@ public class MultiStageQueryStats {
throws IOException {
return deserialize(input, input.readInt());
}
-
- public void forEach(BiConsumer<MultiStageOperator.Type, StatMap<?>>
consumer) {
- Iterator<MultiStageOperator.Type> typeIterator =
_operatorTypes.iterator();
- Iterator<StatMap<?>> statIterator = _operatorStats.iterator();
- while (typeIterator.hasNext()) {
- consumer.accept(typeIterator.next(), statIterator.next());
- }
- }
}
public static Closed deserialize(DataInput input, int numOperators)
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]