This is an automated email from the ASF dual-hosted git repository.
zihanli58 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git
The following commit(s) were added to refs/heads/master by this push:
new b5b458b [GOBBLIN-1593] Fix bugs in dag manager about metric reporting and job status monitor (#3448)
b5b458b is described below
commit b5b458b1e39e3d5a0ac78406baecb5fb6ead25dd
Author: Zihan Li <[email protected]>
AuthorDate: Wed Jan 5 15:16:41 2022 -0800
[GOBBLIN-1593] Fix bugs in dag manager about metric reporting and job status monitor (#3448)
* [GOBBLIN-1593] Fix bugs in dag manager about metric reporting and job status monitor
* address comments
* fix typo
---
.../org/apache/gobblin/cluster/GobblinHelixJobLauncher.java | 4 ++--
.../java/org/apache/gobblin/azkaban/AzkabanJobLauncher.java | 4 ++--
.../apache/gobblin/service/StreamingKafkaSpecConsumer.java | 13 +++++++++----
.../service/modules/orchestration/DagManagerUtils.java | 9 ++++++---
4 files changed, 19 insertions(+), 11 deletions(-)
diff --git a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobLauncher.java b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobLauncher.java
index b7315e7..911ba2c 100644
--- a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobLauncher.java
+++ b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobLauncher.java
@@ -610,9 +610,9 @@ public class GobblinHelixJobLauncher extends AbstractJobLauncher {
if (jobProps.containsKey(ConfigurationKeys.JOB_CURRENT_ATTEMPTS)) {
metadataTags.add(new
Tag<>(TimingEvent.FlowEventConstants.CURRENT_ATTEMPTS_FIELD,
- jobProps.getProperty(ConfigurationKeys.JOB_CURRENT_ATTEMPTS, "")));
+ jobProps.getProperty(ConfigurationKeys.JOB_CURRENT_ATTEMPTS, "1")));
metadataTags.add(new
Tag<>(TimingEvent.FlowEventConstants.CURRENT_GENERATION_FIELD,
- jobProps.getProperty(ConfigurationKeys.JOB_CURRENT_GENERATION, "")));
+ jobProps.getProperty(ConfigurationKeys.JOB_CURRENT_GENERATION,
"1")));
metadataTags.add(new
Tag<>(TimingEvent.FlowEventConstants.SHOULD_RETRY_FIELD,
"false"));
}
diff --git a/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/azkaban/AzkabanJobLauncher.java b/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/azkaban/AzkabanJobLauncher.java
index 7f99f93..43543d8 100644
--- a/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/azkaban/AzkabanJobLauncher.java
+++ b/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/azkaban/AzkabanJobLauncher.java
@@ -390,9 +390,9 @@ public class AzkabanJobLauncher extends AbstractJob implements ApplicationLaunch
if (jobProps.containsKey(ConfigurationKeys.JOB_CURRENT_ATTEMPTS)) {
metadataTags.add(new
Tag<>(TimingEvent.FlowEventConstants.CURRENT_ATTEMPTS_FIELD,
- jobProps.getProperty(ConfigurationKeys.JOB_CURRENT_ATTEMPTS, "")));
+ jobProps.getProperty(ConfigurationKeys.JOB_CURRENT_ATTEMPTS, "1")));
metadataTags.add(new
Tag<>(TimingEvent.FlowEventConstants.CURRENT_GENERATION_FIELD,
- jobProps.getProperty(ConfigurationKeys.JOB_CURRENT_GENERATION, "")));
+ jobProps.getProperty(ConfigurationKeys.JOB_CURRENT_GENERATION,
"1")));
metadataTags.add(new
Tag<>(TimingEvent.FlowEventConstants.SHOULD_RETRY_FIELD,
"false"));
}
diff --git a/gobblin-modules/gobblin-service-kafka/src/main/java/org/apache/gobblin/service/StreamingKafkaSpecConsumer.java b/gobblin-modules/gobblin-service-kafka/src/main/java/org/apache/gobblin/service/StreamingKafkaSpecConsumer.java
index b072273..4dcac2e 100644
--- a/gobblin-modules/gobblin-service-kafka/src/main/java/org/apache/gobblin/service/StreamingKafkaSpecConsumer.java
+++ b/gobblin-modules/gobblin-service-kafka/src/main/java/org/apache/gobblin/service/StreamingKafkaSpecConsumer.java
@@ -72,6 +72,7 @@ public class StreamingKafkaSpecConsumer extends AbstractIdleService implements S
private final MutableJobCatalog _jobCatalog;
private final MetricContext _metricContext;
private final Metrics _metrics;
+ private final int _jobSpecQueueSize;
public StreamingKafkaSpecConsumer(Config config, MutableJobCatalog
jobCatalog, Optional<Logger> log) {
String topic = config.getString(SPEC_KAFKA_TOPICS_KEY);
@@ -86,8 +87,9 @@ public class StreamingKafkaSpecConsumer extends AbstractIdleService implements S
}
_jobCatalog = jobCatalog;
- _jobSpecQueue = new LinkedBlockingQueue<>(ConfigUtils.getInt(config,
"SPEC_STREAMING_BLOCKING_QUEUE_SIZE",
- DEFAULT_SPEC_STREAMING_BLOCKING_QUEUE_SIZE));
+ _jobSpecQueueSize = ConfigUtils.getInt(config,
"SPEC_STREAMING_BLOCKING_QUEUE_SIZE",
+ DEFAULT_SPEC_STREAMING_BLOCKING_QUEUE_SIZE);
+ _jobSpecQueue = new LinkedBlockingQueue<>(_jobSpecQueueSize);
_metricContext =
Instrumented.getMetricContext(ConfigUtils.configToState(config),
this.getClass());
_metrics = new Metrics(this._metricContext);
}
@@ -111,13 +113,16 @@ public class StreamingKafkaSpecConsumer extends AbstractIdleService implements S
try {
Pair<SpecExecutor.Verb, Spec> specPair = _jobSpecQueue.take();
- _metrics.specConsumerJobSpecDeq.mark();
+ int numSpecFetched = 0;
do {
+ _metrics.specConsumerJobSpecDeq.mark();
+ numSpecFetched ++;
changesSpecs.add(specPair);
// if there are more elements then pass them along in this call
specPair = _jobSpecQueue.poll();
- } while (specPair != null);
+ // comparing numSpecFetched to _jobSpecQueueSize to make sure the loop
will not run infinitely even in peak time
+ } while (specPair != null && numSpecFetched < _jobSpecQueueSize);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagerUtils.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagerUtils.java
index 8e52784..b7aa761 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagerUtils.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagerUtils.java
@@ -22,6 +22,7 @@ import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
+import java.util.Properties;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
@@ -151,11 +152,13 @@ public class DagManagerUtils {
public static JobSpec getJobSpec(DagNode<JobExecutionPlan> dagNode) {
JobSpec jobSpec = dagNode.getValue().getJobSpec();
- Map<String, Integer> configWithCurrentAttempts =
ImmutableMap.of(ConfigurationKeys.JOB_CURRENT_ATTEMPTS,
dagNode.getValue().getCurrentAttempts(),
- ConfigurationKeys.JOB_CURRENT_GENERATION,
dagNode.getValue().getCurrentGeneration());
+ Map<String, String> configWithCurrentAttempts =
ImmutableMap.of(ConfigurationKeys.JOB_CURRENT_ATTEMPTS,
String.valueOf(dagNode.getValue().getCurrentAttempts()),
+ ConfigurationKeys.JOB_CURRENT_GENERATION,
String.valueOf(dagNode.getValue().getCurrentGeneration()));
+ Properties configAsProperties = new
Properties(jobSpec.getConfigAsProperties());
+ configAsProperties.putAll(configWithCurrentAttempts);
//Return new spec with new config to avoid change the reference to dagNode
return new JobSpec(jobSpec.getUri(), jobSpec.getVersion(),
jobSpec.getDescription(),
ConfigFactory.parseMap(configWithCurrentAttempts).withFallback(jobSpec.getConfig()),
- jobSpec.getConfigAsProperties(), jobSpec.getTemplateURI(),
jobSpec.getJobTemplate(), jobSpec.getMetadata());
+ configAsProperties, jobSpec.getTemplateURI(),
jobSpec.getJobTemplate(), jobSpec.getMetadata());
}
static Config getJobConfig(DagNode<JobExecutionPlan> dagNode) {