This is an automated email from the ASF dual-hosted git repository. huijun pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-heron.git
The following commit(s) were added to refs/heads/master by this push: new 130df3c fix healthmgr metrics (#2839) 130df3c is described below commit 130df3c17e26126f1e7003ed069f8fc6e64a3d3f Author: Huijun Wu <huij...@twitter.com> AuthorDate: Thu Apr 12 20:50:18 2018 -0700 fix healthmgr metrics (#2839) * fix healthmgr metrics * fix exception null * fix metrics flush * fill metricsmgr client * add debug print * add next timer * fix metrics * fix style * fix unit * remove unnecessary comment * fix typo * update style * update style 2 * update style3 * make systemconfigfile optional * staging * fix compile * replace singleton with guice singleton * update sensor with guice singleton * fixcompile * rename metricsPublisher --- heron/executor/src/python/heron_executor.py | 3 +- .../tests/python/heron_executor_unittest.py | 2 +- .../org/apache/heron/healthmgr/HealthManager.java | 49 +++++-- .../heron/healthmgr/HealthManagerMetrics.java | 153 ++++++++++++++++----- .../healthmgr/detectors/BackPressureDetector.java | 9 +- .../diagnosers/SlowInstanceDiagnoser.java | 14 ++ .../resolvers/RestartContainerResolver.java | 10 +- .../healthmgr/sensors/BackPressureSensor.java | 10 +- .../apache/heron/healthmgr/HealthManagerTest.java | 2 +- 9 files changed, 196 insertions(+), 56 deletions(-) diff --git a/heron/executor/src/python/heron_executor.py b/heron/executor/src/python/heron_executor.py index 7e64a33..cbacf4b 100755 --- a/heron/executor/src/python/heron_executor.py +++ b/heron/executor/src/python/heron_executor.py @@ -498,7 +498,8 @@ class HeronExecutor(object): "--cluster", self.cluster, "--role", self.role, "--environment", self.environment, - "--topology_name", self.topology_name] + "--topology_name", self.topology_name, + "--metricsmgr_port", self.metrics_manager_port] return healthmgr_cmd diff --git a/heron/executor/tests/python/heron_executor_unittest.py b/heron/executor/tests/python/heron_executor_unittest.py index 76caff1..05d7e3b 100644 --- a/heron/executor/tests/python/heron_executor_unittest.py +++ b/heron/executor/tests/python/heron_executor_unittest.py @@ -131,7 +131,7 @@ class HeronExecutorTest(unittest.TestCase): "-Xloggc:log-files/gc.healthmgr.log -Djava.net.preferIPv4Stack=true " \ "-cp scheduler_classpath:healthmgr_classpath " \ "org.apache.heron.healthmgr.HealthManager --cluster cluster --role role " \ - "--environment environ --topology_name topname" + "--environment environ --topology_name topname --metricsmgr_port metricsmgr_port" def get_expected_instance_command(component_name, instance_id, container_id): instance_name = "container_%d_%s_%d" % (container_id, component_name, instance_id) diff --git a/heron/healthmgr/src/java/org/apache/heron/healthmgr/HealthManager.java b/heron/healthmgr/src/java/org/apache/heron/healthmgr/HealthManager.java index 684d260..3e9c3fc 100644 --- a/heron/healthmgr/src/java/org/apache/heron/healthmgr/HealthManager.java +++ b/heron/healthmgr/src/java/org/apache/heron/healthmgr/HealthManager.java @@ -18,6 +18,7 @@ import java.io.FileNotFoundException; import java.io.IOException; import java.nio.file.Paths; import java.util.ArrayList; +import java.util.Arrays; import java.util.List; import java.util.Map; import java.util.concurrent.ScheduledFuture; @@ -44,6 +45,7 @@ import org.apache.commons.cli.Options; import org.apache.commons.cli.ParseException; import org.apache.heron.classification.InterfaceStability.Evolving; import org.apache.heron.classification.InterfaceStability.Unstable; +import org.apache.heron.common.basics.SingletonRegistry; import org.apache.heron.common.config.SystemConfig; import org.apache.heron.common.utils.logging.LoggingHelper; import org.apache.heron.healthmgr.HealthPolicyConfigReader.PolicyConfigKey; @@ -194,6 +196,14 @@ public class HealthManager { setupLogging(cmd, config); + LOG.fine(Arrays.toString(cmd.getOptions())); + + // Add the SystemConfig into SingletonRegistry + SystemConfig systemConfig = SystemConfig.newBuilder(true) + .putAll(Context.systemFile(config), true) + .putAll(Context.overrideFile(config), true).build(); + SingletonRegistry.INSTANCE.registerSingleton(SystemConfig.HERON_SYSTEM_CONFIG, systemConfig); + LOG.info("Static Heron config loaded successfully "); LOG.fine(config.toString()); @@ -204,32 +214,29 @@ public class HealthManager { String metricsUrl = config.getStringValue(PolicyConfigKey.METRIC_SOURCE_URL.key()); metricsUrl = getOptionValue(cmd, CliArgs.METRIC_SOURCE_URL, metricsUrl); - AbstractModule module = buildMetricsProviderModule(metricsUrl, metricSourceClassName); + // metrics reporting thread + HealthManagerMetrics publishingMetrics = + new HealthManagerMetrics(Integer.valueOf(getOptionValue(cmd, CliArgs.METRICSMGR_PORT))); + + AbstractModule module + = buildBaseModule(metricsUrl, metricSourceClassName, publishingMetrics); HealthManager healthManager = new HealthManager(config, module); LOG.info("Initializing health manager"); healthManager.initialize(); - LOG.info("Starting Health Manager metirc posting thread"); - HealthManagerMetrics publishingMetricsRunnable = null; - if (hasOption(cmd, CliArgs.METRICSMGR_PORT)) { - publishingMetricsRunnable = new HealthManagerMetrics( - Integer.valueOf(getOptionValue(cmd, CliArgs.METRICSMGR_PORT))); - } - LOG.info("Starting Health Manager"); PoliciesExecutor policyExecutor = new PoliciesExecutor(healthManager.healthPolicies); ScheduledFuture<?> future = policyExecutor.start(); - if (publishingMetricsRunnable != null) { - new Thread(publishingMetricsRunnable).start(); - } + + LOG.info("Starting Health Manager metric posting thread"); + new Thread(publishingMetrics).start(); + try { future.get(); } finally { policyExecutor.destroy(); - if (publishingMetricsRunnable != null) { - publishingMetricsRunnable.close(); - } + publishingMetrics.close(); } } @@ -319,7 +326,8 @@ public class HealthManager { } @VisibleForTesting - static AbstractModule buildMetricsProviderModule(final String sourceUrl, final String type) { + static AbstractModule buildBaseModule(final String sourceUrl, final String type, + final HealthManagerMetrics publishingMetrics) { return new AbstractModule() { @Override protected void configure() { @@ -329,6 +337,8 @@ public class HealthManager { bind(String.class) .annotatedWith(Names.named(CONF_METRICS_SOURCE_TYPE)) .toInstance(type); + bind(HealthManagerMetrics.class) + .toInstance(publishingMetrics); } }; } @@ -501,6 +511,14 @@ public class HealthManager { .argName("process mode") .build(); + Option metricsMgrPort = Option.builder("m") + .desc("Port of local MetricsManager") + .longOpt(CliArgs.METRICSMGR_PORT.text) + .hasArgs() + .argName("metrics_manager port") + .required() + .build(); + Option verbose = Option.builder("v") .desc("Enable debug logs") .longOpt(CliArgs.VERBOSE.text) @@ -515,6 +533,7 @@ public class HealthManager { options.addOption(metricsSourceType); options.addOption(metricsSourceURL); options.addOption(mode); + options.addOption(metricsMgrPort); options.addOption(verbose); return options; diff --git a/heron/healthmgr/src/java/org/apache/heron/healthmgr/HealthManagerMetrics.java b/heron/healthmgr/src/java/org/apache/heron/healthmgr/HealthManagerMetrics.java index 478ed1f..1989fec45 100644 --- a/heron/healthmgr/src/java/org/apache/heron/healthmgr/HealthManagerMetrics.java +++ b/heron/healthmgr/src/java/org/apache/heron/healthmgr/HealthManagerMetrics.java @@ -15,13 +15,19 @@ package org.apache.heron.healthmgr; import java.io.IOException; +import java.net.InetAddress; +import java.net.UnknownHostException; import java.time.Duration; +import java.util.Map.Entry; +import java.util.logging.Level; import java.util.logging.Logger; +import javax.inject.Inject; +import javax.inject.Singleton; + import com.google.protobuf.Message; import org.apache.heron.api.metric.MultiCountMetric; -import org.apache.heron.common.basics.Communicator; import org.apache.heron.common.basics.NIOLooper; import org.apache.heron.common.basics.SingletonRegistry; import org.apache.heron.common.config.SystemConfig; @@ -29,31 +35,40 @@ import org.apache.heron.common.network.HeronClient; import org.apache.heron.common.network.HeronSocketOptions; import org.apache.heron.common.network.StatusCode; import org.apache.heron.common.utils.metrics.JVMMetrics; +import org.apache.heron.proto.system.Common; import org.apache.heron.proto.system.Metrics; /** * HealthMgr's metrics to be collect */ - +@Singleton public class HealthManagerMetrics implements Runnable, AutoCloseable { + public static final String METRICS_THREAD = "HealthManagerMetrics"; private static final Logger LOG = Logger.getLogger(HealthManagerMetrics.class.getName()); private static final String METRICS_MGR_HOST = "127.0.0.1"; + private final String metricsPrefix = "__healthmgr/"; + private final String metricsSensor = metricsPrefix + "sensor/"; + private final String metricsDetector = metricsPrefix + "detector/"; + private final String metricsDiagnoser = metricsPrefix + "diagnoser/"; + private final String metricsResolver = metricsPrefix + "resolver/"; + private final String metricsName = metricsPrefix + "customized/"; private final JVMMetrics jvmMetrics; private final MultiCountMetric executeSensorCount; private final MultiCountMetric executeDetectorCount; private final MultiCountMetric executeDiagnoserCount; private final MultiCountMetric executeResolverCount; + private final MultiCountMetric executeCount; private NIOLooper looper; private HeronClient metricsMgrClient; - private Communicator<Metrics.MetricPublisherPublishMessage> outMetricsQueues; /** * constructor to expose healthmgr metrics to local metricsmgr * @param metricsMgrPort local MetricsMgr port * @throws IOException */ + @Inject public HealthManagerMetrics(int metricsMgrPort) throws IOException { jvmMetrics = new JVMMetrics(); @@ -61,6 +76,7 @@ public class HealthManagerMetrics implements Runnable, AutoCloseable { executeDetectorCount = new MultiCountMetric(); executeDiagnoserCount = new MultiCountMetric(); executeResolverCount = new MultiCountMetric(); + executeCount = new MultiCountMetric(); looper = new NIOLooper(); @@ -76,38 +92,58 @@ public class HealthManagerMetrics implements Runnable, AutoCloseable { systemConfig.getInstanceNetworkOptionsSocketReceivedBufferSize(), systemConfig.getInstanceNetworkOptionsMaximumPacketSize()); metricsMgrClient = - new MetricsMgrClient(looper, METRICS_MGR_HOST, metricsMgrPort, socketOptions); - - outMetricsQueues = new Communicator<Metrics.MetricPublisherPublishMessage>(null, looper); - outMetricsQueues.init(systemConfig.getInstanceInternalMetricsWriteQueueCapacity(), - systemConfig.getInstanceTuningExpectedMetricsWriteQueueSize(), - systemConfig.getInstanceTuningCurrentSampleWeight()); + new SimpleMetricsManagerClient(looper, METRICS_MGR_HOST, metricsMgrPort, socketOptions); int interval = (int) systemConfig.getHeronMetricsExportInterval().getSeconds(); + looper.registerTimerEvent(Duration.ofSeconds(interval), new Runnable() { @Override public void run() { - jvmMetrics.getJVMSampleRunnable().run(); - - // push to container 0 metricsMgr - if (!metricsMgrClient.isConnected()) { - return; - } - - LOG.info("Flushing all pending data in MetricsManagerClient"); - // Collect all tuples in queue - int size = outMetricsQueues.size(); - for (int i = 0; i < size; i++) { - Metrics.MetricPublisherPublishMessage m = outMetricsQueues.poll(); - metricsMgrClient.sendMessage(m); + sendMetrics(); + // next timer task + if (looper != null) { + looper.registerTimerEvent(Duration.ofSeconds(interval), new Runnable() { + @Override + public void run() { + sendMetrics(); + } + }); } } }); } - public Communicator<Metrics.MetricPublisherPublishMessage> getMetricsQueue() { - return outMetricsQueues; + private void sendMetrics() { + jvmMetrics.getJVMSampleRunnable().run(); + + if (!metricsMgrClient.isConnected()) { + return; + } + + LOG.info("Flushing sensor/detector/diagnoser/resolver metrics"); + Metrics.MetricPublisherPublishMessage.Builder builder = + Metrics.MetricPublisherPublishMessage.newBuilder(); + addMetrics(builder, executeSensorCount, metricsSensor); + addMetrics(builder, executeDetectorCount, metricsDetector); + addMetrics(builder, executeDiagnoserCount, metricsDiagnoser); + addMetrics(builder, executeResolverCount, metricsResolver); + addMetrics(builder, executeCount, metricsName); + Metrics.MetricPublisherPublishMessage msg = builder.build(); + LOG.fine(msg.toString()); + metricsMgrClient.sendMessage(msg); + } + + private void addMetrics(Metrics.MetricPublisherPublishMessage.Builder b, MultiCountMetric m, + String prefix) { + for (Entry<String, Long> e : m.getValueAndReset().entrySet()) { + b.addMetrics(Metrics.MetricDatum.newBuilder().setName(prefix + e.getKey()) + .setValue(e.getValue().toString())); + } + } + + public synchronized void executeIncr(String metricName) { + executeCount.scope(metricName).incr(); } public synchronized void executeSensorIncr(String sensor) { @@ -136,44 +172,91 @@ public class HealthManagerMetrics implements Runnable, AutoCloseable { public void close() throws Exception { looper.exitLoop(); metricsMgrClient.stop(); - outMetricsQueues.clear(); } - class MetricsMgrClient extends HeronClient { + class SimpleMetricsManagerClient extends HeronClient { + private SystemConfig systemConfig; + private String hostname; - MetricsMgrClient(NIOLooper s, String host, int port, HeronSocketOptions options) { + SimpleMetricsManagerClient(NIOLooper s, String host, int port, HeronSocketOptions options) { super(s, host, port, options); - // TODO Auto-generated constructor stub + systemConfig = + (SystemConfig) SingletonRegistry.INSTANCE.getSingleton(SystemConfig.HERON_SYSTEM_CONFIG); + try { + this.hostname = InetAddress.getLocalHost().getHostName(); + } catch (UnknownHostException e) { + throw new RuntimeException("GetHostName failed"); + } } @Override public void onError() { - // TODO Auto-generated method stub + LOG.severe("Disconnected from Metrics Manager."); + // Dispatch to onConnect(...) + onConnect(StatusCode.CONNECT_ERROR); } @Override public void onConnect(StatusCode status) { - // TODO Auto-generated method stub + if (status != StatusCode.OK) { + LOG.log(Level.WARNING, + "Cannot connect to the local metrics mgr with status: {0}, Will Retry..", status); + Runnable r = new Runnable() { + public void run() { + start(); + } + }; + + getNIOLooper().registerTimerEvent(systemConfig.getInstanceReconnectMetricsmgrInterval(), r); + return; + } + LOG.info("Connected to Metrics Manager. Ready to send register request"); + sendRegisterRequest(); + } + + private void sendRegisterRequest() { + Metrics.MetricPublisher publisher = Metrics.MetricPublisher.newBuilder().setHostname(hostname) + .setPort(getSocketChannel().socket().getPort()).setComponentName("__healthmgr__") + .setInstanceId("healthmgr-0").setInstanceIndex(-1).build(); + Metrics.MetricPublisherRegisterRequest request = + Metrics.MetricPublisherRegisterRequest.newBuilder().setPublisher(publisher).build(); + + // The timeout would be the reconnect-interval-seconds + sendRequest(request, null, Metrics.MetricPublisherRegisterResponse.newBuilder(), + systemConfig.getInstanceReconnectMetricsmgrInterval()); } @Override public void onResponse(StatusCode status, Object ctx, Message response) { - // TODO Auto-generated method stub + if (status != StatusCode.OK) { + throw new RuntimeException("Response from Metrics Manager not ok"); + } + if (Metrics.MetricPublisherRegisterResponse.class.isInstance(response)) { + handleRegisterResponse((Metrics.MetricPublisherRegisterResponse) response); + } else { + throw new RuntimeException("Unknown kind of response received from Metrics Manager"); + } + } + + private void handleRegisterResponse(Metrics.MetricPublisherRegisterResponse response) { + if (response.getStatus().getStatus() != Common.StatusCode.OK) { + throw new RuntimeException("Metrics Manager returned a not ok response for register"); + } + LOG.info("We registered ourselves to the Metrics Manager"); } @Override public void onIncomingMessage(Message message) { - // TODO Auto-generated method stub - + throw new RuntimeException( + "SimpleMetricsManagerClient got an unknown message from Metrics Manager"); } @Override public void onClose() { - // TODO Auto-generated method stub - + LOG.info("SimpleMetricsManagerClient exits"); } } diff --git a/heron/healthmgr/src/java/org/apache/heron/healthmgr/detectors/BackPressureDetector.java b/heron/healthmgr/src/java/org/apache/heron/healthmgr/detectors/BackPressureDetector.java index 5fe0824..a0edb7c 100644 --- a/heron/healthmgr/src/java/org/apache/heron/healthmgr/detectors/BackPressureDetector.java +++ b/heron/healthmgr/src/java/org/apache/heron/healthmgr/detectors/BackPressureDetector.java @@ -28,6 +28,7 @@ import com.microsoft.dhalion.core.Measurement; import com.microsoft.dhalion.core.MeasurementsTable; import com.microsoft.dhalion.core.Symptom; +import org.apache.heron.healthmgr.HealthManagerMetrics; import org.apache.heron.healthmgr.HealthPolicyConfig; import static org.apache.heron.healthmgr.detectors.BaseDetector.SymptomType.SYMPTOM_COMP_BACK_PRESSURE; @@ -35,14 +36,18 @@ import static org.apache.heron.healthmgr.detectors.BaseDetector.SymptomType.SYMP import static org.apache.heron.healthmgr.sensors.BaseSensor.MetricName.METRIC_BACK_PRESSURE; public class BackPressureDetector extends BaseDetector { + public static final String BACK_PRESSURE_DETECTOR = "BackPressureDetector"; static final String CONF_NOISE_FILTER = "BackPressureDetector.noiseFilterMillis"; private static final Logger LOG = Logger.getLogger(BackPressureDetector.class.getName()); private final int noiseFilterMillis; + private HealthManagerMetrics publishingMetrics; @Inject - BackPressureDetector(HealthPolicyConfig policyConfig) { + BackPressureDetector(HealthPolicyConfig policyConfig, + HealthManagerMetrics publishingMetrics) { noiseFilterMillis = (int) policyConfig.getConfig(CONF_NOISE_FILTER, 20); + this.publishingMetrics = publishingMetrics; } /** @@ -53,6 +58,8 @@ public class BackPressureDetector extends BaseDetector { */ @Override public Collection<Symptom> detect(Collection<Measurement> measurements) { + publishingMetrics.executeDetectorIncr(BACK_PRESSURE_DETECTOR); + Collection<Symptom> result = new ArrayList<>(); Instant now = context.checkpoint(); diff --git a/heron/healthmgr/src/java/org/apache/heron/healthmgr/diagnosers/SlowInstanceDiagnoser.java b/heron/healthmgr/src/java/org/apache/heron/healthmgr/diagnosers/SlowInstanceDiagnoser.java index 08a47fc..1b2fdf6 100644 --- a/heron/healthmgr/src/java/org/apache/heron/healthmgr/diagnosers/SlowInstanceDiagnoser.java +++ b/heron/healthmgr/src/java/org/apache/heron/healthmgr/diagnosers/SlowInstanceDiagnoser.java @@ -19,11 +19,15 @@ import java.util.ArrayList; import java.util.Collection; import java.util.logging.Logger; +import javax.inject.Inject; + import com.microsoft.dhalion.core.Diagnosis; import com.microsoft.dhalion.core.MeasurementsTable; import com.microsoft.dhalion.core.Symptom; import com.microsoft.dhalion.core.SymptomsTable; +import org.apache.heron.healthmgr.HealthManagerMetrics; + import static org.apache.heron.healthmgr.detectors.BaseDetector.SymptomType.SYMPTOM_COMP_BACK_PRESSURE; import static org.apache.heron.healthmgr.detectors.BaseDetector.SymptomType.SYMPTOM_PROCESSING_RATE_SKEW; import static org.apache.heron.healthmgr.detectors.BaseDetector.SymptomType.SYMPTOM_WAIT_Q_SIZE_SKEW; @@ -31,10 +35,20 @@ import static org.apache.heron.healthmgr.diagnosers.BaseDiagnoser.DiagnosisType. import static org.apache.heron.healthmgr.sensors.BaseSensor.MetricName.METRIC_WAIT_Q_SIZE; public class SlowInstanceDiagnoser extends BaseDiagnoser { + public static final String SLOW_INSTANCE_DIAGNOSER = "SlowInstanceDiagnoser"; private static final Logger LOG = Logger.getLogger(SlowInstanceDiagnoser.class.getName()); + private HealthManagerMetrics publishingMetrics; + + @Inject + public SlowInstanceDiagnoser(HealthManagerMetrics publishingMetrics) { + this.publishingMetrics = publishingMetrics; + } + @Override public Collection<Diagnosis> diagnose(Collection<Symptom> symptoms) { + publishingMetrics.executeDiagnoserIncr(SLOW_INSTANCE_DIAGNOSER); + Collection<Diagnosis> diagnoses = new ArrayList<>(); SymptomsTable symptomsTable = SymptomsTable.of(symptoms); diff --git a/heron/healthmgr/src/java/org/apache/heron/healthmgr/resolvers/RestartContainerResolver.java b/heron/healthmgr/src/java/org/apache/heron/healthmgr/resolvers/RestartContainerResolver.java index 759e803..bf9b3cb 100644 --- a/heron/healthmgr/src/java/org/apache/heron/healthmgr/resolvers/RestartContainerResolver.java +++ b/heron/healthmgr/src/java/org/apache/heron/healthmgr/resolvers/RestartContainerResolver.java @@ -30,6 +30,7 @@ import com.microsoft.dhalion.core.SymptomsTable; import com.microsoft.dhalion.events.EventManager; import com.microsoft.dhalion.policy.PoliciesExecutor.ExecutionContext; +import org.apache.heron.healthmgr.HealthManagerMetrics; import org.apache.heron.healthmgr.common.HealthManagerEvents.ContainerRestart; import org.apache.heron.proto.scheduler.Scheduler.RestartTopologyRequest; import org.apache.heron.scheduler.client.ISchedulerClient; @@ -38,20 +39,24 @@ import static org.apache.heron.healthmgr.HealthManager.CONF_TOPOLOGY_NAME; import static org.apache.heron.healthmgr.detectors.BaseDetector.SymptomType.SYMPTOM_INSTANCE_BACK_PRESSURE; public class RestartContainerResolver implements IResolver { + public static final String RESTART_CONTAINER_RESOLVER = "RestartContainerResolver"; private static final Logger LOG = Logger.getLogger(RestartContainerResolver.class.getName()); private final EventManager eventManager; private final String topologyName; private final ISchedulerClient schedulerClient; private ExecutionContext context; + private HealthManagerMetrics publishingMetrics; @Inject public RestartContainerResolver(@Named(CONF_TOPOLOGY_NAME) String topologyName, EventManager eventManager, - ISchedulerClient schedulerClient) { + ISchedulerClient schedulerClient, + HealthManagerMetrics publishingMetrics) { this.topologyName = topologyName; this.eventManager = eventManager; this.schedulerClient = schedulerClient; + this.publishingMetrics = publishingMetrics; } @Override @@ -61,6 +66,8 @@ public class RestartContainerResolver implements IResolver { @Override public Collection<Action> resolve(Collection<Diagnosis> diagnosis) { + publishingMetrics.executeResolver(RESTART_CONTAINER_RESOLVER); + List<Action> actions = new ArrayList<>(); // find all back pressure measurements reported in this execution cycle @@ -97,6 +104,7 @@ public class RestartContainerResolver implements IResolver { .setTopologyName(topologyName) .build()); LOG.info("Restarted container result: " + b); + publishingMetrics.executeIncr("RestartContainer"); }); LOG.info("Broadcasting container restart event"); diff --git a/heron/healthmgr/src/java/org/apache/heron/healthmgr/sensors/BackPressureSensor.java b/heron/healthmgr/src/java/org/apache/heron/healthmgr/sensors/BackPressureSensor.java index 8cbca99..7bc7ea6 100644 --- a/heron/healthmgr/src/java/org/apache/heron/healthmgr/sensors/BackPressureSensor.java +++ b/heron/healthmgr/src/java/org/apache/heron/healthmgr/sensors/BackPressureSensor.java @@ -26,6 +26,7 @@ import com.microsoft.dhalion.api.MetricsProvider; import com.microsoft.dhalion.core.Measurement; import com.microsoft.dhalion.core.MeasurementsTable; +import org.apache.heron.healthmgr.HealthManagerMetrics; import org.apache.heron.healthmgr.HealthPolicyConfig; import org.apache.heron.healthmgr.common.PackingPlanProvider; import org.apache.heron.healthmgr.common.TopologyProvider; @@ -33,19 +34,24 @@ import org.apache.heron.healthmgr.common.TopologyProvider; import static org.apache.heron.healthmgr.sensors.BaseSensor.MetricName.METRIC_BACK_PRESSURE; public class BackPressureSensor extends BaseSensor { + public static final String BACKPRESSURE_SENSOR = "BackPressureSensor"; + private final MetricsProvider metricsProvider; private final PackingPlanProvider packingPlanProvider; private final TopologyProvider topologyProvider; + private HealthManagerMetrics publishingMetrics; @Inject public BackPressureSensor(PackingPlanProvider packingPlanProvider, TopologyProvider topologyProvider, HealthPolicyConfig policyConfig, - MetricsProvider metricsProvider) { + MetricsProvider metricsProvider, + HealthManagerMetrics publishingMetrics) { super(policyConfig, METRIC_BACK_PRESSURE.text(), BackPressureSensor.class.getSimpleName()); this.packingPlanProvider = packingPlanProvider; this.topologyProvider = topologyProvider; this.metricsProvider = metricsProvider; + this.publishingMetrics = publishingMetrics; } /** @@ -55,6 +61,8 @@ public class BackPressureSensor extends BaseSensor { */ @Override public Collection<Measurement> fetch() { + publishingMetrics.executeSensorIncr(BACKPRESSURE_SENSOR); + Collection<Measurement> result = new ArrayList<>(); Instant now = context.checkpoint(); diff --git a/heron/healthmgr/tests/java/org/apache/heron/healthmgr/HealthManagerTest.java b/heron/healthmgr/tests/java/org/apache/heron/healthmgr/HealthManagerTest.java index 2fa1e7e..3711e6c 100644 --- a/heron/healthmgr/tests/java/org/apache/heron/healthmgr/HealthManagerTest.java +++ b/heron/healthmgr/tests/java/org/apache/heron/healthmgr/HealthManagerTest.java @@ -64,7 +64,7 @@ public class HealthManagerTest { when(adaptor.getSchedulerLocation(anyString())).thenReturn(schedulerLocation); AbstractModule baseModule = HealthManager - .buildMetricsProviderModule("127.0.0.1", TrackerMetricsProvider.class.getName()); + .buildBaseModule("127.0.0.1", TrackerMetricsProvider.class.getName()); HealthManager healthManager = new HealthManager(config, baseModule); -- To stop receiving notification emails like this one, please contact hui...@apache.org.