This is an automated email from the ASF dual-hosted git repository.
wlo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git
The following commit(s) were added to refs/heads/master by this push:
new 495bdafe5 [GOBBLIN-1934] Monitor High Level Consumer queue size (#3805)
495bdafe5 is described below
commit 495bdafe58fa7105e725af55e01d38a5afc3d4d8
Author: umustafi <[email protected]>
AuthorDate: Tue Oct 24 14:01:44 2023 -0700
[GOBBLIN-1934] Monitor High Level Consumer queue size (#3805)
* Emit metrics to monitor high level consumer queue size
* Empty commit to trigger tests
* Use BlockingQueue.size() func instead of atomic integer array
* Remove unused import & add DagActionChangeMonitor prefix to metric
* Refactor to avoid repeating code
* Make protected variables private where possible
* Fix white space
---------
Co-authored-by: Urmi Mustafi <[email protected]>
---
.../gobblin/runtime/kafka/HighLevelConsumer.java | 24 ++++++++++++++++++++--
.../gobblin/runtime/metrics/RuntimeMetrics.java | 1 +
.../monitoring/DagActionStoreChangeMonitor.java | 9 ++++++--
3 files changed, 30 insertions(+), 4 deletions(-)
diff --git
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/kafka/HighLevelConsumer.java
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/kafka/HighLevelConsumer.java
index 7b494201e..1cba016bd 100644
---
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/kafka/HighLevelConsumer.java
+++
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/kafka/HighLevelConsumer.java
@@ -51,6 +51,7 @@ import org.apache.gobblin.kafka.client.DecodeableKafkaRecord;
import org.apache.gobblin.kafka.client.GobblinConsumerRebalanceListener;
import org.apache.gobblin.kafka.client.GobblinKafkaConsumerClient;
import org.apache.gobblin.kafka.client.KafkaConsumerRecord;
+import org.apache.gobblin.metrics.ContextAwareGauge;
import org.apache.gobblin.metrics.MetricContext;
import org.apache.gobblin.metrics.Tag;
import org.apache.gobblin.runtime.metrics.RuntimeMetrics;
@@ -101,6 +102,7 @@ public abstract class HighLevelConsumer<K,V> extends
AbstractIdleService {
private final ScheduledExecutorService consumerExecutor;
private final ExecutorService queueExecutor;
private final BlockingQueue[] queues;
+ private ContextAwareGauge[] queueSizeGauges;
private final AtomicInteger recordsProcessed;
private final Map<KafkaPartition, Long> partitionOffsetsToCommit;
private final boolean enableAutoCommit;
@@ -127,7 +129,7 @@ public abstract class HighLevelConsumer<K,V> extends
AbstractIdleService {
this.consumerExecutor =
Executors.newSingleThreadScheduledExecutor(ExecutorsUtils.newThreadFactory(Optional.of(log),
Optional.of("HighLevelConsumerThread")));
this.queueExecutor = Executors.newFixedThreadPool(this.numThreads,
ExecutorsUtils.newThreadFactory(Optional.of(log),
Optional.of("QueueProcessor-%d")));
this.queues = new LinkedBlockingQueue[numThreads];
- for(int i=0; i<queues.length; i++) {
+ for(int i = 0; i < queues.length; i++) {
this.queues[i] = new LinkedBlockingQueue();
}
this.recordsProcessed = new AtomicInteger(0);
@@ -196,7 +198,24 @@ public abstract class HighLevelConsumer<K,V> extends
AbstractIdleService {
* this method to instantiate their own metrics.
*/
protected void createMetrics() {
- this.messagesRead =
this.metricContext.counter(RuntimeMetrics.GOBBLIN_KAFKA_HIGH_LEVEL_CONSUMER_MESSAGES_READ);
+ String prefix = getMetricsPrefix();
+ this.messagesRead = this.metricContext.counter(prefix +
+ RuntimeMetrics.GOBBLIN_KAFKA_HIGH_LEVEL_CONSUMER_MESSAGES_READ);
+ this.queueSizeGauges = new ContextAwareGauge[numThreads];
+ for (int i = 0; i < numThreads; i++) {
+ // An 'effectively' final variable is needed inside the lambda
expression below
+ int finalI = i;
+ this.queueSizeGauges[i] = this.metricContext.newContextAwareGauge(prefix
+
+ RuntimeMetrics.GOBBLIN_KAFKA_HIGH_LEVEL_CONSUMER_QUEUE_SIZE_PREFIX +
"-" + i,
+ () -> queues[finalI].size());
+ }
+ }
+
+ /**
+ * Used by child classes to distinguish prefixes from one another
+ */
+ protected String getMetricsPrefix() {
+ return "";
}
/**
@@ -237,6 +256,7 @@ public abstract class HighLevelConsumer<K,V> extends
AbstractIdleService {
private void consume() {
try {
Iterator<KafkaConsumerRecord> itr = gobblinKafkaConsumerClient.consume();
+ // TODO: we may be committing too early and only want to commit after
process messages
if(!enableAutoCommit) {
commitOffsets();
}
diff --git
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/metrics/RuntimeMetrics.java
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/metrics/RuntimeMetrics.java
index 8fc1258ab..dc4e26e91 100644
---
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/metrics/RuntimeMetrics.java
+++
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/metrics/RuntimeMetrics.java
@@ -28,6 +28,7 @@ public class RuntimeMetrics {
// Metric names
public static final String GOBBLIN_KAFKA_HIGH_LEVEL_CONSUMER_MESSAGES_READ =
"gobblin.kafka.highLevelConsumer.messagesRead";
+ public static final String
GOBBLIN_KAFKA_HIGH_LEVEL_CONSUMER_QUEUE_SIZE_PREFIX =
"gobblin.kafka.highLevelConsumer.queueSize";
public static final String GOBBLIN_JOB_MONITOR_KAFKA_TOTAL_SPECS =
"gobblin.jobMonitor.kafka.totalSpecs";
public static final String GOBBLIN_JOB_MONITOR_KAFKA_NEW_SPECS =
"gobblin.jobMonitor.kafka.newSpecs";
public static final String GOBBLIN_JOB_MONITOR_KAFKA_UPDATED_SPECS =
"gobblin.jobMonitor.kafka.updatedSpecs";
diff --git
a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/DagActionStoreChangeMonitor.java
b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/DagActionStoreChangeMonitor.java
index e5a2d090d..6855ca669 100644
---
a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/DagActionStoreChangeMonitor.java
+++
b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/DagActionStoreChangeMonitor.java
@@ -34,7 +34,6 @@ import lombok.extern.slf4j.Slf4j;
import org.apache.gobblin.kafka.client.DecodeableKafkaRecord;
import org.apache.gobblin.metrics.ContextAwareGauge;
import org.apache.gobblin.metrics.ContextAwareMeter;
-import org.apache.gobblin.metrics.ServiceMetricNames;
import org.apache.gobblin.runtime.api.DagActionStore;
import org.apache.gobblin.runtime.api.FlowSpec;
import org.apache.gobblin.runtime.api.SpecNotFoundException;
@@ -217,7 +216,8 @@ public class DagActionStoreChangeMonitor extends
HighLevelConsumer {
@Override
protected void createMetrics() {
- super.messagesRead =
this.getMetricContext().counter(ServiceMetricNames.GOBBLIN_SERVICE_PREFIX +
RuntimeMetrics.GOBBLIN_KAFKA_HIGH_LEVEL_CONSUMER_MESSAGES_READ);
+ super.createMetrics();
+ // Dag Action specific metrics
this.killsInvoked =
this.getMetricContext().contextAwareMeter(RuntimeMetrics.GOBBLIN_DAG_ACTION_STORE_MONITOR_KILLS_INVOKED);
this.resumesInvoked =
this.getMetricContext().contextAwareMeter(RuntimeMetrics.GOBBLIN_DAG_ACTION_STORE_MONITOR_RESUMES_INVOKED);
this.flowsLaunched =
this.getMetricContext().contextAwareMeter(RuntimeMetrics.GOBBLIN_DAG_ACTION_STORE_MONITOR_FLOWS_LAUNCHED);
@@ -228,4 +228,9 @@ public class DagActionStoreChangeMonitor extends
HighLevelConsumer {
this.produceToConsumeDelayMillis =
this.getMetricContext().newContextAwareGauge(RuntimeMetrics.GOBBLIN_DAG_ACTION_STORE_PRODUCE_TO_CONSUME_DELAY_MILLIS,
() -> produceToConsumeDelayValue);
this.getMetricContext().register(this.produceToConsumeDelayMillis);
}
+
+ @Override
+ protected String getMetricsPrefix() {
+ return RuntimeMetrics.DAG_ACTION_STORE_MONITOR_PREFIX + ".";
+ }
}