This is an automated email from the ASF dual-hosted git repository.

zihanli58 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git


The following commit(s) were added to refs/heads/master by this push:
     new b5b458b  [GOBBLIN-1593] Fix bugs in dag manager about metric reporting and job status monitor (#3448)
b5b458b is described below

commit b5b458b1e39e3d5a0ac78406baecb5fb6ead25dd
Author: Zihan Li <[email protected]>
AuthorDate: Wed Jan 5 15:16:41 2022 -0800

    [GOBBLIN-1593] Fix bugs in dag manager about metric reporting and job status monitor (#3448)
    
    * [GOBBLIN-1593] Fix bugs in dag manager about metric reporting and job status monitor
    
    * address comments
    
    * fix typo
---
 .../org/apache/gobblin/cluster/GobblinHelixJobLauncher.java |  4 ++--
 .../java/org/apache/gobblin/azkaban/AzkabanJobLauncher.java |  4 ++--
 .../apache/gobblin/service/StreamingKafkaSpecConsumer.java  | 13 +++++++++----
 .../service/modules/orchestration/DagManagerUtils.java      |  9 ++++++---
 4 files changed, 19 insertions(+), 11 deletions(-)

diff --git 
a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobLauncher.java
 
b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobLauncher.java
index b7315e7..911ba2c 100644
--- 
a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobLauncher.java
+++ 
b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobLauncher.java
@@ -610,9 +610,9 @@ public class GobblinHelixJobLauncher extends 
AbstractJobLauncher {
 
     if (jobProps.containsKey(ConfigurationKeys.JOB_CURRENT_ATTEMPTS)) {
       metadataTags.add(new 
Tag<>(TimingEvent.FlowEventConstants.CURRENT_ATTEMPTS_FIELD,
-          jobProps.getProperty(ConfigurationKeys.JOB_CURRENT_ATTEMPTS, "")));
+          jobProps.getProperty(ConfigurationKeys.JOB_CURRENT_ATTEMPTS, "1")));
       metadataTags.add(new 
Tag<>(TimingEvent.FlowEventConstants.CURRENT_GENERATION_FIELD,
-          jobProps.getProperty(ConfigurationKeys.JOB_CURRENT_GENERATION, "")));
+          jobProps.getProperty(ConfigurationKeys.JOB_CURRENT_GENERATION, 
"1")));
       metadataTags.add(new 
Tag<>(TimingEvent.FlowEventConstants.SHOULD_RETRY_FIELD,
           "false"));
     }
diff --git 
a/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/azkaban/AzkabanJobLauncher.java
 
b/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/azkaban/AzkabanJobLauncher.java
index 7f99f93..43543d8 100644
--- 
a/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/azkaban/AzkabanJobLauncher.java
+++ 
b/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/azkaban/AzkabanJobLauncher.java
@@ -390,9 +390,9 @@ public class AzkabanJobLauncher extends AbstractJob 
implements ApplicationLaunch
 
     if (jobProps.containsKey(ConfigurationKeys.JOB_CURRENT_ATTEMPTS)) {
       metadataTags.add(new 
Tag<>(TimingEvent.FlowEventConstants.CURRENT_ATTEMPTS_FIELD,
-          jobProps.getProperty(ConfigurationKeys.JOB_CURRENT_ATTEMPTS, "")));
+          jobProps.getProperty(ConfigurationKeys.JOB_CURRENT_ATTEMPTS, "1")));
       metadataTags.add(new 
Tag<>(TimingEvent.FlowEventConstants.CURRENT_GENERATION_FIELD,
-          jobProps.getProperty(ConfigurationKeys.JOB_CURRENT_GENERATION, "")));
+          jobProps.getProperty(ConfigurationKeys.JOB_CURRENT_GENERATION, 
"1")));
       metadataTags.add(new 
Tag<>(TimingEvent.FlowEventConstants.SHOULD_RETRY_FIELD,
           "false"));
     }
diff --git 
a/gobblin-modules/gobblin-service-kafka/src/main/java/org/apache/gobblin/service/StreamingKafkaSpecConsumer.java
 
b/gobblin-modules/gobblin-service-kafka/src/main/java/org/apache/gobblin/service/StreamingKafkaSpecConsumer.java
index b072273..4dcac2e 100644
--- 
a/gobblin-modules/gobblin-service-kafka/src/main/java/org/apache/gobblin/service/StreamingKafkaSpecConsumer.java
+++ 
b/gobblin-modules/gobblin-service-kafka/src/main/java/org/apache/gobblin/service/StreamingKafkaSpecConsumer.java
@@ -72,6 +72,7 @@ public class StreamingKafkaSpecConsumer extends 
AbstractIdleService implements S
   private final MutableJobCatalog _jobCatalog;
   private final MetricContext _metricContext;
   private final Metrics _metrics;
