Repository: incubator-gobblin
Updated Branches:
  refs/heads/master 29c76c9e7 -> 58f00f019


[GOBBLIN-635] Add metadata tags for GaaS/Azkaban jobs.

GOBBLIN-635:Add metadata tags to Gobblin Tracking
Event for Azkaban jobs triggered using Gobblin-as-
a-Service (GaaS).

Closes #2505 from sv2000/azkabanMetadataTags


Project: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/commit/58f00f01
Tree: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/tree/58f00f01
Diff: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/diff/58f00f01

Branch: refs/heads/master
Commit: 58f00f019cfc7334c3fe088deba8ad67a24b5e3a
Parents: 29c76c9
Author: suvasude <[email protected]>
Authored: Mon Nov 26 10:06:20 2018 -0800
Committer: Hung Tran <[email protected]>
Committed: Mon Nov 26 10:06:20 2018 -0800

----------------------------------------------------------------------
 .../gobblin/metrics/event/TimingEvent.java      |  1 +
 .../metrics/reporter/ScheduledReporter.java     |  4 +-
 .../gobblin/azkaban/AzkabanJobLauncher.java     | 65 +++++++++++++++++---
 .../metrics/kafka/KafkaProducerPusher.java      | 12 +++-
 .../metrics/kafka/KafkaProducerPusher.java      | 12 +++-
 .../gobblin/runtime/JobLauncherFactory.java     | 44 ++++++++++++-
 .../gobblin/runtime/local/LocalJobLauncher.java |  8 ++-
 .../runtime/mapreduce/MRJobLauncher.java        | 13 +++-
 8 files changed, 137 insertions(+), 22 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/58f00f01/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/event/TimingEvent.java
----------------------------------------------------------------------
diff --git 
a/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/event/TimingEvent.java
 
