huijunwu commented on a change in pull request #2821: Update Dhalion dependency version
URL: https://github.com/apache/incubator-heron/pull/2821#discussion_r178399304
 
 

 ##########
 File path: 
heron/healthmgr/src/java/com/twitter/heron/healthmgr/diagnosers/SlowInstanceDiagnoser.java
 ##########
 @@ -14,69 +14,74 @@
 
 package com.twitter.heron.healthmgr.diagnosers;
 
-import java.util.List;
-import java.util.Map;
+import java.time.Instant;
+import java.util.ArrayList;
+import java.util.Collection;
 import java.util.logging.Logger;
 
-import com.microsoft.dhalion.detector.Symptom;
-import com.microsoft.dhalion.diagnoser.Diagnosis;
-import com.microsoft.dhalion.metrics.ComponentMetrics;
-import com.microsoft.dhalion.metrics.InstanceMetrics;
+import com.microsoft.dhalion.core.Diagnosis;
+import com.microsoft.dhalion.core.MeasurementsTable;
+import com.microsoft.dhalion.core.Symptom;
+import com.microsoft.dhalion.core.SymptomsTable;
 
-import com.twitter.heron.healthmgr.common.ComponentMetricsHelper;
-import com.twitter.heron.healthmgr.common.MetricsStats;
-
-import static 
com.twitter.heron.healthmgr.diagnosers.BaseDiagnoser.DiagnosisName.DIAGNOSIS_SLOW_INSTANCE;
-import static 
com.twitter.heron.healthmgr.diagnosers.BaseDiagnoser.DiagnosisName.SYMPTOM_SLOW_INSTANCE;
-import static 
com.twitter.heron.healthmgr.sensors.BaseSensor.MetricName.METRIC_BACK_PRESSURE;
-import static 
com.twitter.heron.healthmgr.sensors.BaseSensor.MetricName.METRIC_BUFFER_SIZE;
+import static 
com.twitter.heron.healthmgr.detectors.BaseDetector.SymptomType.SYMPTOM_COMP_BACK_PRESSURE;
+import static 
com.twitter.heron.healthmgr.detectors.BaseDetector.SymptomType.SYMPTOM_PROCESSING_RATE_SKEW;
+import static 
com.twitter.heron.healthmgr.detectors.BaseDetector.SymptomType.SYMPTOM_WAIT_Q_SIZE_SKEW;
+import static 
com.twitter.heron.healthmgr.diagnosers.BaseDiagnoser.DiagnosisType.DIAGNOSIS_SLOW_INSTANCE;
+import static 
com.twitter.heron.healthmgr.sensors.BaseSensor.MetricName.METRIC_WAIT_Q_SIZE;
 
 public class SlowInstanceDiagnoser extends BaseDiagnoser {
   private static final Logger LOG = 
Logger.getLogger(SlowInstanceDiagnoser.class.getName());
 
   @Override
-  public Diagnosis diagnose(List<Symptom> symptoms) {
-    List<Symptom> bpSymptoms = getBackPressureSymptoms(symptoms);
-    Map<String, ComponentMetrics> processingRateSkewComponents =
-        getProcessingRateSkewComponents(symptoms);
-    Map<String, ComponentMetrics> waitQDisparityComponents = 
getWaitQDisparityComponents(symptoms);
+  public Collection<Diagnosis> diagnose(Collection<Symptom> symptoms) {
+    Collection<Diagnosis> diagnoses = new ArrayList<>();
+    SymptomsTable symptomsTable = SymptomsTable.of(symptoms);
 
-    if (bpSymptoms.isEmpty() || waitQDisparityComponents.isEmpty()
-        || !processingRateSkewComponents.isEmpty()) {
-      // Since there is no back pressure or disparate wait count or similar
-      // execution count, no action is needed
-      return null;
-    } else if (bpSymptoms.size() > 1) {
+    SymptomsTable bp = symptomsTable.type(SYMPTOM_COMP_BACK_PRESSURE.text());
+    if (bp.size() > 1) {
       // TODO handle cases where multiple detectors create back pressure 
symptom
       throw new IllegalStateException("Multiple back-pressure symptoms case");
     }
-    ComponentMetrics bpMetrics = bpSymptoms.iterator().next().getComponent();
+    if (bp.size() == 0) {
+      return diagnoses;
+    }
+    String bpComponent = bp.first().assignments().iterator().next();
 
-    // verify wait Q disparity and back pressure for the same component exists
-    ComponentMetrics pendingBufferMetrics = 
waitQDisparityComponents.get(bpMetrics.getName());
-    if (pendingBufferMetrics == null) {
-      // no wait Q disparity for the component with back pressure. There is no 
slow instance
-      return null;
+    SymptomsTable processingRateSkew = 
symptomsTable.type(SYMPTOM_PROCESSING_RATE_SKEW.text());
+    SymptomsTable waitQSkew = 
symptomsTable.type(SYMPTOM_WAIT_Q_SIZE_SKEW.text());
+    // verify wait Q disparity, similar processing rates and back pressure for 
the same component
+    // exist
+    if (waitQSkew.assignment(bpComponent).size() == 0
+        || processingRateSkew.assignment(bpComponent).size() > 0) {
+      // TODO in a short window rate skew could exist
+      return diagnoses;
     }
 
-    ComponentMetrics mergedData = ComponentMetrics.merge(bpMetrics, 
pendingBufferMetrics);
-    ComponentMetricsHelper compStats = new ComponentMetricsHelper(mergedData);
-    compStats.computeBpStats();
-    MetricsStats bufferStats = 
compStats.computeMinMaxStats(METRIC_BUFFER_SIZE);
+    Collection<String> assignments = new ArrayList<>();
+
+    Instant newest = context.checkpoint();
+    Instant oldest = context.previousCheckpoint();
+    MeasurementsTable measurements = context.measurements()
+        .between(oldest, newest)
+        .component(bpComponent);
 
-    Symptom resultSymptom = null;
-    for (InstanceMetrics boltMetrics : compStats.getBoltsWithBackpressure()) {
-      double bufferSize = 
boltMetrics.getMetricValueSum(METRIC_BUFFER_SIZE.text());
-      double bpValue = 
boltMetrics.getMetricValueSum(METRIC_BACK_PRESSURE.text());
-      if (bufferStats.getMetricMax() < bufferSize * 2) {
-        LOG.info(String.format("SLOW: %s back-pressure(%s) and high buffer 
size: %s "
+    for (String instance : measurements.uniqueInstances()) {
+      MeasurementsTable instanceMeasurements = measurements.instance(instance);
+      double waitQSize = 
instanceMeasurements.type(METRIC_WAIT_Q_SIZE.text()).mean();
+      if (measurements.type(METRIC_WAIT_Q_SIZE.text()).max() < waitQSize * 2) {
+        assignments.add(instance);
+        LOG.info(String.format("SLOW: %s back-pressure and high buffer size: 
%s "
                 + "and similar processing rates",
-            boltMetrics.getName(), bpValue, bufferSize));
-        resultSymptom = new Symptom(SYMPTOM_SLOW_INSTANCE.text(), mergedData);
+            instance, waitQSize));
       }
     }
 
-    return resultSymptom != null
-        ? new Diagnosis(DIAGNOSIS_SLOW_INSTANCE.text(), resultSymptom) : null;
+    Instant now = context.checkpoint();
 
 Review comment:
   This declaration (`Instant now = context.checkpoint();`) can be moved inside the `if` below.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

Reply via email to