+  private final int _jobSpecQueueSize;
 
   public StreamingKafkaSpecConsumer(Config config, MutableJobCatalog 
jobCatalog, Optional<Logger> log) {
     String topic = config.getString(SPEC_KAFKA_TOPICS_KEY);
@@ -86,8 +87,9 @@ public class StreamingKafkaSpecConsumer extends 
AbstractIdleService implements S
     }
 
     _jobCatalog = jobCatalog;
-    _jobSpecQueue = new LinkedBlockingQueue<>(ConfigUtils.getInt(config, 
"SPEC_STREAMING_BLOCKING_QUEUE_SIZE",
-        DEFAULT_SPEC_STREAMING_BLOCKING_QUEUE_SIZE));
+    _jobSpecQueueSize = ConfigUtils.getInt(config, 
"SPEC_STREAMING_BLOCKING_QUEUE_SIZE",
+        DEFAULT_SPEC_STREAMING_BLOCKING_QUEUE_SIZE);
+    _jobSpecQueue = new LinkedBlockingQueue<>(_jobSpecQueueSize);
     _metricContext = 
Instrumented.getMetricContext(ConfigUtils.configToState(config), 
this.getClass());
     _metrics = new Metrics(this._metricContext);
   }
@@ -111,13 +113,16 @@ public class StreamingKafkaSpecConsumer extends 
AbstractIdleService implements S
 
     try {
       Pair<SpecExecutor.Verb, Spec> specPair = _jobSpecQueue.take();
-      _metrics.specConsumerJobSpecDeq.mark();
+      int numSpecFetched = 0;
       do {
+        _metrics.specConsumerJobSpecDeq.mark();
+        numSpecFetched ++;
         changesSpecs.add(specPair);
 
         // if there are more elements then pass them along in this call
         specPair = _jobSpecQueue.poll();
-      } while (specPair != null);
+        // comparing numSpecFetched to _jobSpecQueueSize to make sure the loop 
will not run infinitely even in peak time
+      } while (specPair != null && numSpecFetched < _jobSpecQueueSize);
     } catch (InterruptedException e) {
       Thread.currentThread().interrupt();
     }
diff --git 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagerUtils.java
 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagerUtils.java
index 8e52784..b7aa761 100644
--- 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagerUtils.java
+++ 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagerUtils.java
@@ -22,6 +22,7 @@ import java.util.HashSet;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
+import java.util.Properties;
 import java.util.Set;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
@@ -151,11 +152,13 @@ public class DagManagerUtils {
 
   public static JobSpec getJobSpec(DagNode<JobExecutionPlan> dagNode) {
     JobSpec jobSpec = dagNode.getValue().getJobSpec();
-    Map<String, Integer> configWithCurrentAttempts = 
ImmutableMap.of(ConfigurationKeys.JOB_CURRENT_ATTEMPTS, 
dagNode.getValue().getCurrentAttempts(),
-        ConfigurationKeys.JOB_CURRENT_GENERATION, 
dagNode.getValue().getCurrentGeneration());
+    Map<String, String> configWithCurrentAttempts = 
ImmutableMap.of(ConfigurationKeys.JOB_CURRENT_ATTEMPTS, 
String.valueOf(dagNode.getValue().getCurrentAttempts()),
+        ConfigurationKeys.JOB_CURRENT_GENERATION, 
String.valueOf(dagNode.getValue().getCurrentGeneration()));
+    Properties configAsProperties = new 
Properties(jobSpec.getConfigAsProperties());
+    configAsProperties.putAll(configWithCurrentAttempts);
     //Return new spec with new config to avoid change the reference to dagNode
     return new JobSpec(jobSpec.getUri(), jobSpec.getVersion(), 
jobSpec.getDescription(), 
ConfigFactory.parseMap(configWithCurrentAttempts).withFallback(jobSpec.getConfig()),
-        jobSpec.getConfigAsProperties(), jobSpec.getTemplateURI(), 
jobSpec.getJobTemplate(), jobSpec.getMetadata());
+        configAsProperties, jobSpec.getTemplateURI(), 
jobSpec.getJobTemplate(), jobSpec.getMetadata());
   }
 
   static Config getJobConfig(DagNode<JobExecutionPlan> dagNode) {

Reply via email to