huijunwu commented on a change in pull request #2821: Update Dhalion dependency
version
URL: https://github.com/apache/incubator-heron/pull/2821#discussion_r178399304
##########
File path:
heron/healthmgr/src/java/com/twitter/heron/healthmgr/diagnosers/SlowInstanceDiagnoser.java
##########
@@ -14,69 +14,74 @@
package com.twitter.heron.healthmgr.diagnosers;
-import java.util.List;
-import java.util.Map;
+import java.time.Instant;
+import java.util.ArrayList;
+import java.util.Collection;
import java.util.logging.Logger;
-import com.microsoft.dhalion.detector.Symptom;
-import com.microsoft.dhalion.diagnoser.Diagnosis;
-import com.microsoft.dhalion.metrics.ComponentMetrics;
-import com.microsoft.dhalion.metrics.InstanceMetrics;
+import com.microsoft.dhalion.core.Diagnosis;
+import com.microsoft.dhalion.core.MeasurementsTable;
+import com.microsoft.dhalion.core.Symptom;
+import com.microsoft.dhalion.core.SymptomsTable;
-import com.twitter.heron.healthmgr.common.ComponentMetricsHelper;
-import com.twitter.heron.healthmgr.common.MetricsStats;
-
-import static
com.twitter.heron.healthmgr.diagnosers.BaseDiagnoser.DiagnosisName.DIAGNOSIS_SLOW_INSTANCE;
-import static
com.twitter.heron.healthmgr.diagnosers.BaseDiagnoser.DiagnosisName.SYMPTOM_SLOW_INSTANCE;
-import static
com.twitter.heron.healthmgr.sensors.BaseSensor.MetricName.METRIC_BACK_PRESSURE;
-import static
com.twitter.heron.healthmgr.sensors.BaseSensor.MetricName.METRIC_BUFFER_SIZE;
+import static
com.twitter.heron.healthmgr.detectors.BaseDetector.SymptomType.SYMPTOM_COMP_BACK_PRESSURE;
+import static
com.twitter.heron.healthmgr.detectors.BaseDetector.SymptomType.SYMPTOM_PROCESSING_RATE_SKEW;
+import static
com.twitter.heron.healthmgr.detectors.BaseDetector.SymptomType.SYMPTOM_WAIT_Q_SIZE_SKEW;
+import static
com.twitter.heron.healthmgr.diagnosers.BaseDiagnoser.DiagnosisType.DIAGNOSIS_SLOW_INSTANCE;
+import static
com.twitter.heron.healthmgr.sensors.BaseSensor.MetricName.METRIC_WAIT_Q_SIZE;
public class SlowInstanceDiagnoser extends BaseDiagnoser {
private static final Logger LOG =
Logger.getLogger(SlowInstanceDiagnoser.class.getName());
@Override
- public Diagnosis diagnose(List<Symptom> symptoms) {
- List<Symptom> bpSymptoms = getBackPressureSymptoms(symptoms);
- Map<String, ComponentMetrics> processingRateSkewComponents =
- getProcessingRateSkewComponents(symptoms);
- Map<String, ComponentMetrics> waitQDisparityComponents =
getWaitQDisparityComponents(symptoms);
+ public Collection<Diagnosis> diagnose(Collection<Symptom> symptoms) {
+ Collection<Diagnosis> diagnoses = new ArrayList<>();
+ SymptomsTable symptomsTable = SymptomsTable.of(symptoms);
- if (bpSymptoms.isEmpty() || waitQDisparityComponents.isEmpty()
- || !processingRateSkewComponents.isEmpty()) {
- // Since there is no back pressure or disparate wait count or similar
- // execution count, no action is needed
- return null;
- } else if (bpSymptoms.size() > 1) {
+ SymptomsTable bp = symptomsTable.type(SYMPTOM_COMP_BACK_PRESSURE.text());
+ if (bp.size() > 1) {
// TODO handle cases where multiple detectors create back pressure
symptom
throw new IllegalStateException("Multiple back-pressure symptoms case");
}
- ComponentMetrics bpMetrics = bpSymptoms.iterator().next().getComponent();
+ if (bp.size() == 0) {
+ return diagnoses;
+ }
+ String bpComponent = bp.first().assignments().iterator().next();
- // verify wait Q disparity and back pressure for the same component exists
- ComponentMetrics pendingBufferMetrics =
waitQDisparityComponents.get(bpMetrics.getName());
- if (pendingBufferMetrics == null) {
- // no wait Q disparity for the component with back pressure. There is no
slow instance
- return null;
+ SymptomsTable processingRateSkew =
symptomsTable.type(SYMPTOM_PROCESSING_RATE_SKEW.text());
+ SymptomsTable waitQSkew =
symptomsTable.type(SYMPTOM_WAIT_Q_SIZE_SKEW.text());
+ // verify wait Q disparity, similar processing rates and back pressure for
the same component
+ // exist
+ if (waitQSkew.assignment(bpComponent).size() == 0
+ || processingRateSkew.assignment(bpComponent).size() > 0) {
+ // TODO in a short window rate skew could exist
+ return diagnoses;
}
- ComponentMetrics mergedData = ComponentMetrics.merge(bpMetrics,
pendingBufferMetrics);
- ComponentMetricsHelper compStats = new ComponentMetricsHelper(mergedData);
- compStats.computeBpStats();
- MetricsStats bufferStats =
compStats.computeMinMaxStats(METRIC_BUFFER_SIZE);
+ Collection<String> assignments = new ArrayList<>();
+
+ Instant newest = context.checkpoint();
+ Instant oldest = context.previousCheckpoint();
+ MeasurementsTable measurements = context.measurements()
+ .between(oldest, newest)
+ .component(bpComponent);
- Symptom resultSymptom = null;
- for (InstanceMetrics boltMetrics : compStats.getBoltsWithBackpressure()) {
- double bufferSize =
boltMetrics.getMetricValueSum(METRIC_BUFFER_SIZE.text());
- double bpValue =
boltMetrics.getMetricValueSum(METRIC_BACK_PRESSURE.text());
- if (bufferStats.getMetricMax() < bufferSize * 2) {
- LOG.info(String.format("SLOW: %s back-pressure(%s) and high buffer
size: %s "
+ for (String instance : measurements.uniqueInstances()) {
+ MeasurementsTable instanceMeasurements = measurements.instance(instance);
+ double waitQSize =
instanceMeasurements.type(METRIC_WAIT_Q_SIZE.text()).mean();
+ if (measurements.type(METRIC_WAIT_Q_SIZE.text()).max() < waitQSize * 2) {
+ assignments.add(instance);
+ LOG.info(String.format("SLOW: %s back-pressure and high buffer size:
%s "
+ "and similar processing rates",
- boltMetrics.getName(), bpValue, bufferSize));
- resultSymptom = new Symptom(SYMPTOM_SLOW_INSTANCE.text(), mergedData);
+ instance, waitQSize));
}
}
- return resultSymptom != null
- ? new Diagnosis(DIAGNOSIS_SLOW_INSTANCE.text(), resultSymptom) : null;
+ Instant now = context.checkpoint();
Review comment:
The `now` declaration can be moved inside the `if` block below.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
With regards,
Apache Git Services