This is an automated email from the ASF dual-hosted git repository.
arjun4084346 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git
The following commit(s) were added to refs/heads/master by this push:
new e392c97de [GOBBLIN-1978] Initialize dag action store monitor metrics
(#3851)
e392c97de is described below
commit e392c97debfbcd3cd8e687517f5eb1bf904286d7
Author: umustafi <[email protected]>
AuthorDate: Fri Dec 22 11:03:27 2023 -0800
[GOBBLIN-1978] Initialize dag action store monitor metrics (#3851)
* Create dagAction handling metrics when monitor is initialized
* Add warning comment
* Initialize all metrics and context earlier
---------
Co-authored-by: Urmi Mustafi <[email protected]>
---
.../gobblin/runtime/kafka/HighLevelConsumer.java | 6 +++---
.../monitoring/DagActionStoreChangeMonitor.java | 23 ++++++++++++++++++++++
2 files changed, 26 insertions(+), 3 deletions(-)
diff --git
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/kafka/HighLevelConsumer.java
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/kafka/HighLevelConsumer.java
index 1cba016bd..82a45ceaf 100644
---
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/kafka/HighLevelConsumer.java
+++
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/kafka/HighLevelConsumer.java
@@ -99,7 +99,7 @@ public abstract class HighLevelConsumer<K,V> extends
AbstractIdleService {
protected Counter messagesRead;
@Getter
private final GobblinKafkaConsumerClient gobblinKafkaConsumerClient;
- private final ScheduledExecutorService consumerExecutor;
+ protected final ScheduledExecutorService consumerExecutor;
private final ExecutorService queueExecutor;
private final BlockingQueue[] queues;
private ContextAwareGauge[] queueSizeGauges;
@@ -253,7 +253,7 @@ public abstract class HighLevelConsumer<K,V> extends
AbstractIdleService {
* Note: All records from a KafkaPartition are added to the same queue.
* A queue can contain records from multiple partitions if partitions >
numThreads(queues)
*/
- private void consume() {
+ protected void consume() {
try {
Iterator<KafkaConsumerRecord> itr = gobblinKafkaConsumerClient.consume();
// TODO: we may be committing too early and only want to commit after
process messages
@@ -275,7 +275,7 @@ public abstract class HighLevelConsumer<K,V> extends
AbstractIdleService {
* Assigns a queue to each thread of the {@link #queueExecutor}
* Note: Assumption here is that {@link #numThreads} is same a number of
queues
*/
- private void processQueues() {
+ protected void processQueues() {
for(BlockingQueue queue : queues) {
queueExecutor.execute(new QueueProcessor(queue));
}
diff --git
a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/DagActionStoreChangeMonitor.java
b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/DagActionStoreChangeMonitor.java
index 34ebaa722..647bb8089 100644
---
a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/DagActionStoreChangeMonitor.java
+++
b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/DagActionStoreChangeMonitor.java
@@ -110,6 +110,12 @@ public class DagActionStoreChangeMonitor extends
HighLevelConsumer {
this.orchestrator = orchestrator;
this.dagActionStore = dagActionStore;
this.isMultiActiveSchedulerEnabled = isMultiActiveSchedulerEnabled;
+
+ /*
+ Metrics need to be created before initializeMonitor() below is called (or
more specifically handleDagAction() is
+ called on any dagAction)
+ */
+ buildMetricsContextAndMetrics();
}
@Override
@@ -136,6 +142,23 @@ public class DagActionStoreChangeMonitor extends
HighLevelConsumer {
}
}
+ /*
+ Override this method to do the same sequence as the parent class, except
create metrics. Instead, we create metrics
+ earlier upon class initialization because they are used immediately as dag
actions are loaded and processed from
+ the DagActionStore.
+ */
+ @Override
+ protected void startUp() {
+ // Method that starts threads that processes queues
+ processQueues();
+ // Main thread that constantly polls messages from kafka
+ consumerExecutor.execute(() -> {
+ while (!shutdownRequested) {
+ consume();
+ }
+ });
+ }
+
@Override
/*
This class is multithreaded and this method will be called by multiple
threads, however any given message will be