This is an automated email from the ASF dual-hosted git repository.
mhubail pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/asterixdb.git
The following commit(s) were added to refs/heads/master by this push:
new e1b17e075b [ASTERIXDB-3477,3474][RT] Profiling fixes and improvements
e1b17e075b is described below
commit e1b17e075b527db8c6f054f8157df7bb7db07561
Author: Ian Maxon <[email protected]>
AuthorDate: Tue Sep 17 18:37:48 2024 -0700
[ASTERIXDB-3477,3474][RT] Profiling fixes and improvements
- user model changes: yes
- storage format changes: no
- interface changes: no
Details:
The operator id was being improperly used as the key when updating
the min and max time. It needs to include the activity id to clearly
differentiate between the different activities of an operator.
Exchange time was previously being reported as the difference between
the open and close time of the connector, which is not right. Calculating
the actual time a connector is taking will require careful consideration
of each connector type and usage, such as 1-N vs N-M and pipelined vs.
materializing. Until then it is better to simply omit it, because it is
not typically a predominant factor in query time. If it is, the sampled
partition profile still exists and will be useful to find those cases.
Previously, operators that were the last before a non 1-1 exchange would
not have time or cardinalities reported. This is now fixed.
All operators and connectors now report total cardinalities aggregated
across all partitions.
Ext-ref:MB-63566
Change-Id: I172f0044112777aec7200a0c6ae906711fcdc5f2
Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/18905
Reviewed-by: Ali Alsuliman <[email protected]>
Integration-Tests: Jenkins <[email protected]>
Tested-by: Jenkins <[email protected]>
---
.../profile/full-scan/full-scan.3.regexjson | 7 +-
.../non-unary-subplan.3.regexjson | 7 +-
.../results/profile/plansleep/sleep.3.regexjson | 75 ++++++++-
.../results/profile/sleep/sleep.4.regexjson | 7 +-
.../results/profile/sleep/sleep.5.regexjson | 14 +-
.../LogicalOperatorPrettyPrintVisitorJson.java | 177 ++++++++++++++-------
.../runtime/base/ProfiledPushRuntime.java | 2 +-
.../meta/AlgebricksMetaOperatorDescriptor.java | 2 +-
.../apache/hyracks/api/dataflow/ITimedWriter.java | 2 +-
.../hyracks/api/dataflow/ProfiledFrameWriter.java | 25 +--
.../api/dataflow/ProfiledOperatorNodePushable.java | 4 +-
.../common/job/profiling/om/PartitionProfile.java | 15 +-
.../common/job/profiling/om/TaskProfile.java | 1 +
.../profiling/ProfilingPartitionWriterFactory.java | 36 ++---
.../hyracks/control/nc/work/StartTasksWork.java | 8 +-
15 files changed, 272 insertions(+), 110 deletions(-)
diff --git
a/asterixdb/asterix-app/src/test/resources/runtimets/results/profile/full-scan/full-scan.3.regexjson
b/asterixdb/asterix-app/src/test/resources/runtimets/results/profile/full-scan/full-scan.3.regexjson
index 03900f38a9..10578d0161 100644
---
a/asterixdb/asterix-app/src/test/resources/runtimets/results/profile/full-scan/full-scan.3.regexjson
+++
b/asterixdb/asterix-app/src/test/resources/runtimets/results/profile/full-scan/full-scan.3.regexjson
@@ -24,6 +24,7 @@
},
"open-time": "R{[0-9]+}",
"close-time": "R{[0-9]+}",
+ "cardinality": "R{[0-9]+}",
"offset": "R{[0-9]+}",
"frame-times": [
0
@@ -35,7 +36,11 @@
{
"name": "R{.+}",
"run-time": "R{[0-9.]+}",
- "runtime-id": "R{.+}"
+ "runtime-id": "R{.+}",
+ "cardinality-out": "R{[0-9.]+}",
+ "avg-tuple-size": "R{[0-9.]+}",
+ "min-tuple-size": "R{[0-9.]+}",
+ "max-tuple-size": "R{[0-9.]+}"
},
{
"name": "R{.+}",
diff --git
a/asterixdb/asterix-app/src/test/resources/runtimets/results/profile/non-unary-subplan/non-unary-subplan.3.regexjson
b/asterixdb/asterix-app/src/test/resources/runtimets/results/profile/non-unary-subplan/non-unary-subplan.3.regexjson
index e59f09565b..1936be711a 100644
---
a/asterixdb/asterix-app/src/test/resources/runtimets/results/profile/non-unary-subplan/non-unary-subplan.3.regexjson
+++
b/asterixdb/asterix-app/src/test/resources/runtimets/results/profile/non-unary-subplan/non-unary-subplan.3.regexjson
@@ -185,6 +185,7 @@
},
"open-time": "R{[0-9]+}",
"close-time": "R{[0-9]+}",
+ "cardinality": "R{[0-9]+}",
"offset": "R{[0-9]+}",
"frame-times": [
0
@@ -223,7 +224,11 @@
{
"name": "R{.+}",
"run-time": "R{[0-9.]+}",
- "runtime-id": "R{.+}"
+ "runtime-id": "R{.+}",
+ "cardinality-out": "R{[0-9.]+}",
+ "avg-tuple-size": "R{[0-9.]+}",
+ "min-tuple-size": "R{[0-9.]+}",
+ "max-tuple-size": "R{[0-9.]+}"
},
{
"name": "R{.+}",
diff --git
a/asterixdb/asterix-app/src/test/resources/runtimets/results/profile/plansleep/sleep.3.regexjson
b/asterixdb/asterix-app/src/test/resources/runtimets/results/profile/plansleep/sleep.3.regexjson
index 111ded8c9d..4652e73970 100644
---
a/asterixdb/asterix-app/src/test/resources/runtimets/results/profile/plansleep/sleep.3.regexjson
+++
b/asterixdb/asterix-app/src/test/resources/runtimets/results/profile/plansleep/sleep.3.regexjson
@@ -31,6 +31,7 @@
"max-time": "R{[0-9.]+}",
"min-cardinality": 3,
"max-cardinality": 3,
+ "total-cardinality": 3,
"physical-operator": "STREAM_PROJECT",
"execution-mode": "PARTITIONED",
"optimizer-estimates": "R{.+}",
@@ -41,6 +42,9 @@
"runtime-id": "R{.+}",
"min-time": "R{[0-9.]+}",
"max-time": "R{[0-9.]+}",
+ "min-cardinality": "R{[0-9.]+}",
+ "max-cardinality": "R{[0-9.]+}",
+ "total-cardinality": "R{[0-9.]+}",
"physical-operator": "SORT_MERGE_EXCHANGE [$$49(ASC) ]",
"execution-mode": "PARTITIONED",
"optimizer-estimates": "R{.+}",
@@ -55,8 +59,23 @@
],
"operatorId": "1.5",
"runtime-id": "R{.+}",
- "min-time": "R{[0-9.]+}",
- "max-time": "R{[0-9.]+}",
+ "activity-stats": [
+ {
+ "name": "Sort (Run Generation)",
+ "id": "ANID:0",
+ "min-time": "R{[0-9.]+}",
+ "max-time": "R{[0-9.]+}"
+ },
+ {
+ "name": "Sort (Run Merge)",
+ "id": "ANID:1",
+ "min-time": "R{[0-9.]+}",
+ "max-time": "R{[0-9.]+}",
+ "min-cardinality": 3,
+ "max-cardinality": 3,
+ "total-cardinality": 3
+ }
+ ],
"physical-operator": "STABLE_SORT [$$49(ASC)]",
"execution-mode": "PARTITIONED",
"optimizer-estimates": "R{.+}",
@@ -81,6 +100,7 @@
"max-time": "R{[0-9.]+}",
"min-cardinality": 3,
"max-cardinality": 3,
+ "total-cardinality": 3,
"physical-operator": "STREAM_PROJECT",
"execution-mode": "PARTITIONED",
"optimizer-estimates": "R{.+}",
@@ -99,6 +119,7 @@
"max-time": "R{5.+}",
"min-cardinality": 3,
"max-cardinality": 3,
+ "total-cardinality": 3,
"physical-operator": "ASSIGN",
"execution-mode": "PARTITIONED",
"optimizer-estimates": "R{.+}",
@@ -115,6 +136,7 @@
"max-time": "R{[0-9.]+}",
"min-cardinality": 3,
"max-cardinality": 3,
+ "total-cardinality": 3,
"physical-operator": "STREAM_PROJECT",
"execution-mode": "PARTITIONED",
"optimizer-estimates": "R{.+}",
@@ -133,6 +155,7 @@
"max-time": "R{[0-9.]+}",
"min-cardinality": 3,
"max-cardinality": 3,
+ "total-cardinality": 3,
"physical-operator": "ASSIGN",
"execution-mode": "PARTITIONED",
"optimizer-estimates": "R{.+}",
@@ -177,10 +200,23 @@
],
"operatorId": "1.12",
"runtime-id": "R{.+}",
- "min-time": "R{[0-9.]+}",
- "max-time": "R{[0-9.]+}",
- "min-cardinality": 3,
- "max-cardinality": 3,
+ "activity-stats": [
+ {
+ "name": "Sort (Run
Generation)",
+ "id": "ANID:0",
+ "min-time": "R{[0-9.]+}",
+ "max-time": "R{[0-9.]+}"
+ },
+ {
+ "name": "Sort (Run Merge)",
+ "id": "ANID:1",
+ "min-time": "R{[0-9.]+}",
+ "max-time": "R{[0-9.]+}",
+ "min-cardinality": 3,
+ "max-cardinality": 3,
+ "total-cardinality": 3
+ }
+ ],
"physical-operator":
"SORT_GROUP_BY[$$56]",
"execution-mode":
"PARTITIONED",
"optimizer-estimates": "R{.+}",
@@ -191,6 +227,9 @@
"runtime-id": "R{.+}",
"min-time": "R{[0-9.]+}",
"max-time": "R{[0-9.]+}",
+ "min-cardinality": 3,
+ "max-cardinality": 3,
+ "total-cardinality": 3,
"physical-operator":
"HASH_PARTITION_EXCHANGE [$$56]",
"execution-mode":
"PARTITIONED",
"optimizer-estimates":
"R{.+}",
@@ -227,8 +266,23 @@
],
"operatorId": "1.14",
"runtime-id": "R{.+}",
- "min-time":
"R{[0-9.]+}",
- "max-time":
"R{[0-9.]+}",
+ "activity-stats": [
+ {
+ "name": "Sort (Run
Generation)",
+ "id": "ANID:0",
+ "min-time":
"R{[0-9.]+}",
+ "max-time":
"R{[0-9.]+}"
+ },
+ {
+ "name": "Sort (Run
Merge)",
+ "id": "ANID:1",
+ "min-time":
"R{[0-9.]+}",
+ "max-time":
"R{[0-9.]+}",
+ "min-cardinality":
3,
+ "max-cardinality":
3,
+
"total-cardinality": 3
+ }
+ ],
"physical-operator":
"SORT_GROUP_BY[$$50]",
"execution-mode":
"PARTITIONED",
"optimizer-estimates":
"R{.+}",
@@ -252,6 +306,7 @@
"max-time":
"R{[0-9.]+}",
"min-cardinality": 5,
"max-cardinality": 5,
+
"total-cardinality": 5,
"physical-operator": "STREAM_PROJECT",
"execution-mode": "PARTITIONED",
"optimizer-estimates": "R{.+}",
@@ -270,6 +325,7 @@
"max-time": "R{[0-9.]+}",
"min-cardinality": 5,
"max-cardinality": 5,
+
"total-cardinality": 5,
"physical-operator": "ASSIGN",
"execution-mode": "PARTITIONED",
"optimizer-estimates": "R{.+}",
@@ -283,6 +339,7 @@
"max-time": "R{[0-9.]+}",
"min-cardinality": 5,
"max-cardinality": 5,
+
"total-cardinality": 5,
"physical-operator": "STREAM_SELECT",
"execution-mode": "PARTITIONED",
"optimizer-estimates": "R{.+}",
@@ -298,6 +355,7 @@
"max-time": "R{[0-9.]+}",
"min-cardinality": 10,
"max-cardinality": 10,
+
"total-cardinality": 10,
"physical-operator": "STREAM_PROJECT",
"execution-mode": "PARTITIONED",
"optimizer-estimates": "R{.+}",
@@ -323,6 +381,7 @@
"max-time": "R{[0-9.]+}",
"min-cardinality": 10,
"max-cardinality": 10,
+
"total-cardinality": 10,
"physical-operator": "DATASOURCE_SCAN",
"execution-mode": "PARTITIONED",
"optimizer-estimates": "R{.+}",
diff --git
a/asterixdb/asterix-app/src/test/resources/runtimets/results/profile/sleep/sleep.4.regexjson
b/asterixdb/asterix-app/src/test/resources/runtimets/results/profile/sleep/sleep.4.regexjson
index e3e76477d7..2f74d349f9 100644
---
a/asterixdb/asterix-app/src/test/resources/runtimets/results/profile/sleep/sleep.4.regexjson
+++
b/asterixdb/asterix-app/src/test/resources/runtimets/results/profile/sleep/sleep.4.regexjson
@@ -24,6 +24,7 @@
},
"open-time": "R{[0-9]+}",
"close-time": "R{[0-9]+}",
+ "cardinality": "R{[0-9]+}",
"offset": "R{[0-9]+}",
"frame-times": [
0
@@ -35,7 +36,11 @@
{
"name": "R{.+}",
"run-time": "R{[0-9.]+}",
- "runtime-id": "R{.+}"
+ "runtime-id": "R{.+}",
+ "cardinality-out": "R{[0-9.]+}",
+ "avg-tuple-size": "R{[0-9.]+}",
+ "min-tuple-size": "R{[0-9.]+}",
+ "max-tuple-size": "R{[0-9.]+}"
}
]
},
diff --git
a/asterixdb/asterix-app/src/test/resources/runtimets/results/profile/sleep/sleep.5.regexjson
b/asterixdb/asterix-app/src/test/resources/runtimets/results/profile/sleep/sleep.5.regexjson
index 98d7930438..aef3a826f5 100644
---
a/asterixdb/asterix-app/src/test/resources/runtimets/results/profile/sleep/sleep.5.regexjson
+++
b/asterixdb/asterix-app/src/test/resources/runtimets/results/profile/sleep/sleep.5.regexjson
@@ -24,6 +24,7 @@
},
"open-time": "R{[0-9]+}",
"close-time": "R{[0-9]+}",
+ "cardinality": "R{[0-9]+}",
"offset": "R{[0-9]+}",
"frame-times": [
0
@@ -35,7 +36,11 @@
{
"name": "R{.+}",
"run-time": "R{[0-9.]+}",
- "runtime-id": "R{.+}"
+ "runtime-id": "R{.+}",
+ "cardinality-out": "R{[0-9.]+}",
+ "avg-tuple-size": "R{[0-9.]+}",
+ "min-tuple-size": "R{[0-9.]+}",
+ "max-tuple-size": "R{[0-9.]+}"
}
]
},
@@ -124,6 +129,7 @@
},
"open-time": "R{[0-9]+}",
"close-time": "R{[0-9]+}",
+ "cardinality": "R{[0-9]+}",
"offset": "R{[0-9]+}",
"frame-times": [
0
@@ -135,7 +141,11 @@
{
"name": "R{.+}",
"run-time": "R{[0-9.]+}",
- "runtime-id": "R{.+}"
+ "runtime-id": "R{.+}",
+ "cardinality-out": "R{[0-9.]+}",
+ "avg-tuple-size": "R{[0-9.]+}",
+ "min-tuple-size": "R{[0-9.]+}",
+ "max-tuple-size": "R{[0-9.]+}"
}
]
},
diff --git
a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/prettyprint/LogicalOperatorPrettyPrintVisitorJson.java
b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/prettyprint/LogicalOperatorPrettyPrintVisitorJson.java
index 36c33e611f..5c11e69ece 100644
---
a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/prettyprint/LogicalOperatorPrettyPrintVisitorJson.java
+++
b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/prettyprint/LogicalOperatorPrettyPrintVisitorJson.java
@@ -26,6 +26,7 @@ import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Objects;
+import java.util.Optional;
import org.apache.commons.lang3.mutable.Mutable;
import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
@@ -104,9 +105,27 @@ public class LogicalOperatorPrettyPrintVisitorJson extends
AbstractLogicalOperat
private static final String CONDITION_FIELD = "condition";
private static final String MISSING_VALUE_FIELD = "missing-value";
private static final String OPTIMIZER_ESTIMATES = "optimizer-estimates";
+
+ private static final String MIN_TIME = "min-time";
+
+ private static final String MAX_TIME = "max-time";
+
+ private static final String MIN_CARDINALITY = "min-cardinality";
+
+ private static final String MAX_CARDINALITY = "max-cardinality";
+
+ private static final String TOTAL_CARDINALITY = "total-cardinality";
+ private static final String NAME = "name";
+ private static final String ID = "id";
+ private static final String RUNTIME_ID = "runtime-id";
+
+ private static final String JOBLETS = "joblets";
+ private static final String CDID = "CDID";
+ private static final String ANID_START = "ANID:";
private final Map<AbstractLogicalOperator, String> operatorIdentity = new
HashMap<>();
private Map<Object, String> log2odid = Collections.emptyMap();
private Map<String, OperatorProfile> profile = Collections.emptyMap();
+
private final IdCounter idCounter = new IdCounter();
private final JsonGenerator jsonGenerator;
@@ -154,7 +173,7 @@ public class LogicalOperatorPrettyPrintVisitorJson extends
AbstractLogicalOperat
}
}
- private class ExtendedActivityId {
+ private static class ExtendedActivityId {
private final OperatorDescriptorId odId;
private final int id;
private final int microId;
@@ -162,7 +181,7 @@ public class LogicalOperatorPrettyPrintVisitorJson extends
AbstractLogicalOperat
private final int subId;
ExtendedActivityId(String str) {
- if (str.startsWith("ANID:")) {
+ if (str.startsWith(ANID_START)) {
str = str.substring(5);
int idIdx = str.lastIndexOf(':');
this.odId = OperatorDescriptorId.parse(str.substring(0,
idIdx));
@@ -201,11 +220,15 @@ public class LogicalOperatorPrettyPrintVisitorJson
extends AbstractLogicalOperat
@Override
public String toString() {
- return "ANID:" + odId + ":" + getLocalId();
+ return "ANID:" + odId + ":" + id + "." + getLocalId();
+ }
+
+ private String getActivityAndLocalId() {
+ return "ANID:" + id + (!getLocalId().isEmpty() ? "." +
getLocalId() : "");
}
private void catenateId(StringBuilder sb, int i) {
- if (sb.length() == 0) {
+ if (sb.isEmpty()) {
sb.append(i);
return;
}
@@ -213,9 +236,8 @@ public class LogicalOperatorPrettyPrintVisitorJson extends
AbstractLogicalOperat
sb.append(i);
}
- public String getLocalId() {
+ private String getLocalId() {
StringBuilder sb = new StringBuilder();
- catenateId(sb, odId.getId());
if (microId > 0) {
catenateId(sb, microId);
}
@@ -227,63 +249,83 @@ public class LogicalOperatorPrettyPrintVisitorJson
extends AbstractLogicalOperat
}
}
+ private static class MinMax<T extends Comparable<T>> {
+ T min;
+ T max;
+
+ public MinMax(T initial) {
+ this.min = initial;
+ this.max = initial;
+ }
+
+ public void update(T val) {
+ min = min.compareTo(val) > 0 ? val : min;
+ max = max.compareTo(val) < 0 ? val : max;
+ }
+
+ }
+
private class OperatorProfile {
- Map<String, Pair<Double, Double>> activityTimes;
- Map<String, Pair<Long, Long>> activityCards;
+
+ Map<String, String> activityNames;
+ Map<String, MinMax<Double>> activityTimes;
+ Map<String, MinMax<Long>> activityCards;
+ Map<String, Long> activityCardTotal;
OperatorProfile() {
+ activityNames = new HashMap<>();
activityTimes = new HashMap<>();
activityCards = new HashMap<>();
+ activityCardTotal = new HashMap<>();
}
- void updateOperator(String extendedOpId, double time, long
cardinality) {
- updateMinMax(time, extendedOpId, activityTimes);
- if (cardinality > 0) {
- updateMinMax(cardinality, extendedOpId, activityCards);
- }
+ void updateOperator(String extendedOpId, String name, double time,
long cardinality) {
+ updateOperator(extendedOpId, name, time);
+ updateMinMax(cardinality, extendedOpId, activityCards);
+ activityCardTotal.compute(extendedOpId, (id, total) -> total ==
null ? cardinality : total + cardinality);
}
- void updateOperator(String extendedOpId, double time) {
+ void updateOperator(String extendedOpId, String name, double time) {
+ activityNames.put(extendedOpId, name);
updateMinMax(time, extendedOpId, activityTimes);
}
- private <T extends Comparable<T>> void updateMinMax(T comp, String id,
Map<String, Pair<T, T>> opMap) {
- Pair<T, T> times = opMap.computeIfAbsent(id, i -> new Pair(comp,
comp));
- if (times.getFirst().compareTo(comp) > 0) {
- times.setFirst(comp);
- }
- if (times.getSecond().compareTo(comp) < 0) {
- times.setSecond(comp);
- }
+ private <T extends Comparable<T>> void updateMinMax(T comp, String id,
Map<String, MinMax<T>> opMap) {
+ MinMax<T> stat = opMap.computeIfAbsent(id, i -> new
MinMax<>(comp));
+ stat.update(comp);
}
}
- private ExtendedActivityId acIdFromName(String name) {
+ private Pair<ExtendedActivityId, String> splitAcId(String name) {
String[] parts = name.split(" - ");
- return new ExtendedActivityId(parts[0]);
+ return new Pair<>(new ExtendedActivityId(parts[0]), parts[1]);
}
Map<String, OperatorProfile> processProfile(ObjectNode profile) {
Map<String, OperatorProfile> profiledOps = new HashMap<>();
- for (JsonNode joblet : profile.get("joblets")) {
+ for (JsonNode joblet : profile.get(JOBLETS)) {
for (JsonNode task : joblet.get("tasks")) {
for (JsonNode counters : task.get("counters")) {
- OperatorProfile info =
profiledOps.computeIfAbsent(counters.get("runtime-id").asText(),
- i -> new OperatorProfile());
+ if (counters.get(RUNTIME_ID).asText().contains(CDID)) {
+ continue;
+ }
+ OperatorProfile info =
+
profiledOps.computeIfAbsent(counters.get(RUNTIME_ID).asText(), i -> new
OperatorProfile());
+ Pair<ExtendedActivityId, String> identities =
splitAcId(counters.get(NAME).asText());
JsonNode card = counters.get("cardinality-out");
if (card != null) {
-
info.updateOperator(acIdFromName(counters.get("name").asText()).getLocalId(),
- counters.get("run-time").asDouble(),
counters.get("cardinality-out").asLong(-1));
+
info.updateOperator(identities.first.getActivityAndLocalId(), identities.second,
+ counters.get("run-time").asDouble(),
card.asLong(0));
+ } else {
+
info.updateOperator(identities.first.getActivityAndLocalId(), identities.second,
+ counters.get("run-time").asDouble());
}
-
info.updateOperator(acIdFromName(counters.get("name").asText()).getLocalId(),
- counters.get("run-time").asDouble());
}
for (JsonNode partition : task.get("partition-send-profile")) {
String id =
partition.get("partition-id").get("connector-id").asText();
OperatorProfile info = profiledOps.computeIfAbsent(id, i
-> new OperatorProfile());
- //CDIDs are unique
- info.updateOperator("0",
- partition.get("close-time").asDouble() -
partition.get("open-time").asDouble());
+ //TODO: connector times need to be calculated accurately,
until then they won't be included
+ info.updateOperator("0", "", 0,
partition.get("cardinality").asLong(0));
}
}
}
@@ -350,6 +392,45 @@ public class LogicalOperatorPrettyPrintVisitorJson extends
AbstractLogicalOperat
}
}
+ private void printActivityStats(Optional<MinMax<Double>> time,
Optional<MinMax<Long>> card,
+ Optional<Long> totalCard) throws IOException {
+ if (time.isPresent()) {
+ jsonGenerator.writeNumberField(MIN_TIME, time.get().min);
+ jsonGenerator.writeNumberField(MAX_TIME, time.get().max);
+ }
+ if (card.isPresent()) {
+ jsonGenerator.writeNumberField(MIN_CARDINALITY, card.get().min);
+ jsonGenerator.writeNumberField(MAX_CARDINALITY, card.get().max);
+ }
+ if (totalCard.isPresent()) {
+ jsonGenerator.writeNumberField(TOTAL_CARDINALITY, totalCard.get());
+ }
+ }
+
+ private void printOperatorStats(OperatorProfile info) throws IOException {
+ if (info.activityTimes.size() == 1) {
+ Optional<MinMax<Double>> times =
info.activityTimes.values().stream().findFirst();
+ Optional<MinMax<Long>> cards =
info.activityCards.values().stream().findFirst();
+ Optional<Long> total =
info.activityCardTotal.values().stream().findFirst();
+ printActivityStats(times, cards, total);
+ } else if (info.activityTimes.size() > 1) {
+ jsonGenerator.writeArrayFieldStart("activity-stats");
+ for (String acId : info.activityTimes.keySet()) {
+ jsonGenerator.writeStartObject();
+ String prettyName = info.activityNames.get(acId);
+ if (prettyName != null) {
+ jsonGenerator.writeStringField(NAME, prettyName);
+ }
+ jsonGenerator.writeStringField("id", acId);
+
printActivityStats(Optional.ofNullable(info.activityTimes.get(acId)),
+ Optional.ofNullable(info.activityCards.get(acId)),
+ Optional.ofNullable(info.activityCardTotal.get(acId)));
+ jsonGenerator.writeEndObject();
+ }
+ jsonGenerator.writeEndArray();
+ }
+ }
+
private void printOperatorImpl(AbstractLogicalOperator op, boolean
printInputs, boolean printOptimizerEstimates)
throws AlgebricksException {
try {
@@ -358,34 +439,10 @@ public class LogicalOperatorPrettyPrintVisitorJson
extends AbstractLogicalOperat
jsonGenerator.writeStringField("operatorId",
idCounter.printOperatorId(op));
String od = log2odid.get(op);
if (od != null) {
- jsonGenerator.writeStringField("runtime-id", od);
+ jsonGenerator.writeStringField(RUNTIME_ID, od);
OperatorProfile info = profile.get(od);
if (info != null) {
- if (info.activityTimes.size() == 1) {
- Pair<Double, Double> minMax =
info.activityTimes.values().iterator().next();
- jsonGenerator.writeNumberField("min-time",
minMax.first);
- jsonGenerator.writeNumberField("max-time",
minMax.second);
- if (info.activityCards.size() > 0) {
- Pair<Long, Long> minMaxCard =
info.activityCards.values().iterator().next();
- jsonGenerator.writeNumberField("min-cardinality",
minMaxCard.first);
- jsonGenerator.writeNumberField("max-cardinality",
minMaxCard.second);
- }
- } else {
- jsonGenerator.writeObjectFieldStart("times");
- for (String acId : info.activityTimes.keySet()) {
- jsonGenerator.writeObjectFieldStart(acId);
- jsonGenerator.writeNumberField("min-time",
info.activityTimes.get(acId).first);
- jsonGenerator.writeNumberField("max-time",
info.activityTimes.get(acId).second);
- Pair<Long, Long> cards =
info.activityCards.get(acId);
- if (cards != null) {
-
jsonGenerator.writeNumberField("min-cardinality",
info.activityCards.get(acId).first);
-
jsonGenerator.writeNumberField("max-cardinality",
info.activityCards.get(acId).second);
- }
- jsonGenerator.writeEndObject();
- }
- jsonGenerator.writeEndObject();
- }
-
+ printOperatorStats(info);
}
}
IPhysicalOperator pOp = op.getPhysicalOperator();
diff --git
a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/base/ProfiledPushRuntime.java
b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/base/ProfiledPushRuntime.java
index 354f172c1c..bf4533fb06 100644
---
a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/base/ProfiledPushRuntime.java
+++
b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/base/ProfiledPushRuntime.java
@@ -70,7 +70,7 @@ public class ProfiledPushRuntime extends ProfiledFrameWriter
implements IPushRun
public void setOutputFrameWriter(int index, IFrameWriter writer,
RecordDescriptor recordDesc) {
if (writer instanceof ITimedWriter) {
ITimedWriter wrapper = (ITimedWriter) writer;
- wrapper.setUpstreamStats(stats);
+ wrapper.setInputStats(stats);
outputs.put(index, wrapper);
}
wrapped.setOutputFrameWriter(index, writer, recordDesc);
diff --git
a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/meta/AlgebricksMetaOperatorDescriptor.java
b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/meta/AlgebricksMetaOperatorDescriptor.java
index 2a3fa7ee19..f6d9cd15b1 100644
---
a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/meta/AlgebricksMetaOperatorDescriptor.java
+++
b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/meta/AlgebricksMetaOperatorDescriptor.java
@@ -265,7 +265,7 @@ public class AlgebricksMetaOperatorDescriptor extends
AbstractSingleActivityOper
}
@Override
- public void setUpstreamStats(IOperatorStats stats) {
+ public void setInputStats(IOperatorStats stats) {
parentStats = stats;
}
diff --git
a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataflow/ITimedWriter.java
b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataflow/ITimedWriter.java
index 7b0f8c8a0c..e4f990f64d 100644
---
a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataflow/ITimedWriter.java
+++
b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataflow/ITimedWriter.java
@@ -22,7 +22,7 @@ import org.apache.hyracks.api.comm.IFrameWriter;
import org.apache.hyracks.api.job.profiling.IOperatorStats;
public interface ITimedWriter extends IFrameWriter {
- void setUpstreamStats(IOperatorStats stats);
+ void setInputStats(IOperatorStats stats);
long getTotalTime();
}
diff --git
a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataflow/ProfiledFrameWriter.java
b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataflow/ProfiledFrameWriter.java
index cfb0e7b0e5..fb4ed3ce1d 100644
---
a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataflow/ProfiledFrameWriter.java
+++
b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataflow/ProfiledFrameWriter.java
@@ -41,11 +41,12 @@ public class ProfiledFrameWriter implements ITimedWriter {
// The downstream data consumer of this writer.
private final IFrameWriter writer;
- protected IOperatorStats upstreamStats = NoOpOperatorStats.INSTANCE;
+ protected IOperatorStats inputStats = NoOpOperatorStats.INSTANCE;
private int minSz = Integer.MAX_VALUE;
private int maxSz = -1;
private long avgSz;
- private ICounter totalTime;
+
+ private final ICounter totalTime;
public ProfiledFrameWriter(IFrameWriter writer) {
this.writer = writer;
@@ -53,8 +54,8 @@ public class ProfiledFrameWriter implements ITimedWriter {
}
@Override
- public void setUpstreamStats(IOperatorStats stats) {
- this.upstreamStats = stats;
+ public void setInputStats(IOperatorStats stats) {
+ this.inputStats = stats;
}
public static void timeMethod(HyracksRunnable r, ICounter c) throws
HyracksDataException {
@@ -78,14 +79,14 @@ public class ProfiledFrameWriter implements ITimedWriter {
}
@Override
- public final void open() throws HyracksDataException {
+ public void open() throws HyracksDataException {
timeMethod(writer::open, totalTime);
}
private void updateTupleStats(ByteBuffer buffer) {
int tupleCountOffset = FrameHelper.getTupleCountOffset(buffer.limit());
int tupleCount = IntSerDeUtils.getInt(buffer.array(),
tupleCountOffset);
- ICounter tupleCounter = upstreamStats.getTupleCounter();
+ ICounter tupleCounter = inputStats.getTupleCounter();
long prevCount = tupleCounter.get();
for (int i = 0; i < tupleCount; i++) {
int tupleLen = getTupleLength(i, tupleCountOffset, buffer);
@@ -99,25 +100,25 @@ public class ProfiledFrameWriter implements ITimedWriter {
avgSz = (prev + tupleLen) / (prevCount + 1);
prevCount++;
}
- upstreamStats.getMaxTupleSz().set(maxSz);
- upstreamStats.getMinTupleSz().set(minSz);
- upstreamStats.getAverageTupleSz().set(avgSz);
+ inputStats.getMaxTupleSz().set(maxSz);
+ inputStats.getMinTupleSz().set(minSz);
+ inputStats.getAverageTupleSz().set(avgSz);
tupleCounter.update(tupleCount);
}
@Override
- public final void nextFrame(ByteBuffer buffer) throws HyracksDataException
{
+ public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
updateTupleStats(buffer);
timeMethod(writer::nextFrame, buffer);
}
@Override
- public final void flush() throws HyracksDataException {
+ public void flush() throws HyracksDataException {
timeMethod(writer::flush, totalTime);
}
@Override
- public final void fail() throws HyracksDataException {
+ public void fail() throws HyracksDataException {
timeMethod(writer::fail, totalTime);
}
diff --git
a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataflow/ProfiledOperatorNodePushable.java
b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataflow/ProfiledOperatorNodePushable.java
index 1984d8e07f..cb188c0126 100644
---
a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataflow/ProfiledOperatorNodePushable.java
+++
b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataflow/ProfiledOperatorNodePushable.java
@@ -77,9 +77,9 @@ public class ProfiledOperatorNodePushable implements
IOperatorNodePushable, ISta
if (writer instanceof ITimedWriter) {
ITimedWriter wrapper = (ITimedWriter) writer;
if (op instanceof ISelfProfilingNodePushable) {
- wrapper.setUpstreamStats(((ISelfProfilingNodePushable)
op).getStats());
+ wrapper.setInputStats(((ISelfProfilingNodePushable)
op).getStats());
} else {
- wrapper.setUpstreamStats(stats);
+ wrapper.setInputStats(stats);
}
outputs.put(index, wrapper);
}
diff --git
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/job/profiling/om/PartitionProfile.java
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/job/profiling/om/PartitionProfile.java
index 9552a26181..9a57e30c72 100644
---
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/job/profiling/om/PartitionProfile.java
+++
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/job/profiling/om/PartitionProfile.java
@@ -35,6 +35,8 @@ public class PartitionProfile implements IWritable,
Serializable {
private long openTime;
private long closeTime;
+ private long totalTime;
+ private long card;
private MultiResolutionEventProfiler mrep;
@@ -48,10 +50,13 @@ public class PartitionProfile implements IWritable,
Serializable {
}
- public PartitionProfile(PartitionId pid, long openTime, long closeTime,
MultiResolutionEventProfiler mrep) {
+ public PartitionProfile(PartitionId pid, long openTime, long closeTime,
long totalTime, long card,
+ MultiResolutionEventProfiler mrep) {
this.pid = pid;
this.openTime = openTime;
this.closeTime = closeTime;
+ this.totalTime = totalTime;
+ this.card = card;
this.mrep = mrep;
}
@@ -67,6 +72,10 @@ public class PartitionProfile implements IWritable,
Serializable {
return closeTime;
}
+ public long getCardinality() {
+ return card;
+ }
+
public MultiResolutionEventProfiler getSamples() {
return mrep;
}
@@ -75,6 +84,8 @@ public class PartitionProfile implements IWritable,
Serializable {
public void writeFields(DataOutput output) throws IOException {
output.writeLong(closeTime);
output.writeLong(openTime);
+ output.writeLong(totalTime);
+ output.writeLong(card);
mrep.writeFields(output);
pid.writeFields(output);
}
@@ -83,6 +94,8 @@ public class PartitionProfile implements IWritable,
Serializable {
public void readFields(DataInput input) throws IOException {
closeTime = input.readLong();
openTime = input.readLong();
+ totalTime = input.readLong();
+ card = input.readLong();
mrep = MultiResolutionEventProfiler.create(input);
pid = PartitionId.create(input);
}
diff --git
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/job/profiling/om/TaskProfile.java
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/job/profiling/om/TaskProfile.java
index 84376f69bd..b66be1e8d4 100644
---
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/job/profiling/om/TaskProfile.java
+++
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/job/profiling/om/TaskProfile.java
@@ -103,6 +103,7 @@ public class TaskProfile extends AbstractProfile {
ppObj.set("partition-id", pidObj);
ppObj.put("open-time", pp.getOpenTime());
ppObj.put("close-time", pp.getCloseTime());
+ ppObj.put("cardinality", pp.getCardinality());
MultiResolutionEventProfiler samples = pp.getSamples();
ppObj.put("offset", samples.getOffset());
int resolution = samples.getResolution();
diff --git
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/profiling/ProfilingPartitionWriterFactory.java
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/profiling/ProfilingPartitionWriterFactory.java
index e51d2bdaf2..65f3f313a4 100644
---
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/profiling/ProfilingPartitionWriterFactory.java
+++
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/profiling/ProfilingPartitionWriterFactory.java
@@ -24,7 +24,10 @@ import org.apache.hyracks.api.comm.IFrameWriter;
import org.apache.hyracks.api.comm.IPartitionWriterFactory;
import org.apache.hyracks.api.context.IHyracksTaskContext;
import org.apache.hyracks.api.dataflow.IConnectorDescriptor;
+import org.apache.hyracks.api.dataflow.ProfiledFrameWriter;
import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.job.profiling.IOperatorStats;
+import org.apache.hyracks.api.job.profiling.OperatorStats;
import org.apache.hyracks.api.partitions.PartitionId;
import
org.apache.hyracks.control.common.job.profiling.counters.MultiResolutionEventProfiler;
import org.apache.hyracks.control.common.job.profiling.om.PartitionProfile;
@@ -53,46 +56,43 @@ public class ProfilingPartitionWriterFactory implements
IPartitionWriterFactory
public IFrameWriter createFrameWriter(final int receiverIndex) throws
HyracksDataException {
final IFrameWriter writer = new
ConnectorSenderProfilingFrameWriter(ctx,
delegate.createFrameWriter(receiverIndex),
cd.getConnectorId(), senderIndex, receiverIndex);
- return new IFrameWriter() {
+ return new ProfiledFrameWriter(writer) {
private long openTime;
-
private long closeTime;
MultiResolutionEventProfiler mrep = new
MultiResolutionEventProfiler(N_SAMPLES);
+ IOperatorStats stats = new OperatorStats(cd.getDisplayName(),
cd.getConnectorId().toString());
+
@Override
public void open() throws HyracksDataException {
+ super.setInputStats(stats);
openTime = System.currentTimeMillis();
- writer.open();
+ super.open();
}
@Override
public void nextFrame(ByteBuffer buffer) throws
HyracksDataException {
mrep.reportEvent();
- writer.nextFrame(buffer);
- }
-
- @Override
- public void fail() throws HyracksDataException {
- writer.fail();
+ super.nextFrame(buffer);
}
@Override
public void close() throws HyracksDataException {
closeTime = System.currentTimeMillis();
+ long ownTime = getTotalTime();
+ if (stats != null) {
+ stats.getTimeCounter().set(ownTime);
+ }
try {
- ((Task) ctx).setPartitionSendProfile(
- new PartitionProfile(new
PartitionId(ctx.getJobletContext().getJobId(), cd.getConnectorId(),
- senderIndex, receiverIndex), openTime,
closeTime, mrep));
+ ((Task) ctx).setPartitionSendProfile(new PartitionProfile(
+ new PartitionId(ctx.getJobletContext().getJobId(),
cd.getConnectorId(), senderIndex,
+ receiverIndex),
+ openTime, closeTime, super.getTotalTime(),
stats.getTupleCounter().get(), mrep));
} finally {
- writer.close();
+ super.close();
}
}
-
- @Override
- public void flush() throws HyracksDataException {
- writer.flush();
- }
};
}
}
diff --git
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/StartTasksWork.java
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/StartTasksWork.java
index 6225d4cc49..410cb0090f 100644
---
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/StartTasksWork.java
+++
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/StartTasksWork.java
@@ -38,6 +38,7 @@ import org.apache.hyracks.api.dataflow.EnforceFrameWriter;
import org.apache.hyracks.api.dataflow.IActivity;
import org.apache.hyracks.api.dataflow.IConnectorDescriptor;
import org.apache.hyracks.api.dataflow.IOperatorNodePushable;
+import org.apache.hyracks.api.dataflow.ProfiledFrameWriter;
import org.apache.hyracks.api.dataflow.TaskAttemptId;
import org.apache.hyracks.api.dataflow.TaskId;
import org.apache.hyracks.api.dataflow.connectors.IConnectorPolicy;
@@ -182,7 +183,12 @@ public class StartTasksWork extends AbstractWork {
LOGGER.trace("input: {}: {}", i,
conn.getConnectorId());
IFrameWriter writer = conn.createPartitioner(task,
recordDesc, pwFactory, partition,
partitionCount,
td.getOutputPartitionCounts()[i]);
- writer = (enforce && !profile) ?
EnforceFrameWriter.enforce(writer) : writer;
+ if (profile) {
+ //needed to propagate cardinality to the last
operator before an exchange
+ writer = new ProfiledFrameWriter(writer);
+ } else {
+ writer = enforce ?
EnforceFrameWriter.enforce(writer) : writer;
+ }
operator.setOutputFrameWriter(i, writer, recordDesc);
}
}