b/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/event/TimingEvent.java
index 71dd947..c3e6259 100644
--- 
a/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/event/TimingEvent.java
+++ 
b/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/event/TimingEvent.java
@@ -77,6 +77,7 @@ public class TimingEvent {
   public static final String METADATA_END_TIME = "endTime";
   public static final String METADATA_DURATION = "durationMillis";
   public static final String METADATA_TIMING_EVENT = "timingEvent";
+  public static final String METADATA_MESSAGE = "message";
 
   private final String name;
   private final Long startTime;

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/58f00f01/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/reporter/ScheduledReporter.java
----------------------------------------------------------------------
diff --git 
a/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/reporter/ScheduledReporter.java
 
b/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/reporter/ScheduledReporter.java
index 154dd3a..d15f15a 100644
--- 
a/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/reporter/ScheduledReporter.java
+++ 
b/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/reporter/ScheduledReporter.java
@@ -147,7 +147,9 @@ public abstract class ScheduledReporter extends 
ContextAwareReporter {
 
   @Override
   public void stopImpl() {
-    this.scheduledTask.get().cancel(false);
+    if (this.scheduledTask.isPresent()) {
+      this.scheduledTask.get().cancel(false);
+    }
     this.scheduledTask = Optional.absent();
     ExecutorsUtils.shutdownExecutorService(this.executor, Optional.of(log), 
10, TimeUnit.SECONDS);
 

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/58f00f01/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/azkaban/AzkabanJobLauncher.java
----------------------------------------------------------------------
diff --git 
a/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/azkaban/AzkabanJobLauncher.java
 
b/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/azkaban/AzkabanJobLauncher.java
index 3a28b18..05c1bb9 100644
--- 
a/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/azkaban/AzkabanJobLauncher.java
+++ 
b/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/azkaban/AzkabanJobLauncher.java
@@ -17,7 +17,6 @@
 
 package org.apache.gobblin.azkaban;
 
-import com.google.common.base.Optional;
 import java.io.File;
 import java.io.IOException;
 import java.net.URI;
@@ -33,10 +32,6 @@ import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 
-import 
org.apache.gobblin.runtime.job_catalog.PackagedTemplatesJobCatalogDecorator;
-import org.apache.gobblin.runtime.listeners.CompositeJobListener;
-import org.apache.gobblin.util.ClassAliasResolver;
-import org.apache.gobblin.util.ConfigUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.security.Credentials;
 import org.apache.log4j.Level;
@@ -44,6 +39,7 @@ import org.apache.log4j.Logger;
 import org.joda.time.DateTime;
 import org.joda.time.DateTimeZone;
 
+import com.google.common.base.Optional;
 import com.google.common.base.Preconditions;
 import com.google.common.base.Splitter;
 import com.google.common.base.Strings;
@@ -53,12 +49,16 @@ import com.google.common.io.Closer;
 import com.typesafe.config.Config;
 import com.typesafe.config.ConfigValue;
 
+import azkaban.jobExecutor.AbstractJob;
+import javax.annotation.Nullable;
+
 import org.apache.gobblin.configuration.ConfigurationKeys;
 import org.apache.gobblin.configuration.DynamicConfigGenerator;
 import org.apache.gobblin.configuration.State;
 import org.apache.gobblin.metrics.GobblinMetrics;
 import org.apache.gobblin.metrics.RootMetricContext;
 import org.apache.gobblin.metrics.Tag;
+import org.apache.gobblin.metrics.event.TimingEvent;
 import org.apache.gobblin.runtime.DynamicConfigGeneratorFactory;
 import org.apache.gobblin.runtime.JobException;
 import org.apache.gobblin.runtime.JobLauncher;
@@ -66,15 +66,16 @@ import org.apache.gobblin.runtime.JobLauncherFactory;
 import org.apache.gobblin.runtime.app.ApplicationException;
 import org.apache.gobblin.runtime.app.ApplicationLauncher;
 import org.apache.gobblin.runtime.app.ServiceBasedAppLauncher;
+import 
org.apache.gobblin.runtime.job_catalog.PackagedTemplatesJobCatalogDecorator;
+import org.apache.gobblin.runtime.listeners.CompositeJobListener;
 import org.apache.gobblin.runtime.listeners.EmailNotificationJobListener;
 import org.apache.gobblin.runtime.listeners.JobListener;
+import org.apache.gobblin.util.ClassAliasResolver;
+import org.apache.gobblin.util.ConfigUtils;
 import org.apache.gobblin.util.HadoopUtils;
 import org.apache.gobblin.util.TimeRangeChecker;
 import org.apache.gobblin.util.hadoop.TokenUtils;
 
-import azkaban.jobExecutor.AbstractJob;
-import javax.annotation.Nullable;
-
 import static 
org.apache.hadoop.security.UserGroupInformation.HADOOP_TOKEN_FILE_LOCATION;
 
 
@@ -105,6 +106,7 @@ public class AzkabanJobLauncher extends AbstractJob 
implements ApplicationLaunch
 
   private static final String HADOOP_FS_DEFAULT_NAME = "fs.default.name";
   private static final String AZKABAN_LINK_JOBEXEC_URL = 
"azkaban.link.jobexec.url";
+  private static final String AZKABAN_FLOW_EXEC_ID = "azkaban.flow.execid";
   private static final String MAPREDUCE_JOB_CREDENTIALS_BINARY = 
"mapreduce.job.credentials.binary";
 
   private static final String AZKABAN_GOBBLIN_JOB_SLA_IN_SECONDS = 
"gobblin.azkaban.SLAInSeconds";
@@ -126,7 +128,7 @@ public class AzkabanJobLauncher extends AbstractJob 
implements ApplicationLaunch
 
   public AzkabanJobLauncher(String jobId, Properties props)
       throws Exception {
-    super(jobId, LOG);
+      super(jobId, LOG);
 
     HadoopUtils.addGobblinSite();
 
@@ -215,10 +217,17 @@ public class AzkabanJobLauncher extends AbstractJob 
implements ApplicationLaunch
     this.ownAzkabanSla = Long.parseLong(
         jobProps.getProperty(AZKABAN_GOBBLIN_JOB_SLA_IN_SECONDS, 
DEFAULT_AZKABAN_GOBBLIN_JOB_SLA_IN_SECONDS));
 
+    List<? extends Tag<?>> metadataTags = Lists.newArrayList();
+    //Is the job triggered using Gobblin-as-a-Service? If so, add additional 
tags needed for tracking
+    //the job execution.
+    if (jobProps.containsKey(ConfigurationKeys.FLOW_NAME_KEY)) {
+      metadataTags = addAdditionalMetadataTags(jobProps);
+    }
+
     // Create a JobLauncher instance depending on the configuration. The same 
properties object is
     // used for both system and job configuration properties because Azkaban 
puts configuration
     // properties in the .job file and in the .properties file into the same 
Properties object.
-    this.jobLauncher = 
this.closer.register(JobLauncherFactory.newJobLauncher(jobProps, jobProps));
+    this.jobLauncher = 
this.closer.register(JobLauncherFactory.newJobLauncher(jobProps, jobProps, 
null, metadataTags));
 
     // Since Java classes cannot extend multiple classes and Azkaban jobs must 
extend AbstractJob, we must use composition
     // verses extending ServiceBasedAppLauncher
@@ -361,4 +370,40 @@ public class AzkabanJobLauncher extends AbstractJob 
implements ApplicationLaunch
 
     return true;
   }
+
+
+  /**
+   * Add additional properties such as flow.group, flow.name, executionUrl. 
Useful for tracking
+   * job executions on Azkaban triggered by Gobblin-as-a-Service (GaaS).
+   * @param jobProps job properties
+   * @return a list of tags uniquely identifying a job execution on Azkaban.
+   */
+  private static List<? extends Tag<?>> addAdditionalMetadataTags(Properties 
jobProps) {
+    List<Tag<?>> metadataTags = Lists.newArrayList();
+    String jobExecutionId = jobProps.getProperty(AZKABAN_FLOW_EXEC_ID, "");
+    String jobExecutionUrl = jobProps.getProperty(AZKABAN_LINK_JOBEXEC_URL, 
"");
+
+    metadataTags.add(new Tag<>(TimingEvent.FlowEventConstants.FLOW_GROUP_FIELD,
+        jobProps.getProperty(ConfigurationKeys.FLOW_GROUP_KEY, "")));
+    metadataTags.add(new Tag<>(TimingEvent.FlowEventConstants.FLOW_NAME_FIELD,
+        jobProps.getProperty(ConfigurationKeys.FLOW_NAME_KEY)));
+
+    // use job execution id if flow execution id is not present
+    metadataTags.add(new 
Tag<>(TimingEvent.FlowEventConstants.FLOW_EXECUTION_ID_FIELD,
+        jobProps.getProperty(ConfigurationKeys.FLOW_EXECUTION_ID_KEY, 
jobExecutionId)));
+
+    //Use azkaban.flow.execid as the jobExecutionId
+    metadataTags.add(new 
Tag<>(TimingEvent.FlowEventConstants.JOB_EXECUTION_ID_FIELD, jobExecutionId));
+
+    metadataTags.add(new Tag<>(TimingEvent.FlowEventConstants.JOB_GROUP_FIELD,
+        jobProps.getProperty(ConfigurationKeys.JOB_GROUP_KEY, "")));
+    metadataTags.add(new Tag<>(TimingEvent.FlowEventConstants.JOB_NAME_FIELD,
+        jobProps.getProperty(ConfigurationKeys.JOB_NAME_KEY, "")));
+    metadataTags.add(new Tag<>(TimingEvent.METADATA_MESSAGE, jobExecutionUrl));
+
+    LOG.debug(String.format("AzkabanJobLauncher.addAdditionalMetadataTags: 
metadataTags %s", metadataTags));
+
+    return metadataTags;
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/58f00f01/gobblin-modules/gobblin-kafka-08/src/main/java/org/apache/gobblin/metrics/kafka/KafkaProducerPusher.java
----------------------------------------------------------------------
diff --git 
a/gobblin-modules/gobblin-kafka-08/src/main/java/org/apache/gobblin/metrics/kafka/KafkaProducerPusher.java
 
b/gobblin-modules/gobblin-kafka-08/src/main/java/org/apache/gobblin/metrics/kafka/KafkaProducerPusher.java
index d83cc36..2b98419 100644
--- 
a/gobblin-modules/gobblin-kafka-08/src/main/java/org/apache/gobblin/metrics/kafka/KafkaProducerPusher.java
+++ 
b/gobblin-modules/gobblin-kafka-08/src/main/java/org/apache/gobblin/metrics/kafka/KafkaProducerPusher.java
@@ -31,12 +31,15 @@ import com.google.common.base.Optional;
 import com.google.common.io.Closer;
 import com.typesafe.config.Config;
 
+import lombok.extern.slf4j.Slf4j;
+
 import org.apache.gobblin.util.ConfigUtils;
 
 
 /**
  * Establishes a connection to a Kafka cluster and push byte messages to a 
specified topic.
  */
+@Slf4j
 public class KafkaProducerPusher implements Pusher<byte[]> {
 
   private final String topic;
@@ -52,7 +55,8 @@ public class KafkaProducerPusher implements Pusher<byte[]> {
     props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers);
     props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, 
StringSerializer.class.getName());
     props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, 
ByteArraySerializer.class.getName());
-    props.put(ProducerConfig.ACKS_CONFIG, "1");
+    props.put(ProducerConfig.ACKS_CONFIG, "all");
+    props.put(ProducerConfig.RETRIES_CONFIG, 3);
 
     // add the kafka scoped config. if any of the above are specified then 
they are overridden
     if (kafkaConfig.isPresent()) {
@@ -72,7 +76,11 @@ public class KafkaProducerPusher implements Pusher<byte[]> {
    */
   public void pushMessages(List<byte[]> messages) {
     for (byte[] message: messages) {
-      this.producer.send(new ProducerRecord<String, byte[]>(topic, message));
+      producer.send(new ProducerRecord<>(topic, message), (recordMetadata, e) 
-> {
+        if (e != null) {
+          log.error("Failed to send message to topic {} due to exception: ", 
topic, e);
+        }
+      });
     }
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/58f00f01/gobblin-modules/gobblin-kafka-09/src/main/java/org/apache/gobblin/metrics/kafka/KafkaProducerPusher.java
----------------------------------------------------------------------
diff --git 
a/gobblin-modules/gobblin-kafka-09/src/main/java/org/apache/gobblin/metrics/kafka/KafkaProducerPusher.java
 
b/gobblin-modules/gobblin-kafka-09/src/main/java/org/apache/gobblin/metrics/kafka/KafkaProducerPusher.java
index 52d416b..1ef1063 100644
--- 
a/gobblin-modules/gobblin-kafka-09/src/main/java/org/apache/gobblin/metrics/kafka/KafkaProducerPusher.java
+++ 
b/gobblin-modules/gobblin-kafka-09/src/main/java/org/apache/gobblin/metrics/kafka/KafkaProducerPusher.java
@@ -31,12 +31,15 @@ import com.google.common.base.Optional;
 import com.google.common.io.Closer;
 import com.typesafe.config.Config;
 
+import lombok.extern.slf4j.Slf4j;
+
 import org.apache.gobblin.util.ConfigUtils;
 
 
 /**
  * Establish a connection to a Kafka cluster and push byte messages to a 
specified topic.
  */
+@Slf4j
 public class KafkaProducerPusher implements Pusher<byte[]> {
 
   private final String topic;
@@ -52,7 +55,8 @@ public class KafkaProducerPusher implements Pusher<byte[]> {
     props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers);
     props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, 
StringSerializer.class.getName());
     props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, 
ByteArraySerializer.class.getName());
-    props.put(ProducerConfig.ACKS_CONFIG, "1");
+    props.put(ProducerConfig.ACKS_CONFIG, "all");
+    props.put(ProducerConfig.RETRIES_CONFIG, 3);
 
     // add the kafka scoped config. if any of the above are specified then 
they are overridden
     if (kafkaConfig.isPresent()) {
@@ -72,7 +76,11 @@ public class KafkaProducerPusher implements Pusher<byte[]> {
    */
   public void pushMessages(List<byte[]> messages) {
     for (byte[] message: messages) {
-      this.producer.send(new ProducerRecord<String, byte[]>(topic, message));
+      producer.send(new ProducerRecord<>(topic, message), (recordMetadata, e) 
-> {
+        if (e != null) {
+          log.error("Failed to send message to topic {} due to exception: ", 
topic, e);
+        }
+      });
     }
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/58f00f01/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/JobLauncherFactory.java
----------------------------------------------------------------------
diff --git 
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/JobLauncherFactory.java
 
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/JobLauncherFactory.java
index 970bd14..ec0a7e4 100644
--- 
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/JobLauncherFactory.java
+++ 
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/JobLauncherFactory.java
@@ -17,6 +17,7 @@
 
 package org.apache.gobblin.runtime;
 
+import java.util.List;
 import java.util.Properties;
 
 import javax.annotation.Nonnull;
@@ -24,10 +25,12 @@ import lombok.extern.slf4j.Slf4j;
 
 import com.google.common.base.Enums;
 import com.google.common.base.Optional;
+import com.google.common.collect.ImmutableList;
 
 import org.apache.gobblin.broker.gobblin_scopes.GobblinScopeTypes;
 import org.apache.gobblin.broker.iface.SharedResourcesBroker;
 import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.metrics.Tag;
 import org.apache.gobblin.runtime.local.LocalJobLauncher;
 import org.apache.gobblin.runtime.mapreduce.MRJobLauncher;
 import org.apache.gobblin.util.JobConfigurationUtils;
@@ -79,10 +82,29 @@ public class JobLauncherFactory {
    */
   public static @Nonnull JobLauncher newJobLauncher(Properties sysProps, 
Properties jobProps,
       SharedResourcesBroker<GobblinScopeTypes> instanceBroker) throws 
Exception {
+    return newJobLauncher(sysProps, jobProps, instanceBroker, 
ImmutableList.of());
+  }
+
+
+  /**
+   * Create a new {@link JobLauncher}.
+   *
+   * <p>
+   *   This method will never return a {@code null}.
+   * </p>
+   *
+   * @param sysProps system configuration properties
+   * @param jobProps job configuration properties
+   * @param instanceBroker
+   * @param metadataTags
+   * @return newly created {@link JobLauncher}
+   */
+  public static @Nonnull JobLauncher newJobLauncher(Properties sysProps, 
Properties jobProps,
+      SharedResourcesBroker<GobblinScopeTypes> instanceBroker, List<? extends 
Tag<?>> metadataTags) throws Exception {
 
     String launcherTypeValue =
         sysProps.getProperty(ConfigurationKeys.JOB_LAUNCHER_TYPE_KEY, 
JobLauncherType.LOCAL.name());
-    return newJobLauncher(sysProps, jobProps, launcherTypeValue, 
instanceBroker);
+    return newJobLauncher(sysProps, jobProps, launcherTypeValue, 
instanceBroker, metadataTags);
   }
 
   /**
@@ -97,15 +119,31 @@ public class JobLauncherFactory {
    */
   public static JobLauncher newJobLauncher(Properties sysProps, Properties 
jobProps,
       String launcherTypeValue, SharedResourcesBroker<GobblinScopeTypes> 
instanceBroker) {
+    return newJobLauncher(sysProps, jobProps, launcherTypeValue, 
instanceBroker, ImmutableList.of());
+  }
+
+  /**
+   * Creates a new instance for a JobLauncher with a given type
+   * @param sysProps          the system/environment properties
+   * @param jobProps          the job properties
+   * @param launcherTypeValue the type of the launcher; either a {@link 
JobLauncherType} value or
+   *        the name of the class that extends {@link AbstractJobLauncher} and 
has a constructor
+   *        that has a single Properties parameter..
+   * @param metadataTags additional metadata to be added to timing events
+   * @return the JobLauncher instance
+   * @throws RuntimeException if the instantiation fails
+   */
+  public static JobLauncher newJobLauncher(Properties sysProps, Properties 
jobProps,
+      String launcherTypeValue, SharedResourcesBroker<GobblinScopeTypes> 
instanceBroker, List<? extends Tag<?>> metadataTags) {
     Optional<JobLauncherType> launcherType = 
Enums.getIfPresent(JobLauncherType.class, launcherTypeValue);
 
     try {
       if (launcherType.isPresent()) {
         switch (launcherType.get()) {
           case LOCAL:
-              return new 
LocalJobLauncher(JobConfigurationUtils.combineSysAndJobProperties(sysProps, 
jobProps), instanceBroker);
+              return new 
LocalJobLauncher(JobConfigurationUtils.combineSysAndJobProperties(sysProps, 
jobProps), instanceBroker, metadataTags);
           case MAPREDUCE:
-            return new 
MRJobLauncher(JobConfigurationUtils.combineSysAndJobProperties(sysProps, 
jobProps), instanceBroker);
+            return new 
MRJobLauncher(JobConfigurationUtils.combineSysAndJobProperties(sysProps, 
jobProps), instanceBroker, metadataTags);
           default:
             throw new RuntimeException("Unsupported job launcher type: " + 
launcherType.get().name());
         }

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/58f00f01/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/local/LocalJobLauncher.java
----------------------------------------------------------------------
diff --git 
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/local/LocalJobLauncher.java
 
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/local/LocalJobLauncher.java
index ac3671b..fc0e197 100644
--- 
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/local/LocalJobLauncher.java
+++ 
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/local/LocalJobLauncher.java
@@ -70,11 +70,15 @@ public class LocalJobLauncher extends AbstractJobLauncher {
   private final ServiceManager serviceManager;
 
   public LocalJobLauncher(Properties jobProps) throws Exception {
-    this(jobProps, null);
+    this(jobProps, null, ImmutableList.of());
   }
 
   public LocalJobLauncher(Properties jobProps, 
SharedResourcesBroker<GobblinScopeTypes> instanceBroker) throws Exception {
-    super(jobProps, ImmutableList.<Tag<?>> of(), instanceBroker);
+    this(jobProps, instanceBroker, ImmutableList.of());
+  }
+
+  public LocalJobLauncher(Properties jobProps, 
SharedResourcesBroker<GobblinScopeTypes> instanceBroker, List<? extends Tag<?>> 
metadataTags) throws Exception {
+    super(jobProps, metadataTags, instanceBroker);
     log.debug("Local job launched with properties: {}", jobProps);
 
     TimingEvent jobLocalSetupTimer = 
this.eventSubmitter.getTimingEvent(TimingEvent.RunJobTimings.JOB_LOCAL_SETUP);

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/58f00f01/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/mapreduce/MRJobLauncher.java
----------------------------------------------------------------------
diff --git 
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/mapreduce/MRJobLauncher.java
 
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/mapreduce/MRJobLauncher.java
index addb0f0..1f53054 100644
--- 
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/mapreduce/MRJobLauncher.java
+++ 
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/mapreduce/MRJobLauncher.java
@@ -157,9 +157,18 @@ public class MRJobLauncher extends AbstractJobLauncher {
     this(jobProps, new Configuration(), instanceBroker);
   }
 
-  public MRJobLauncher(Properties jobProps, Configuration conf, 
SharedResourcesBroker<GobblinScopeTypes> instanceBroker)
+  public MRJobLauncher(Properties jobProps, Configuration conf, 
SharedResourcesBroker<GobblinScopeTypes> instanceBroker) throws Exception {
+    this(jobProps, conf, instanceBroker, ImmutableList.of());
+  }
+
+  public MRJobLauncher(Properties jobProps, 
SharedResourcesBroker<GobblinScopeTypes> instanceBroker, List<? extends Tag<?>> 
metadataTags) throws Exception {
+    this(jobProps, new Configuration(), instanceBroker, metadataTags);
+  }
+
+
+  public MRJobLauncher(Properties jobProps, Configuration conf, 
SharedResourcesBroker<GobblinScopeTypes> instanceBroker,List<? extends Tag<?>> 
metadataTags)
       throws Exception {
-    super(jobProps, ImmutableList.<Tag<?>>of());
+    super(jobProps, metadataTags);
 
     this.conf = conf;
     // Put job configuration properties into the Hadoop configuration so they 
are available in the mappers

Reply via email to