KAFKA-5903: Added Connect metrics to the worker and distributed herder (KIP-196)
Added metrics to the Connect worker and rebalancing metrics to the distributed herder.

This is built on top of #3987, and I can rebase this PR once that is merged.

Author: Randall Hauch <[email protected]>

Reviewers: Ewen Cheslack-Postava <[email protected]>

Closes #4011 from rhauch/kafka-5903

(cherry picked from commit a47bfbcae050659d32f777ed2f4b26dda5fbdbbd)
Signed-off-by: Ewen Cheslack-Postava <[email protected]>

Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/1d026269
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/1d026269
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/1d026269

Branch: refs/heads/1.0
Commit: 1d026269e1ab0af130b78f1efadaabbc4f5a8552
Parents: 5e2767a
Author: Randall Hauch <[email protected]>
Authored: Thu Oct 5 11:23:11 2017 -0700
Committer: Ewen Cheslack-Postava <[email protected]>
Committed: Thu Oct 5 11:23:21 2017 -0700

----------------------------------------------------------------------
 .../kafka/connect/runtime/ConnectMetrics.java | 61 +++++++---
 .../connect/runtime/ConnectMetricsRegistry.java | 121 ++++++++++++++-----
 .../apache/kafka/connect/runtime/Worker.java | 111 +++++++++++++++++
 .../kafka/connect/runtime/WorkerConnector.java | 37 ++++--
 .../kafka/connect/runtime/WorkerTask.java | 23 ++--
 .../runtime/distributed/DistributedHerder.java | 87 ++++++++++++-
 .../connect/runtime/MockConnectMetrics.java | 82 +++++++++++--
 .../connect/runtime/WorkerConnectorTest.java | 44 ++++++-
 .../connect/runtime/WorkerSinkTaskTest.java | 54 ++++-----
 .../connect/runtime/WorkerSourceTaskTest.java | 32 ++---
 .../kafka/connect/runtime/WorkerTaskTest.java | 4 +-
 .../kafka/connect/runtime/WorkerTest.java | 101 ++++++++++++++--
 .../distributed/DistributedHerderTest.java | 92 +++++++++++++-
 13 files changed, 704 insertions(+), 145 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/1d026269/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectMetrics.java ---------------------------------------------------------------------- diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectMetrics.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectMetrics.java index 974967a..3cd1eae 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectMetrics.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectMetrics.java @@ -19,8 +19,8 @@ package org.apache.kafka.connect.runtime; import org.apache.kafka.clients.CommonClientConfigs; import org.apache.kafka.common.MetricName; import org.apache.kafka.common.MetricNameTemplate; +import org.apache.kafka.common.metrics.Gauge; import org.apache.kafka.common.metrics.JmxReporter; -import org.apache.kafka.common.metrics.Measurable; import org.apache.kafka.common.metrics.MetricConfig; import org.apache.kafka.common.metrics.Metrics; import org.apache.kafka.common.metrics.MetricsReporter; @@ -68,9 +68,11 @@ public class ConnectMetrics { this.time = time; MetricConfig metricConfig = new MetricConfig().samples(config.getInt(CommonClientConfigs.METRICS_NUM_SAMPLES_CONFIG)) - .timeWindow(config.getLong(CommonClientConfigs.METRICS_SAMPLE_WINDOW_MS_CONFIG), TimeUnit.MILLISECONDS) - .recordLevel(Sensor.RecordingLevel.forName(config.getString(CommonClientConfigs.METRICS_RECORDING_LEVEL_CONFIG))); - List<MetricsReporter> reporters = config.getConfiguredInstances(CommonClientConfigs.METRIC_REPORTER_CLASSES_CONFIG, MetricsReporter.class); + .timeWindow(config.getLong(CommonClientConfigs.METRICS_SAMPLE_WINDOW_MS_CONFIG), + TimeUnit.MILLISECONDS).recordLevel( + Sensor.RecordingLevel.forName(config.getString(CommonClientConfigs.METRICS_RECORDING_LEVEL_CONFIG))); + List<MetricsReporter> reporters = 
config.getConfiguredInstances(CommonClientConfigs.METRIC_REPORTER_CLASSES_CONFIG, + MetricsReporter.class); reporters.add(new JmxReporter(JMX_PREFIX)); this.metrics = new Metrics(metricConfig, reporters, time); LOG.debug("Registering Connect metrics with JMX for worker '{}'", workerId); @@ -121,7 +123,8 @@ public class ConnectMetrics { if (group == null) { group = new MetricGroup(groupId); MetricGroup previous = groupsByName.putIfAbsent(groupId, group); - if (previous != null) group = previous; + if (previous != null) + group = previous; } return group; } @@ -204,7 +207,8 @@ public class ConnectMetrics { @Override public boolean equals(Object obj) { - if (obj == this) return true; + if (obj == this) + return true; if (obj instanceof MetricGroupId) { MetricGroupId that = (MetricGroupId) obj; return this.groupName.equals(that.groupName) && this.tags.equals(that.tags); @@ -290,19 +294,38 @@ public class ConnectMetrics { } /** - * Add to this group an indicator metric with a function that will be used to obtain the indicator state. + * Add to this group an indicator metric with a function that returns the current value. * * @param nameTemplate the name template for the metric; may not be null - * @param predicate the predicate function used to determine the indicator state; may not be null + * @param supplier the function used to determine the literal value of the metric; may not be null * @throws IllegalArgumentException if the name is not valid */ - public void addIndicatorMetric(MetricNameTemplate nameTemplate, final IndicatorPredicate predicate) { + public <T> void addValueMetric(MetricNameTemplate nameTemplate, final LiteralSupplier<T> supplier) { MetricName metricName = metricName(nameTemplate); if (metrics().metric(metricName) == null) { - metrics().addMetric(metricName, new Measurable() { + metrics().addMetric(metricName, new Gauge<T>() { @Override - public double measure(MetricConfig config, long now) { - return predicate.matches() ? 
1.0d : 0.0d; + public T value(MetricConfig config, long now) { + return supplier.metricValue(now); + } + }); + } + } + + /** + * Add to this group an indicator metric that always returns the specified value. + * + * @param nameTemplate the name template for the metric; may not be null + * @param value the value; may not be null + * @throws IllegalArgumentException if the name is not valid + */ + public <T> void addImmutableValueMetric(MetricNameTemplate nameTemplate, final T value) { + MetricName metricName = metricName(nameTemplate); + if (metrics().metric(metricName) == null) { + metrics().addMetric(metricName, new Gauge<T>() { + @Override + public T value(MetricConfig config, long now) { + return value; } }); } @@ -369,7 +392,8 @@ public class ConnectMetrics { public synchronized Sensor sensor(String name, MetricConfig config, Sensor.RecordingLevel recordingLevel, Sensor... parents) { // We need to make sure that all sensor names are unique across all groups, so use the sensor prefix Sensor result = metrics.sensor(sensorPrefix + name, config, Long.MAX_VALUE, recordingLevel, parents); - if (result != null) sensorNames.add(result.name()); + if (result != null) + sensorNames.add(result.name()); return result; } @@ -390,16 +414,17 @@ public class ConnectMetrics { } /** - * A simple functional interface that determines whether an indicator metric is true. + * A simple functional interface that returns a literal value. */ - public interface IndicatorPredicate { + public interface LiteralSupplier<T> { /** - * Return whether the indicator metric is true. + * Return the literal value for the metric. 
* - * @return true if the indicator metric is satisfied, or false otherwise + * @param now the current time in milliseconds + * @return the literal metric value; may not be null */ - boolean matches(); + T metricValue(long now); } /** http://git-wip-us.apache.org/repos/asf/kafka/blob/1d026269/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectMetricsRegistry.java ---------------------------------------------------------------------- diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectMetricsRegistry.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectMetricsRegistry.java index ee513c9..d78576e 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectMetricsRegistry.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectMetricsRegistry.java @@ -32,16 +32,15 @@ public class ConnectMetricsRegistry { public static final String TASK_GROUP_NAME = "connector-task-metrics"; public static final String SOURCE_TASK_GROUP_NAME = "source-task-metrics"; public static final String SINK_TASK_GROUP_NAME = "sink-task-metrics"; + public static final String WORKER_GROUP_NAME = "connect-worker-metrics"; + public static final String WORKER_REBALANCE_GROUP_NAME = "connect-worker-rebalance-metrics"; private final List<MetricNameTemplate> allTemplates = new ArrayList<>(); - public final MetricNameTemplate connectorStatusRunning; - public final MetricNameTemplate connectorStatusPaused; - public final MetricNameTemplate connectorStatusFailed; - public final MetricNameTemplate taskStatusUnassigned; - public final MetricNameTemplate taskStatusRunning; - public final MetricNameTemplate taskStatusPaused; - public final MetricNameTemplate taskStatusFailed; - public final MetricNameTemplate taskStatusDestroyed; + public final MetricNameTemplate connectorStatus; + public final MetricNameTemplate connectorType; + public final MetricNameTemplate connectorClass; + public final 
MetricNameTemplate connectorVersion; + public final MetricNameTemplate taskStatus; public final MetricNameTemplate taskRunningRatio; public final MetricNameTemplate taskPauseRatio; public final MetricNameTemplate taskCommitTimeMax; @@ -75,6 +74,25 @@ public class ConnectMetricsRegistry { public final MetricNameTemplate sinkRecordActiveCount; public final MetricNameTemplate sinkRecordActiveCountMax; public final MetricNameTemplate sinkRecordActiveCountAvg; + public final MetricNameTemplate connectorCount; + public final MetricNameTemplate taskCount; + public final MetricNameTemplate connectorStartupAttemptsTotal; + public final MetricNameTemplate connectorStartupSuccessTotal; + public final MetricNameTemplate connectorStartupSuccessPercentage; + public final MetricNameTemplate connectorStartupFailureTotal; + public final MetricNameTemplate connectorStartupFailurePercentage; + public final MetricNameTemplate taskStartupAttemptsTotal; + public final MetricNameTemplate taskStartupSuccessTotal; + public final MetricNameTemplate taskStartupSuccessPercentage; + public final MetricNameTemplate taskStartupFailureTotal; + public final MetricNameTemplate taskStartupFailurePercentage; + public final MetricNameTemplate leaderName; + public final MetricNameTemplate epoch; + public final MetricNameTemplate rebalanceCompletedTotal; + public final MetricNameTemplate rebalanceMode; + public final MetricNameTemplate rebalanceTimeMax; + public final MetricNameTemplate rebalanceTimeAvg; + public final MetricNameTemplate rebalanceTimeSinceLast; public ConnectMetricsRegistry() { this(new LinkedHashSet<String>()); @@ -85,28 +103,25 @@ public class ConnectMetricsRegistry { Set<String> connectorTags = new LinkedHashSet<>(tags); connectorTags.add(CONNECTOR_TAG_NAME); - connectorStatusRunning = createTemplate("status-running", CONNECTOR_GROUP_NAME, - "Signals whether the connector is in the running state.", connectorTags); - connectorStatusPaused = createTemplate("status-paused", 
CONNECTOR_GROUP_NAME, - "Signals whether the connector is in the paused state.", connectorTags); - connectorStatusFailed = createTemplate("status-failed", CONNECTOR_GROUP_NAME, - "Signals whether the connector is in the failed state.", connectorTags); + connectorStatus = createTemplate("status", CONNECTOR_GROUP_NAME, + "The status of the connector. One of 'unassigned', 'running', 'paused', 'failed', or " + + "'destroyed'.", + connectorTags); + connectorType = createTemplate("connector-type", CONNECTOR_GROUP_NAME, "The type of the connector. One of 'source' or 'sink'.", + connectorTags); + connectorClass = createTemplate("connector-class", CONNECTOR_GROUP_NAME, "The name of the connector class.", connectorTags); + connectorVersion = createTemplate("connector-version", CONNECTOR_GROUP_NAME, + "The version of the connector class, as reported by the connector.", connectorTags); /***** Worker task level *****/ Set<String> workerTaskTags = new LinkedHashSet<>(tags); workerTaskTags.add(CONNECTOR_TAG_NAME); workerTaskTags.add(TASK_TAG_NAME); - taskStatusUnassigned = createTemplate("status-unassigned", TASK_GROUP_NAME, "Signals whether this task is in the unassigned state.", - workerTaskTags); - taskStatusRunning = createTemplate("status-running", TASK_GROUP_NAME, "Signals whether this task is in the running state.", - workerTaskTags); - taskStatusPaused = createTemplate("status-paused", TASK_GROUP_NAME, "Signals whether this task is in the paused state.", - workerTaskTags); - taskStatusFailed = createTemplate("status-failed", TASK_GROUP_NAME, "Signals whether this task is in the failed state.", - workerTaskTags); - taskStatusDestroyed = createTemplate("status-destroyed", TASK_GROUP_NAME, "Signals whether this task is in the destroyed state.", - workerTaskTags); + taskStatus = createTemplate("status", TASK_GROUP_NAME, + "The status of the connector task. 
One of 'unassigned', 'running', 'paused', 'failed', or " + + "'destroyed'.", + workerTaskTags); taskRunningRatio = createTemplate("running-ratio", TASK_GROUP_NAME, "The fraction of time this task has spent in the running state.", workerTaskTags); taskPauseRatio = createTemplate("pause-ratio", TASK_GROUP_NAME, "The fraction of time this task has spent in the pause state.", @@ -230,13 +245,55 @@ public class ConnectMetricsRegistry { "committed/flushed/acknowledged by the sink task.", sinkTaskTags); sinkRecordActiveCountMax = createTemplate("sink-record-active-count-max", SINK_TASK_GROUP_NAME, - "The maximum number of records that have been read from Kafka but not yet completely " + - "committed/flushed/acknowledged by the sink task.", + "The maximum number of records that have been read from Kafka but not yet completely " + + "committed/flushed/acknowledged by the sink task.", sinkTaskTags); sinkRecordActiveCountAvg = createTemplate("sink-record-active-count-avg", SINK_TASK_GROUP_NAME, - "The average number of records that have been read from Kafka but not yet completely " + - "committed/flushed/acknowledged by the sink task.", + "The average number of records that have been read from Kafka but not yet completely " + + "committed/flushed/acknowledged by the sink task.", sinkTaskTags); + + /***** Worker level *****/ + Set<String> workerTags = new LinkedHashSet<>(tags); + + connectorCount = createTemplate("connector-count", WORKER_GROUP_NAME, "The number of connectors run in this worker.", workerTags); + taskCount = createTemplate("task-count", WORKER_GROUP_NAME, "The number of tasks run in this worker.", workerTags); + connectorStartupAttemptsTotal = createTemplate("connector-startup-attempts-total", WORKER_GROUP_NAME, + "The total number of connector startups that this worker has attempted.", workerTags); + connectorStartupSuccessTotal = createTemplate("connector-startup-success-total", WORKER_GROUP_NAME, + "The total number of connector starts that succeeded.", 
workerTags); + connectorStartupSuccessPercentage = createTemplate("connector-startup-success-percentage", WORKER_GROUP_NAME, + "The average percentage of this worker's connectors starts that succeeded.", workerTags); + connectorStartupFailureTotal = createTemplate("connector-startup-failure-total", WORKER_GROUP_NAME, + "The total number of connector starts that failed.", workerTags); + connectorStartupFailurePercentage = createTemplate("connector-startup-failure-percentage", WORKER_GROUP_NAME, + "The average percentage of this worker's connectors starts that failed.", workerTags); + taskStartupAttemptsTotal = createTemplate("task-startup-attempts-total", WORKER_GROUP_NAME, + "The total number of task startups that this worker has attempted.", workerTags); + taskStartupSuccessTotal = createTemplate("task-startup-success-total", WORKER_GROUP_NAME, + "The total number of task starts that succeeded.", workerTags); + taskStartupSuccessPercentage = createTemplate("task-startup-success-percentage", WORKER_GROUP_NAME, + "The average percentage of this worker's tasks starts that succeeded.", workerTags); + taskStartupFailureTotal = createTemplate("task-startup-failure-total", WORKER_GROUP_NAME, + "The total number of task starts that failed.", workerTags); + taskStartupFailurePercentage = createTemplate("task-startup-failure-percentage", WORKER_GROUP_NAME, + "The average percentage of this worker's tasks starts that failed.", workerTags); + + /***** Worker rebalance level *****/ + Set<String> rebalanceTags = new LinkedHashSet<>(tags); + + leaderName = createTemplate("leader-name", WORKER_REBALANCE_GROUP_NAME, "The name of the group leader.", rebalanceTags); + epoch = createTemplate("epoch", WORKER_REBALANCE_GROUP_NAME, "The epoch or generation number of this worker.", rebalanceTags); + rebalanceCompletedTotal = createTemplate("completed-rebalances-total", WORKER_REBALANCE_GROUP_NAME, + "The total number of rebalances completed by this worker.", rebalanceTags); + 
rebalanceMode = createTemplate("rebalancing", WORKER_REBALANCE_GROUP_NAME, + "Whether this worker is currently rebalancing.", rebalanceTags); + rebalanceTimeMax = createTemplate("rebalance-max-time-ms", WORKER_REBALANCE_GROUP_NAME, + "The maximum time in milliseconds spent by this worker to rebalance.", rebalanceTags); + rebalanceTimeAvg = createTemplate("rebalance-avg-time-ms", WORKER_REBALANCE_GROUP_NAME, + "The average time in milliseconds spent by this worker to rebalance.", rebalanceTags); + rebalanceTimeSinceLast = createTemplate("time-since-last-rebalance-ms", WORKER_REBALANCE_GROUP_NAME, + "The time in milliseconds since this worker completed the most recent rebalance.", rebalanceTags); } private MetricNameTemplate createTemplate(String name, String group, String doc, Set<String> tags) { @@ -272,4 +329,12 @@ public class ConnectMetricsRegistry { public String sourceTaskGroupName() { return SOURCE_TASK_GROUP_NAME; } + + public String workerGroupName() { + return WORKER_GROUP_NAME; + } + + public String workerRebalanceGroupName() { + return WORKER_REBALANCE_GROUP_NAME; + } } http://git-wip-us.apache.org/repos/asf/kafka/blob/1d026269/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java ---------------------------------------------------------------------- diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java index 56bc341..c6e2e17 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java @@ -18,12 +18,18 @@ package org.apache.kafka.connect.runtime; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.ProducerConfig; +import org.apache.kafka.common.MetricName; +import org.apache.kafka.common.metrics.Sensor; +import org.apache.kafka.common.metrics.stats.Frequencies; +import 
org.apache.kafka.common.metrics.stats.Total; import org.apache.kafka.common.utils.Time; import org.apache.kafka.common.utils.Utils; import org.apache.kafka.connect.connector.Connector; import org.apache.kafka.connect.connector.ConnectorContext; import org.apache.kafka.connect.connector.Task; import org.apache.kafka.connect.errors.ConnectException; +import org.apache.kafka.connect.runtime.ConnectMetrics.LiteralSupplier; +import org.apache.kafka.connect.runtime.ConnectMetrics.MetricGroup; import org.apache.kafka.connect.runtime.isolation.Plugins; import org.apache.kafka.connect.sink.SinkRecord; import org.apache.kafka.connect.sink.SinkTask; @@ -68,6 +74,7 @@ public class Worker { private final String workerId; private final Plugins plugins; private final ConnectMetrics metrics; + private final WorkerMetricsGroup workerMetricsGroup; private final WorkerConfig config; private final Converter internalKeyConverter; private final Converter internalValueConverter; @@ -91,6 +98,8 @@ public class Worker { this.time = time; this.plugins = plugins; this.config = config; + this.workerMetricsGroup = new WorkerMetricsGroup(metrics); + // Internal converters are required properties, thus getClass won't return null. 
this.internalKeyConverter = plugins.newConverter( config.getClass(WorkerConfig.INTERNAL_KEY_CONVERTER_CLASS_CONFIG).getName(), @@ -164,6 +173,8 @@ public class Worker { metrics.stop(); log.info("Worker stopped"); + + workerMetricsGroup.close(); } /** @@ -204,6 +215,7 @@ public class Worker { // Can't be put in a finally block because it needs to be swapped before the call on // statusListener Plugins.compareAndSwapLoaders(savedLoader); + workerMetricsGroup.recordConnectorStartupFailure(); statusListener.onFailure(connName, t); return false; } @@ -213,6 +225,7 @@ public class Worker { throw new ConnectException("Connector with name " + connName + " already exists"); log.info("Finished creating connector {}", connName); + workerMetricsGroup.recordConnectorStartupSuccess(); return true; } @@ -396,6 +409,7 @@ public class Worker { // Can't be put in a finally block because it needs to be swapped before the call on // statusListener Plugins.compareAndSwapLoaders(savedLoader); + workerMetricsGroup.recordTaskFailure(); statusListener.onFailure(id, t); return false; } @@ -408,6 +422,7 @@ public class Worker { if (workerTask instanceof WorkerSourceTask) { sourceTaskOffsetCommitter.schedule(id, (WorkerSourceTask) workerTask); } + workerMetricsGroup.recordTaskSuccess(); return true; } @@ -583,4 +598,100 @@ public class Worker { Plugins.compareAndSwapLoaders(savedLoader); } } + + WorkerMetricsGroup workerMetricsGroup() { + return workerMetricsGroup; + } + + class WorkerMetricsGroup { + private final MetricGroup metricGroup; + private final Sensor connectorStartupAttempts; + private final Sensor connectorStartupSuccesses; + private final Sensor connectorStartupFailures; + private final Sensor connectorStartupResults; + private final Sensor taskStartupAttempts; + private final Sensor taskStartupSuccesses; + private final Sensor taskStartupFailures; + private final Sensor taskStartupResults; + + public WorkerMetricsGroup(ConnectMetrics connectMetrics) { + ConnectMetricsRegistry 
registry = connectMetrics.registry(); + metricGroup = connectMetrics.group(registry.workerGroupName()); + + metricGroup.addValueMetric(registry.connectorCount, new LiteralSupplier<Double>() { + @Override + public Double metricValue(long now) { + return (double) connectors.size(); + } + }); + metricGroup.addValueMetric(registry.taskCount, new LiteralSupplier<Double>() { + @Override + public Double metricValue(long now) { + return (double) tasks.size(); + } + }); + + MetricName connectorFailurePct = metricGroup.metricName(registry.connectorStartupFailurePercentage); + MetricName connectorSuccessPct = metricGroup.metricName(registry.connectorStartupSuccessPercentage); + Frequencies connectorStartupResultFrequencies = Frequencies.forBooleanValues(connectorFailurePct, connectorSuccessPct); + connectorStartupResults = metricGroup.sensor("connector-startup-results"); + connectorStartupResults.add(connectorStartupResultFrequencies); + + connectorStartupAttempts = metricGroup.sensor("connector-startup-attempts"); + connectorStartupAttempts.add(metricGroup.metricName(registry.connectorStartupAttemptsTotal), new Total()); + + connectorStartupSuccesses = metricGroup.sensor("connector-startup-successes"); + connectorStartupSuccesses.add(metricGroup.metricName(registry.connectorStartupSuccessTotal), new Total()); + + connectorStartupFailures = metricGroup.sensor("connector-startup-failures"); + connectorStartupFailures.add(metricGroup.metricName(registry.connectorStartupFailureTotal), new Total()); + + MetricName taskFailurePct = metricGroup.metricName(registry.taskStartupFailurePercentage); + MetricName taskSuccessPct = metricGroup.metricName(registry.taskStartupSuccessPercentage); + Frequencies taskStartupResultFrequencies = Frequencies.forBooleanValues(taskFailurePct, taskSuccessPct); + taskStartupResults = metricGroup.sensor("task-startup-results"); + taskStartupResults.add(taskStartupResultFrequencies); + + taskStartupAttempts = metricGroup.sensor("task-startup-attempts"); 
+ taskStartupAttempts.add(metricGroup.metricName(registry.taskStartupAttemptsTotal), new Total()); + + taskStartupSuccesses = metricGroup.sensor("task-startup-successes"); + taskStartupSuccesses.add(metricGroup.metricName(registry.taskStartupSuccessTotal), new Total()); + + taskStartupFailures = metricGroup.sensor("task-startup-failures"); + taskStartupFailures.add(metricGroup.metricName(registry.taskStartupFailureTotal), new Total()); + } + + void close() { + metricGroup.close(); + } + + void recordConnectorStartupFailure() { + connectorStartupAttempts.record(1.0); + connectorStartupFailures.record(1.0); + connectorStartupResults.record(0.0); + } + + void recordConnectorStartupSuccess() { + connectorStartupAttempts.record(1.0); + connectorStartupSuccesses.record(1.0); + connectorStartupResults.record(1.0); + } + + void recordTaskFailure() { + taskStartupAttempts.record(1.0); + taskStartupFailures.record(1.0); + taskStartupResults.record(0.0); + } + + void recordTaskSuccess() { + taskStartupAttempts.record(1.0); + taskStartupSuccesses.record(1.0); + taskStartupResults.record(1.0); + } + + protected MetricGroup metricGroup() { + return metricGroup; + } + } } http://git-wip-us.apache.org/repos/asf/kafka/blob/1d026269/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConnector.java ---------------------------------------------------------------------- diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConnector.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConnector.java index 21104bd..9e65cd2 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConnector.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConnector.java @@ -16,16 +16,18 @@ */ package org.apache.kafka.connect.runtime; -import org.apache.kafka.common.MetricNameTemplate; import org.apache.kafka.connect.connector.Connector; import 
org.apache.kafka.connect.connector.ConnectorContext; -import org.apache.kafka.connect.runtime.ConnectMetrics.IndicatorPredicate; +import org.apache.kafka.connect.runtime.ConnectMetrics.LiteralSupplier; import org.apache.kafka.connect.runtime.ConnectMetrics.MetricGroup; import org.apache.kafka.connect.sink.SinkConnector; +import org.apache.kafka.connect.source.SourceConnector; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.util.Locale; import java.util.Map; +import java.util.Objects; /** * Container for connectors which is responsible for managing their lifecycle (e.g. handling startup, @@ -199,6 +201,18 @@ public class WorkerConnector { return SinkConnector.class.isAssignableFrom(connector.getClass()); } + public boolean isSourceConnector() { + return SourceConnector.class.isAssignableFrom(connector.getClass()); + } + + protected String connectorType() { + if (isSinkConnector()) + return "sink"; + if (isSourceConnector()) + return "source"; + return "unknown"; + } + public Connector connector() { return connector; } @@ -224,22 +238,23 @@ public class WorkerConnector { private final ConnectorStatus.Listener delegate; public ConnectorMetricsGroup(ConnectMetrics connectMetrics, AbstractStatus.State initialState, ConnectorStatus.Listener delegate) { + Objects.requireNonNull(connectMetrics); + Objects.requireNonNull(connector); + Objects.requireNonNull(initialState); + Objects.requireNonNull(delegate); this.delegate = delegate; this.state = initialState; ConnectMetricsRegistry registry = connectMetrics.registry(); this.metricGroup = connectMetrics.group(registry.connectorGroupName(), registry.connectorTagName(), connName); - addStateMetric(AbstractStatus.State.RUNNING, registry.connectorStatusRunning); - addStateMetric(AbstractStatus.State.PAUSED, registry.connectorStatusPaused); - addStateMetric(AbstractStatus.State.FAILED, registry.connectorStatusFailed); - } - - private void addStateMetric(final AbstractStatus.State matchingState, 
MetricNameTemplate nameTemplate) { - metricGroup.addIndicatorMetric(nameTemplate, new IndicatorPredicate() { + metricGroup.addImmutableValueMetric(registry.connectorType, connectorType()); + metricGroup.addImmutableValueMetric(registry.connectorClass, connector.getClass().getName()); + metricGroup.addImmutableValueMetric(registry.connectorVersion, connector.version()); + metricGroup.addValueMetric(registry.connectorStatus, new LiteralSupplier<String>() { @Override - public boolean matches() { - return state == matchingState; + public String metricValue(long now) { + return state.toString().toLowerCase(Locale.getDefault()); } }); } http://git-wip-us.apache.org/repos/asf/kafka/blob/1d026269/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerTask.java ---------------------------------------------------------------------- diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerTask.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerTask.java index 6499ac2..ec06924 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerTask.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerTask.java @@ -26,13 +26,14 @@ import org.apache.kafka.common.metrics.stats.Frequencies; import org.apache.kafka.common.metrics.stats.Max; import org.apache.kafka.common.utils.Time; import org.apache.kafka.connect.runtime.AbstractStatus.State; -import org.apache.kafka.connect.runtime.ConnectMetrics.IndicatorPredicate; +import org.apache.kafka.connect.runtime.ConnectMetrics.LiteralSupplier; import org.apache.kafka.connect.runtime.ConnectMetrics.MetricGroup; import org.apache.kafka.connect.runtime.isolation.Plugins; import org.apache.kafka.connect.util.ConnectorTaskId; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.util.Locale; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; @@ -313,11 +314,12 @@ abstract class WorkerTask 
implements Runnable { registry.connectorTagName(), id.connector(), registry.taskTagName(), Integer.toString(id.task())); - addTaskStateMetric(State.UNASSIGNED, registry.taskStatusUnassigned); - addTaskStateMetric(State.RUNNING, registry.taskStatusRunning); - addTaskStateMetric(State.PAUSED, registry.taskStatusPaused); - addTaskStateMetric(State.FAILED, registry.taskStatusDestroyed); - addTaskStateMetric(State.DESTROYED, registry.taskStatusDestroyed); + metricGroup.addValueMetric(registry.taskStatus, new LiteralSupplier<String>() { + @Override + public String metricValue(long now) { + return taskStateTimer.currentState().toString().toLowerCase(Locale.getDefault()); + } + }); addRatioMetric(State.RUNNING, registry.taskRunningRatio); addRatioMetric(State.PAUSED, registry.taskPauseRatio); @@ -337,15 +339,6 @@ abstract class WorkerTask implements Runnable { commitAttempts.add(commitFrequencies); } - private void addTaskStateMetric(final State matchingState, MetricNameTemplate template) { - metricGroup.addIndicatorMetric(template, new IndicatorPredicate() { - @Override - public boolean matches() { - return matchingState == taskStateTimer.currentState(); - } - }); - } - private void addRatioMetric(final State matchingState, MetricNameTemplate template) { MetricName metricName = metricGroup.metricName(template); if (metricGroup.metrics().metric(metricName) == null) { http://git-wip-us.apache.org/repos/asf/kafka/blob/1d026269/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java ---------------------------------------------------------------------- diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java index 390f8c3..4d3d07b 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java +++ 
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java @@ -19,6 +19,10 @@ package org.apache.kafka.connect.runtime.distributed; import org.apache.kafka.common.config.ConfigDef; import org.apache.kafka.common.config.ConfigValue; import org.apache.kafka.common.errors.WakeupException; +import org.apache.kafka.common.metrics.Sensor; +import org.apache.kafka.common.metrics.stats.Avg; +import org.apache.kafka.common.metrics.stats.Max; +import org.apache.kafka.common.metrics.stats.Total; import org.apache.kafka.common.utils.Exit; import org.apache.kafka.common.utils.Time; import org.apache.kafka.common.utils.Utils; @@ -28,6 +32,10 @@ import org.apache.kafka.connect.errors.AlreadyExistsException; import org.apache.kafka.connect.errors.ConnectException; import org.apache.kafka.connect.errors.NotFoundException; import org.apache.kafka.connect.runtime.AbstractHerder; +import org.apache.kafka.connect.runtime.ConnectMetrics; +import org.apache.kafka.connect.runtime.ConnectMetrics.LiteralSupplier; +import org.apache.kafka.connect.runtime.ConnectMetrics.MetricGroup; +import org.apache.kafka.connect.runtime.ConnectMetricsRegistry; import org.apache.kafka.connect.runtime.ConnectorConfig; import org.apache.kafka.connect.runtime.HerderConnectorContext; import org.apache.kafka.connect.runtime.SinkConnectorConfig; @@ -106,6 +114,7 @@ public class DistributedHerder extends AbstractHerder implements Runnable { private final AtomicLong requestSeqNum = new AtomicLong(); private final Time time; + private final HerderMetrics herderMetrics; private final String workerGroupId; private final int workerSyncTimeoutMs; @@ -143,7 +152,7 @@ public class DistributedHerder extends AbstractHerder implements Runnable { StatusBackingStore statusBackingStore, ConfigBackingStore configBackingStore, String restUrl) { - this(config, worker, worker.workerId(), statusBackingStore, configBackingStore, null, restUrl, time); + this(config, worker, worker.workerId(), 
statusBackingStore, configBackingStore, null, restUrl, worker.metrics(), time); configBackingStore.setUpdateListener(new ConfigUpdateListener()); } @@ -155,10 +164,12 @@ public class DistributedHerder extends AbstractHerder implements Runnable { ConfigBackingStore configBackingStore, WorkerGroupMember member, String restUrl, + ConnectMetrics metrics, Time time) { super(worker, workerId, statusBackingStore, configBackingStore); this.time = time; + this.herderMetrics = new HerderMetrics(metrics); this.workerGroupId = config.getString(DistributedConfig.GROUP_ID_CONFIG); this.workerSyncTimeoutMs = config.getInt(DistributedConfig.WORKER_SYNC_TIMEOUT_MS_CONFIG); this.workerTasksShutdownTimeoutMs = config.getLong(DistributedConfig.TASK_SHUTDOWN_GRACEFUL_TIMEOUT_MS_CONFIG); @@ -202,6 +213,7 @@ public class DistributedHerder extends AbstractHerder implements Runnable { halt(); log.info("Herder stopped"); + herderMetrics.close(); } catch (Throwable t) { log.error("Uncaught exception in herder work thread, exiting: ", t); Exit.exit(1); @@ -781,6 +793,7 @@ public class DistributedHerder extends AbstractHerder implements Runnable { // We only mark this as resolved once we've actually started work, which allows us to correctly track whether // what work is currently active and running. If we bail early, the main tick loop + having requested rejoin // guarantees we'll attempt to rejoin before executing this method again. + herderMetrics.rebalanceSucceeded(time.milliseconds()); rebalanceResolved = true; return true; } @@ -1163,6 +1176,10 @@ public class DistributedHerder extends AbstractHerder implements Runnable { } } + protected HerderMetrics herderMetrics() { + return herderMetrics; + } + // Rebalances are triggered internally from the group member, so these are always executed in the work thread. 
public class RebalanceListener implements WorkerRebalanceListener { @Override @@ -1177,6 +1194,7 @@ public class DistributedHerder extends AbstractHerder implements Runnable { DistributedHerder.this.assignment = assignment; DistributedHerder.this.generation = generation; rebalanceResolved = false; + herderMetrics.rebalanceStarted(time.milliseconds()); } // Delete the statuses of all connectors removed prior to the start of this rebalance. This has to @@ -1230,4 +1248,71 @@ public class DistributedHerder extends AbstractHerder implements Runnable { } } + class HerderMetrics { + private final MetricGroup metricGroup; + private final Sensor rebalanceCompletedCounts; + private final Sensor rebalanceTime; + private volatile long lastRebalanceCompletedAtMillis = Long.MIN_VALUE; + private volatile boolean rebalancing = false; + private volatile long rebalanceStartedAtMillis = 0L; + + public HerderMetrics(ConnectMetrics connectMetrics) { + ConnectMetricsRegistry registry = connectMetrics.registry(); + metricGroup = connectMetrics.group(registry.workerRebalanceGroupName()); + + metricGroup.addValueMetric(registry.leaderName, new LiteralSupplier<String>() { + @Override + public String metricValue(long now) { + return leaderUrl(); + } + }); + metricGroup.addValueMetric(registry.epoch, new LiteralSupplier<Double>() { + @Override + public Double metricValue(long now) { + return (double) generation; + } + }); + metricGroup.addValueMetric(registry.rebalanceMode, new LiteralSupplier<Double>() { + @Override + public Double metricValue(long now) { + return rebalancing ? 
1.0d : 0.0d; + } + }); + + rebalanceCompletedCounts = metricGroup.sensor("completed-rebalance-count"); + rebalanceCompletedCounts.add(metricGroup.metricName(registry.rebalanceCompletedTotal), new Total()); + + rebalanceTime = metricGroup.sensor("rebalance-time"); + rebalanceTime.add(metricGroup.metricName(registry.rebalanceTimeMax), new Max()); + rebalanceTime.add(metricGroup.metricName(registry.rebalanceTimeAvg), new Avg()); + + metricGroup.addValueMetric(registry.rebalanceTimeSinceLast, new LiteralSupplier<Double>() { + @Override + public Double metricValue(long now) { + return lastRebalanceCompletedAtMillis == Long.MIN_VALUE ? Double.POSITIVE_INFINITY : (double) (now - lastRebalanceCompletedAtMillis); + } + }); + } + + void close() { + metricGroup.close(); + } + + void rebalanceStarted(long now) { + rebalanceStartedAtMillis = now; + rebalancing = true; + } + + void rebalanceSucceeded(long now) { + long duration = Math.max(0L, now - rebalanceStartedAtMillis); + rebalancing = false; + rebalanceCompletedCounts.record(1.0); + rebalanceTime.record(duration); + lastRebalanceCompletedAtMillis = now; + } + + protected MetricGroup metricGroup() { + return metricGroup; + } + } } http://git-wip-us.apache.org/repos/asf/kafka/blob/1d026269/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/MockConnectMetrics.java ---------------------------------------------------------------------- diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/MockConnectMetrics.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/MockConnectMetrics.java index 6cc6db7..f1df140 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/MockConnectMetrics.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/MockConnectMetrics.java @@ -56,6 +56,10 @@ public class MockConnectMetrics extends ConnectMetrics { this(new MockTime()); } + public MockConnectMetrics(org.apache.kafka.common.utils.MockTime time) { + 
super("mock", new WorkerConfig(WorkerConfig.baseConfigDef(), DEFAULT_WORKER_CONFIG), time); + } + public MockConnectMetrics(MockTime time) { super("mock", new WorkerConfig(WorkerConfig.baseConfigDef(), DEFAULT_WORKER_CONFIG), time); } @@ -73,28 +77,81 @@ public class MockConnectMetrics extends ConnectMetrics { * @param name the name of the metric * @return the current value of the metric */ - public double currentMetricValue(MetricGroup metricGroup, String name) { + public Object currentMetricValue(MetricGroup metricGroup, String name) { + return currentMetricValue(this, metricGroup, name); + } + + /** + * Get the current value of the named metric, which may have already been removed from the + * {@link org.apache.kafka.common.metrics.Metrics} but will have been captured before it was removed. + * + * @param metricGroup the metric metricGroup that contained the metric + * @param name the name of the metric + * @return the current value of the metric + */ + public double currentMetricValueAsDouble(MetricGroup metricGroup, String name) { + Object value = currentMetricValue(metricGroup, name); + return value instanceof Double ? ((Double) value).doubleValue() : Double.NaN; + } + + /** + * Get the current value of the named metric, which may have already been removed from the + * {@link org.apache.kafka.common.metrics.Metrics} but will have been captured before it was removed. + * + * @param metricGroup the metric metricGroup that contained the metric + * @param name the name of the metric + * @return the current value of the metric + */ + public String currentMetricValueAsString(MetricGroup metricGroup, String name) { + Object value = currentMetricValue(metricGroup, name); + return value instanceof String ? (String) value : null; + } + + /** + * Get the current value of the named metric, which may have already been removed from the + * {@link org.apache.kafka.common.metrics.Metrics} but will have been captured before it was removed. 
+ * + * @param metrics the {@link ConnectMetrics} instance + * @param metricGroup the metric metricGroup that contained the metric + * @param name the name of the metric + * @return the current value of the metric + */ + public static Object currentMetricValue(ConnectMetrics metrics, MetricGroup metricGroup, String name) { MetricName metricName = metricGroup.metricName(name); - for (MetricsReporter reporter : metrics().reporters()) { + for (MetricsReporter reporter : metrics.metrics().reporters()) { if (reporter instanceof MockMetricsReporter) { return ((MockMetricsReporter) reporter).currentMetricValue(metricName); } } - return Double.NEGATIVE_INFINITY; + return null; } /** - * Determine if the {@link KafkaMetric} with the specified name exists within the - * {@link org.apache.kafka.common.metrics.Metrics} instance. + * Get the current value of the named metric, which may have already been removed from the + * {@link org.apache.kafka.common.metrics.Metrics} but will have been captured before it was removed. * + * @param metrics the {@link ConnectMetrics} instance * @param metricGroup the metric metricGroup that contained the metric * @param name the name of the metric - * @return true if the metric is still register, or false if it has been removed + * @return the current value of the metric */ - public boolean metricExists(MetricGroup metricGroup, String name) { - MetricName metricName = metricGroup.metricName(name); - KafkaMetric metric = metricGroup.metrics().metric(metricName); - return metric != null; + public static double currentMetricValueAsDouble(ConnectMetrics metrics, MetricGroup metricGroup, String name) { + Object value = currentMetricValue(metrics, metricGroup, name); + return value instanceof Double ? ((Double) value).doubleValue() : Double.NaN; + } + + /** + * Get the current value of the named metric, which may have already been removed from the + * {@link org.apache.kafka.common.metrics.Metrics} but will have been captured before it was removed. 
+ * + * @param metrics the {@link ConnectMetrics} instance + * @param metricGroup the metric metricGroup that contained the metric + * @param name the name of the metric + * @return the current value of the metric + */ + public static String currentMetricValueAsString(ConnectMetrics metrics, MetricGroup metricGroup, String name) { + Object value = currentMetricValue(metrics, metricGroup, name); + return value instanceof String ? (String) value : null; } public static class MockMetricsReporter implements MetricsReporter { @@ -136,10 +193,9 @@ public class MockConnectMetrics extends ConnectMetrics { * @param metricName the name of the metric that was registered most recently * @return the current value of the metric */ - @SuppressWarnings("deprecation") - public double currentMetricValue(MetricName metricName) { + public Object currentMetricValue(MetricName metricName) { KafkaMetric metric = metricsByName.get(metricName); - return metric != null ? metric.value() : Double.NaN; + return metric != null ? 
metric.metricValue() : null; } } } http://git-wip-us.apache.org/repos/asf/kafka/blob/1d026269/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerConnectorTest.java ---------------------------------------------------------------------- diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerConnectorTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerConnectorTest.java index 5f03f5a..10c413d 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerConnectorTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerConnectorTest.java @@ -18,6 +18,7 @@ package org.apache.kafka.connect.runtime; import org.apache.kafka.connect.connector.Connector; import org.apache.kafka.connect.connector.ConnectorContext; +import org.apache.kafka.connect.runtime.ConnectMetrics.MetricGroup; import org.apache.kafka.connect.runtime.isolation.Plugins; import org.easymock.EasyMock; import org.easymock.EasyMockRunner; @@ -32,12 +33,15 @@ import java.util.HashMap; import java.util.Map; import static org.easymock.EasyMock.expectLastCall; +import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertTrue; @RunWith(EasyMockRunner.class) public class WorkerConnectorTest extends EasyMockSupport { + private static final String VERSION = "1.1"; public static final String CONNECTOR = "connector"; public static final Map<String, String> CONFIG = new HashMap<>(); static { @@ -45,7 +49,7 @@ public class WorkerConnectorTest extends EasyMockSupport { CONFIG.put(ConnectorConfig.NAME_CONFIG, CONNECTOR); } public ConnectorConfig connectorConfig; - public ConnectMetrics metrics; + public MockConnectMetrics metrics; @Mock Plugins plugins; @Mock Connector connector; @@ -67,6 +71,9 @@ public class WorkerConnectorTest extends EasyMockSupport { public void testInitializeFailure() { 
RuntimeException exception = new RuntimeException(); + connector.version(); + expectLastCall().andReturn(VERSION); + connector.initialize(EasyMock.notNull(ConnectorContext.class)); expectLastCall().andThrow(exception); @@ -92,6 +99,9 @@ public class WorkerConnectorTest extends EasyMockSupport { public void testFailureIsFinalState() { RuntimeException exception = new RuntimeException(); + connector.version(); + expectLastCall().andReturn(VERSION); + connector.initialize(EasyMock.notNull(ConnectorContext.class)); expectLastCall().andThrow(exception); @@ -119,6 +129,9 @@ public class WorkerConnectorTest extends EasyMockSupport { @Test public void testStartupAndShutdown() { + connector.version(); + expectLastCall().andReturn(VERSION); + connector.initialize(EasyMock.notNull(ConnectorContext.class)); expectLastCall(); @@ -150,6 +163,9 @@ public class WorkerConnectorTest extends EasyMockSupport { @Test public void testStartupAndPause() { + connector.version(); + expectLastCall().andReturn(VERSION); + connector.initialize(EasyMock.notNull(ConnectorContext.class)); expectLastCall(); @@ -186,6 +202,9 @@ public class WorkerConnectorTest extends EasyMockSupport { @Test public void testOnResume() { + connector.version(); + expectLastCall().andReturn(VERSION); + connector.initialize(EasyMock.notNull(ConnectorContext.class)); expectLastCall(); @@ -222,6 +241,9 @@ public class WorkerConnectorTest extends EasyMockSupport { @Test public void testStartupPaused() { + connector.version(); + expectLastCall().andReturn(VERSION); + connector.initialize(EasyMock.notNull(ConnectorContext.class)); expectLastCall(); @@ -251,6 +273,9 @@ public class WorkerConnectorTest extends EasyMockSupport { public void testStartupFailure() { RuntimeException exception = new RuntimeException(); + connector.version(); + expectLastCall().andReturn(VERSION); + connector.initialize(EasyMock.notNull(ConnectorContext.class)); expectLastCall(); @@ -281,6 +306,9 @@ public class WorkerConnectorTest extends 
EasyMockSupport { public void testShutdownFailure() { RuntimeException exception = new RuntimeException(); + connector.version(); + expectLastCall().andReturn(VERSION); + connector.initialize(EasyMock.notNull(ConnectorContext.class)); expectLastCall(); @@ -312,6 +340,9 @@ public class WorkerConnectorTest extends EasyMockSupport { @Test public void testTransitionStartedToStarted() { + connector.version(); + expectLastCall().andReturn(VERSION); + connector.initialize(EasyMock.notNull(ConnectorContext.class)); expectLastCall(); @@ -346,6 +377,9 @@ public class WorkerConnectorTest extends EasyMockSupport { @Test public void testTransitionPausedToPaused() { + connector.version(); + expectLastCall().andReturn(VERSION); + connector.initialize(EasyMock.notNull(ConnectorContext.class)); expectLastCall(); @@ -415,6 +449,14 @@ public class WorkerConnectorTest extends EasyMockSupport { assertFalse(workerConnector.metrics().isFailed()); assertFalse(workerConnector.metrics().isPaused()); assertFalse(workerConnector.metrics().isRunning()); + MetricGroup metricGroup = workerConnector.metrics().metricGroup(); + String status = metrics.currentMetricValueAsString(metricGroup, "status"); + String type = metrics.currentMetricValueAsString(metricGroup, "connector-type"); + String clazz = metrics.currentMetricValueAsString(metricGroup, "connector-class"); + String version = metrics.currentMetricValueAsString(metricGroup, "connector-version"); + assertEquals(type, "unknown"); + assertNotNull(clazz); + assertEquals(VERSION, version); } private static abstract class TestConnector extends Connector { http://git-wip-us.apache.org/repos/asf/kafka/blob/1d026269/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskTest.java ---------------------------------------------------------------------- diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskTest.java 
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskTest.java index 782d66b..50b091d 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskTest.java @@ -180,8 +180,7 @@ public class WorkerSinkTaskTest { time.sleep(10000L); assertSinkMetricValue("partition-count", 2); - assertTaskMetricValue("status-running", 0.0); - assertTaskMetricValue("status-paused", 1.0); + assertTaskMetricValue("status", "paused"); assertTaskMetricValue("running-ratio", 0.0); assertTaskMetricValue("pause-ratio", 1.0); assertTaskMetricValue("offset-commit-max-time-ms", Double.NEGATIVE_INFINITY); @@ -253,8 +252,7 @@ public class WorkerSinkTaskTest { assertSinkMetricValue("offset-commit-completion-total", 0.0); assertSinkMetricValue("offset-commit-skip-rate", 0.0); assertSinkMetricValue("offset-commit-skip-total", 0.0); - assertTaskMetricValue("status-running", 1.0); - assertTaskMetricValue("status-paused", 0.0); + assertTaskMetricValue("status", "running"); assertTaskMetricValue("running-ratio", 1.0); assertTaskMetricValue("pause-ratio", 0.0); assertTaskMetricValue("batch-size-max", 1.0); @@ -272,8 +270,7 @@ public class WorkerSinkTaskTest { assertSinkMetricValue("offset-commit-completion-total", 1.0); assertSinkMetricValue("offset-commit-skip-rate", 0.0); assertSinkMetricValue("offset-commit-skip-total", 0.0); - assertTaskMetricValue("status-running", 0.0); - assertTaskMetricValue("status-paused", 1.0); + assertTaskMetricValue("status", "paused"); assertTaskMetricValue("running-ratio", 0.25); assertTaskMetricValue("pause-ratio", 0.75); @@ -333,8 +330,7 @@ public class WorkerSinkTaskTest { assertSinkMetricValue("offset-commit-completion-total", 0.0); assertSinkMetricValue("offset-commit-skip-rate", 0.0); assertSinkMetricValue("offset-commit-skip-total", 0.0); - assertTaskMetricValue("status-running", 1.0); - 
assertTaskMetricValue("status-paused", 0.0); + assertTaskMetricValue("status", "running"); assertTaskMetricValue("running-ratio", 1.0); assertTaskMetricValue("pause-ratio", 0.0); assertTaskMetricValue("batch-size-max", 0.0); @@ -352,8 +348,7 @@ public class WorkerSinkTaskTest { assertSinkMetricValue("sink-record-active-count", 1.0); assertSinkMetricValue("sink-record-active-count-max", 1.0); assertSinkMetricValue("sink-record-active-count-avg", 0.5); - assertTaskMetricValue("status-running", 1.0); - assertTaskMetricValue("status-paused", 0.0); + assertTaskMetricValue("status", "running"); assertTaskMetricValue("running-ratio", 1.0); assertTaskMetricValue("batch-size-max", 1.0); assertTaskMetricValue("batch-size-avg", 0.5); @@ -492,8 +487,7 @@ public class WorkerSinkTaskTest { assertSinkMetricValue("offset-commit-seq-no", 1.0); assertSinkMetricValue("offset-commit-completion-total", 1.0); assertSinkMetricValue("offset-commit-skip-total", 0.0); - assertTaskMetricValue("status-running", 1.0); - assertTaskMetricValue("status-paused", 0.0); + assertTaskMetricValue("status", "running"); assertTaskMetricValue("running-ratio", 1.0); assertTaskMetricValue("pause-ratio", 0.0); assertTaskMetricValue("batch-size-max", 1.0); @@ -560,8 +554,7 @@ public class WorkerSinkTaskTest { assertSinkMetricValue("offset-commit-seq-no", 0.0); assertSinkMetricValue("offset-commit-completion-total", 0.0); assertSinkMetricValue("offset-commit-skip-total", 0.0); - assertTaskMetricValue("status-running", 1.0); - assertTaskMetricValue("status-paused", 0.0); + assertTaskMetricValue("status", "running"); assertTaskMetricValue("running-ratio", 1.0); assertTaskMetricValue("pause-ratio", 0.0); assertTaskMetricValue("batch-size-max", 1.0); @@ -588,8 +581,7 @@ public class WorkerSinkTaskTest { assertSinkMetricValue("offset-commit-seq-no", 1.0); assertSinkMetricValue("offset-commit-completion-total", 1.0); assertSinkMetricValue("offset-commit-skip-total", 0.0); - assertTaskMetricValue("status-running", 
1.0); - assertTaskMetricValue("status-paused", 0.0); + assertTaskMetricValue("status", "running"); assertTaskMetricValue("running-ratio", 1.0); assertTaskMetricValue("pause-ratio", 0.0); assertTaskMetricValue("batch-size-max", 1.0); @@ -991,8 +983,7 @@ public class WorkerSinkTaskTest { assertSinkMetricValue("offset-commit-seq-no", 2.0); assertSinkMetricValue("offset-commit-completion-total", 1.0); assertSinkMetricValue("offset-commit-skip-total", 1.0); - assertTaskMetricValue("status-running", 1.0); - assertTaskMetricValue("status-paused", 0.0); + assertTaskMetricValue("status", "running"); assertTaskMetricValue("running-ratio", 1.0); assertTaskMetricValue("pause-ratio", 0.0); assertTaskMetricValue("batch-size-max", 2.0); @@ -1026,8 +1017,7 @@ public class WorkerSinkTaskTest { assertSinkMetricValue("offset-commit-seq-no", 3.0); assertSinkMetricValue("offset-commit-completion-total", 2.0); assertSinkMetricValue("offset-commit-skip-total", 1.0); - assertTaskMetricValue("status-running", 1.0); - assertTaskMetricValue("status-paused", 0.0); + assertTaskMetricValue("status", "running"); assertTaskMetricValue("running-ratio", 1.0); assertTaskMetricValue("pause-ratio", 0.0); assertTaskMetricValue("batch-size-max", 2.0); @@ -1089,7 +1079,7 @@ public class WorkerSinkTaskTest { assertFalse(sinkTaskContext.getValue().isCommitRequested()); // should have been cleared assertEquals(offsets, Whitebox.<Map<TopicPartition, OffsetAndMetadata>>getInternalState(workerTask, "lastCommittedOffsets")); assertEquals(0, workerTask.commitFailures()); - assertEquals(1.0, metrics.currentMetricValue(workerTask.taskMetricsGroup().metricGroup(), "batch-size-max"), 0.0001); + assertEquals(1.0, metrics.currentMetricValueAsDouble(workerTask.taskMetricsGroup().metricGroup(), "batch-size-max"), 0.0001); PowerMock.verifyAll(); } @@ -1289,16 +1279,22 @@ public class WorkerSinkTaskTest { private void assertSinkMetricValue(String name, double expected) { MetricGroup sinkTaskGroup = 
workerTask.sinkTaskMetricsGroup().metricGroup(); - double measured = metrics.currentMetricValue(sinkTaskGroup, name); + double measured = metrics.currentMetricValueAsDouble(sinkTaskGroup, name); assertEquals(expected, measured, 0.001d); } private void assertTaskMetricValue(String name, double expected) { MetricGroup taskGroup = workerTask.taskMetricsGroup().metricGroup(); - double measured = metrics.currentMetricValue(taskGroup, name); + double measured = metrics.currentMetricValueAsDouble(taskGroup, name); assertEquals(expected, measured, 0.001d); } + private void assertTaskMetricValue(String name, String expected) { + MetricGroup taskGroup = workerTask.taskMetricsGroup().metricGroup(); + String measured = metrics.currentMetricValueAsString(taskGroup, name); + assertEquals(expected, measured); + } + private void printMetrics() { System.out.println(""); sinkMetricValue("sink-record-read-rate"); @@ -1334,14 +1330,14 @@ public class WorkerSinkTaskTest { private double sinkMetricValue(String metricName) { MetricGroup sinkTaskGroup = workerTask.sinkTaskMetricsGroup().metricGroup(); - double value = metrics.currentMetricValue(sinkTaskGroup, metricName); + double value = metrics.currentMetricValueAsDouble(sinkTaskGroup, metricName); System.out.println("** " + metricName + "=" + value); return value; } private double taskMetricValue(String metricName) { MetricGroup taskGroup = workerTask.taskMetricsGroup().metricGroup(); - double value = metrics.currentMetricValue(taskGroup, metricName); + double value = metrics.currentMetricValueAsDouble(taskGroup, metricName); System.out.println("** " + metricName + "=" + value); return value; } @@ -1350,10 +1346,10 @@ public class WorkerSinkTaskTest { private void assertMetrics(int minimumPollCountExpected) { MetricGroup sinkTaskGroup = workerTask.sinkTaskMetricsGroup().metricGroup(); MetricGroup taskGroup = workerTask.taskMetricsGroup().metricGroup(); - double readRate = metrics.currentMetricValue(sinkTaskGroup, 
"sink-record-read-rate"); - double readTotal = metrics.currentMetricValue(sinkTaskGroup, "sink-record-read-total"); - double sendRate = metrics.currentMetricValue(sinkTaskGroup, "sink-record-send-rate"); - double sendTotal = metrics.currentMetricValue(sinkTaskGroup, "sink-record-send-total"); + double readRate = metrics.currentMetricValueAsDouble(sinkTaskGroup, "sink-record-read-rate"); + double readTotal = metrics.currentMetricValueAsDouble(sinkTaskGroup, "sink-record-read-total"); + double sendRate = metrics.currentMetricValueAsDouble(sinkTaskGroup, "sink-record-send-rate"); + double sendTotal = metrics.currentMetricValueAsDouble(sinkTaskGroup, "sink-record-send-total"); } private abstract static class TestSinkTask extends SinkTask { http://git-wip-us.apache.org/repos/asf/kafka/blob/1d026269/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSourceTaskTest.java ---------------------------------------------------------------------- diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSourceTaskTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSourceTaskTest.java index 928ccb9..4f0d243 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSourceTaskTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSourceTaskTest.java @@ -615,12 +615,12 @@ public class WorkerSourceTaskTest extends ThreadedTest { group.recordPoll(100, 1000 + i * 100); group.recordWrite(10); } - assertEquals(1900.0, metrics.currentMetricValue(group.metricGroup(), "poll-batch-max-time-ms"), 0.001d); - assertEquals(1450.0, metrics.currentMetricValue(group.metricGroup(), "poll-batch-avg-time-ms"), 0.001d); - assertEquals(33.333, metrics.currentMetricValue(group.metricGroup(), "source-record-poll-rate"), 0.001d); - assertEquals(1000, metrics.currentMetricValue(group.metricGroup(), "source-record-poll-total"), 0.001d); - assertEquals(3.3333, 
metrics.currentMetricValue(group.metricGroup(), "source-record-write-rate"), 0.001d); - assertEquals(100, metrics.currentMetricValue(group.metricGroup(), "source-record-write-total"), 0.001d); + assertEquals(1900.0, metrics.currentMetricValueAsDouble(group.metricGroup(), "poll-batch-max-time-ms"), 0.001d); + assertEquals(1450.0, metrics.currentMetricValueAsDouble(group.metricGroup(), "poll-batch-avg-time-ms"), 0.001d); + assertEquals(33.333, metrics.currentMetricValueAsDouble(group.metricGroup(), "source-record-poll-rate"), 0.001d); + assertEquals(1000, metrics.currentMetricValueAsDouble(group.metricGroup(), "source-record-poll-total"), 0.001d); + assertEquals(3.3333, metrics.currentMetricValueAsDouble(group.metricGroup(), "source-record-write-rate"), 0.001d); + assertEquals(100, metrics.currentMetricValueAsDouble(group.metricGroup(), "source-record-write-total"), 0.001d); } private CountDownLatch expectPolls(int minimum, final AtomicInteger count) throws InterruptedException { @@ -795,19 +795,19 @@ public class WorkerSourceTaskTest extends ThreadedTest { private void assertPollMetrics(int minimumPollCountExpected) { MetricGroup sourceTaskGroup = workerTask.sourceTaskMetricsGroup().metricGroup(); MetricGroup taskGroup = workerTask.taskMetricsGroup().metricGroup(); - double pollRate = metrics.currentMetricValue(sourceTaskGroup, "source-record-poll-rate"); - double pollTotal = metrics.currentMetricValue(sourceTaskGroup, "source-record-poll-total"); + double pollRate = metrics.currentMetricValueAsDouble(sourceTaskGroup, "source-record-poll-rate"); + double pollTotal = metrics.currentMetricValueAsDouble(sourceTaskGroup, "source-record-poll-total"); if (minimumPollCountExpected > 0) { - assertEquals(RECORDS.size(), metrics.currentMetricValue(taskGroup, "batch-size-max"), 0.000001d); - assertEquals(RECORDS.size(), metrics.currentMetricValue(taskGroup, "batch-size-avg"), 0.000001d); + assertEquals(RECORDS.size(), metrics.currentMetricValueAsDouble(taskGroup, 
"batch-size-max"), 0.000001d); + assertEquals(RECORDS.size(), metrics.currentMetricValueAsDouble(taskGroup, "batch-size-avg"), 0.000001d); assertTrue(pollRate > 0.0d); } else { assertTrue(pollRate == 0.0d); } assertTrue(pollTotal >= minimumPollCountExpected); - double writeRate = metrics.currentMetricValue(sourceTaskGroup, "source-record-write-rate"); - double writeTotal = metrics.currentMetricValue(sourceTaskGroup, "source-record-write-total"); + double writeRate = metrics.currentMetricValueAsDouble(sourceTaskGroup, "source-record-write-rate"); + double writeTotal = metrics.currentMetricValueAsDouble(sourceTaskGroup, "source-record-write-total"); if (minimumPollCountExpected > 0) { assertTrue(writeRate > 0.0d); } else { @@ -815,14 +815,14 @@ public class WorkerSourceTaskTest extends ThreadedTest { } assertTrue(writeTotal >= minimumPollCountExpected); - double pollBatchTimeMax = metrics.currentMetricValue(sourceTaskGroup, "poll-batch-max-time-ms"); - double pollBatchTimeAvg = metrics.currentMetricValue(sourceTaskGroup, "poll-batch-avg-time-ms"); + double pollBatchTimeMax = metrics.currentMetricValueAsDouble(sourceTaskGroup, "poll-batch-max-time-ms"); + double pollBatchTimeAvg = metrics.currentMetricValueAsDouble(sourceTaskGroup, "poll-batch-avg-time-ms"); if (minimumPollCountExpected > 0) { assertTrue(pollBatchTimeMax >= 0.0d); } assertTrue(pollBatchTimeAvg >= 0.0d); - double activeCount = metrics.currentMetricValue(sourceTaskGroup, "source-record-active-count"); - double activeCountMax = metrics.currentMetricValue(sourceTaskGroup, "source-record-active-count-max"); + double activeCount = metrics.currentMetricValueAsDouble(sourceTaskGroup, "source-record-active-count"); + double activeCountMax = metrics.currentMetricValueAsDouble(sourceTaskGroup, "source-record-active-count-max"); assertEquals(0, activeCount, 0.000001d); if (minimumPollCountExpected > 0) { assertEquals(RECORDS.size(), activeCountMax, 0.000001d); 
http://git-wip-us.apache.org/repos/asf/kafka/blob/1d026269/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTaskTest.java ---------------------------------------------------------------------- diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTaskTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTaskTest.java index 96746a5..516b71a 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTaskTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTaskTest.java @@ -314,8 +314,8 @@ public class WorkerTaskTest { long totalTime = 27000L; double pauseTimeRatio = (double) (3000L + 5000L) / totalTime; double runningTimeRatio = (double) (2000L + 4000L + 6000L) / totalTime; - assertEquals(pauseTimeRatio, metrics.currentMetricValue(group.metricGroup(), "pause-ratio"), 0.000001d); - assertEquals(runningTimeRatio, metrics.currentMetricValue(group.metricGroup(), "running-ratio"), 0.000001d); + assertEquals(pauseTimeRatio, metrics.currentMetricValueAsDouble(group.metricGroup(), "pause-ratio"), 0.000001d); + assertEquals(runningTimeRatio, metrics.currentMetricValueAsDouble(group.metricGroup(), "running-ratio"), 0.000001d); } private static abstract class TestSinkTask extends SinkTask {
