Repository: incubator-gobblin Updated Branches: refs/heads/master 29c76c9e7 -> 58f00f019
[GOBBLIN-635] Add metadata tags for GaaS/Azkaban jobs. GOBBLIN-635: Add metadata tags to Gobblin Tracking Event for Azkaban jobs triggered using Gobblin-as-a-Service (GaaS). Closes #2505 from sv2000/azkabanMetadataTags Project: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/commit/58f00f01 Tree: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/tree/58f00f01 Diff: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/diff/58f00f01 Branch: refs/heads/master Commit: 58f00f019cfc7334c3fe088deba8ad67a24b5e3a Parents: 29c76c9 Author: suvasude <[email protected]> Authored: Mon Nov 26 10:06:20 2018 -0800 Committer: Hung Tran <[email protected]> Committed: Mon Nov 26 10:06:20 2018 -0800 ---------------------------------------------------------------------- .../gobblin/metrics/event/TimingEvent.java | 1 + .../metrics/reporter/ScheduledReporter.java | 4 +- .../gobblin/azkaban/AzkabanJobLauncher.java | 65 +++++++++++++++++--- .../metrics/kafka/KafkaProducerPusher.java | 12 +++- .../metrics/kafka/KafkaProducerPusher.java | 12 +++- .../gobblin/runtime/JobLauncherFactory.java | 44 ++++++++++++- .../gobblin/runtime/local/LocalJobLauncher.java | 8 ++- .../runtime/mapreduce/MRJobLauncher.java | 13 +++- 8 files changed, 137 insertions(+), 22 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/58f00f01/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/event/TimingEvent.java ---------------------------------------------------------------------- diff --git a/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/event/TimingEvent.java b/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/event/TimingEvent.java index 71dd947..c3e6259 100644 --- 
a/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/event/TimingEvent.java +++ b/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/event/TimingEvent.java @@ -77,6 +77,7 @@ public class TimingEvent { public static final String METADATA_END_TIME = "endTime"; public static final String METADATA_DURATION = "durationMillis"; public static final String METADATA_TIMING_EVENT = "timingEvent"; + public static final String METADATA_MESSAGE = "message"; private final String name; private final Long startTime; http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/58f00f01/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/reporter/ScheduledReporter.java ---------------------------------------------------------------------- diff --git a/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/reporter/ScheduledReporter.java b/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/reporter/ScheduledReporter.java index 154dd3a..d15f15a 100644 --- a/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/reporter/ScheduledReporter.java +++ b/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/reporter/ScheduledReporter.java @@ -147,7 +147,9 @@ public abstract class ScheduledReporter extends ContextAwareReporter { @Override public void stopImpl() { - this.scheduledTask.get().cancel(false); + if (this.scheduledTask.isPresent()) { + this.scheduledTask.get().cancel(false); + } this.scheduledTask = Optional.absent(); ExecutorsUtils.shutdownExecutorService(this.executor, Optional.of(log), 10, TimeUnit.SECONDS); http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/58f00f01/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/azkaban/AzkabanJobLauncher.java ---------------------------------------------------------------------- diff --git 
a/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/azkaban/AzkabanJobLauncher.java b/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/azkaban/AzkabanJobLauncher.java index 3a28b18..05c1bb9 100644 --- a/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/azkaban/AzkabanJobLauncher.java +++ b/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/azkaban/AzkabanJobLauncher.java @@ -17,7 +17,6 @@ package org.apache.gobblin.azkaban; -import com.google.common.base.Optional; import java.io.File; import java.io.IOException; import java.net.URI; @@ -33,10 +32,6 @@ import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; -import org.apache.gobblin.runtime.job_catalog.PackagedTemplatesJobCatalogDecorator; -import org.apache.gobblin.runtime.listeners.CompositeJobListener; -import org.apache.gobblin.util.ClassAliasResolver; -import org.apache.gobblin.util.ConfigUtils; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.security.Credentials; import org.apache.log4j.Level; @@ -44,6 +39,7 @@ import org.apache.log4j.Logger; import org.joda.time.DateTime; import org.joda.time.DateTimeZone; +import com.google.common.base.Optional; import com.google.common.base.Preconditions; import com.google.common.base.Splitter; import com.google.common.base.Strings; @@ -53,12 +49,16 @@ import com.google.common.io.Closer; import com.typesafe.config.Config; import com.typesafe.config.ConfigValue; +import azkaban.jobExecutor.AbstractJob; +import javax.annotation.Nullable; + import org.apache.gobblin.configuration.ConfigurationKeys; import org.apache.gobblin.configuration.DynamicConfigGenerator; import org.apache.gobblin.configuration.State; import org.apache.gobblin.metrics.GobblinMetrics; import org.apache.gobblin.metrics.RootMetricContext; import org.apache.gobblin.metrics.Tag; +import org.apache.gobblin.metrics.event.TimingEvent; import 
org.apache.gobblin.runtime.DynamicConfigGeneratorFactory; import org.apache.gobblin.runtime.JobException; import org.apache.gobblin.runtime.JobLauncher; @@ -66,15 +66,16 @@ import org.apache.gobblin.runtime.JobLauncherFactory; import org.apache.gobblin.runtime.app.ApplicationException; import org.apache.gobblin.runtime.app.ApplicationLauncher; import org.apache.gobblin.runtime.app.ServiceBasedAppLauncher; +import org.apache.gobblin.runtime.job_catalog.PackagedTemplatesJobCatalogDecorator; +import org.apache.gobblin.runtime.listeners.CompositeJobListener; import org.apache.gobblin.runtime.listeners.EmailNotificationJobListener; import org.apache.gobblin.runtime.listeners.JobListener; +import org.apache.gobblin.util.ClassAliasResolver; +import org.apache.gobblin.util.ConfigUtils; import org.apache.gobblin.util.HadoopUtils; import org.apache.gobblin.util.TimeRangeChecker; import org.apache.gobblin.util.hadoop.TokenUtils; -import azkaban.jobExecutor.AbstractJob; -import javax.annotation.Nullable; - import static org.apache.hadoop.security.UserGroupInformation.HADOOP_TOKEN_FILE_LOCATION; @@ -105,6 +106,7 @@ public class AzkabanJobLauncher extends AbstractJob implements ApplicationLaunch private static final String HADOOP_FS_DEFAULT_NAME = "fs.default.name"; private static final String AZKABAN_LINK_JOBEXEC_URL = "azkaban.link.jobexec.url"; + private static final String AZKABAN_FLOW_EXEC_ID = "azkaban.flow.execid"; private static final String MAPREDUCE_JOB_CREDENTIALS_BINARY = "mapreduce.job.credentials.binary"; private static final String AZKABAN_GOBBLIN_JOB_SLA_IN_SECONDS = "gobblin.azkaban.SLAInSeconds"; @@ -126,7 +128,7 @@ public class AzkabanJobLauncher extends AbstractJob implements ApplicationLaunch public AzkabanJobLauncher(String jobId, Properties props) throws Exception { - super(jobId, LOG); + super(jobId, LOG); HadoopUtils.addGobblinSite(); @@ -215,10 +217,17 @@ public class AzkabanJobLauncher extends AbstractJob implements ApplicationLaunch this.ownAzkabanSla 
= Long.parseLong( jobProps.getProperty(AZKABAN_GOBBLIN_JOB_SLA_IN_SECONDS, DEFAULT_AZKABAN_GOBBLIN_JOB_SLA_IN_SECONDS)); + List<? extends Tag<?>> metadataTags = Lists.newArrayList(); + //Is the job triggered using Gobblin-as-a-Service? If so, add additional tags needed for tracking + //the job execution. + if (jobProps.containsKey(ConfigurationKeys.FLOW_NAME_KEY)) { + metadataTags = addAdditionalMetadataTags(jobProps); + } + // Create a JobLauncher instance depending on the configuration. The same properties object is // used for both system and job configuration properties because Azkaban puts configuration // properties in the .job file and in the .properties file into the same Properties object. - this.jobLauncher = this.closer.register(JobLauncherFactory.newJobLauncher(jobProps, jobProps)); + this.jobLauncher = this.closer.register(JobLauncherFactory.newJobLauncher(jobProps, jobProps, null, metadataTags)); // Since Java classes cannot extend multiple classes and Azkaban jobs must extend AbstractJob, we must use composition // verses extending ServiceBasedAppLauncher @@ -361,4 +370,40 @@ public class AzkabanJobLauncher extends AbstractJob implements ApplicationLaunch return true; } + + + /** + * Add additional properties such as flow.group, flow.name, executionUrl. Useful for tracking + * job executions on Azkaban triggered by Gobblin-as-a-Service (GaaS). + * @param jobProps job properties + * @return a list of tags uniquely identifying a job execution on Azkaban. + */ + private static List<? 
extends Tag<?>> addAdditionalMetadataTags(Properties jobProps) { + List<Tag<?>> metadataTags = Lists.newArrayList(); + String jobExecutionId = jobProps.getProperty(AZKABAN_FLOW_EXEC_ID, ""); + String jobExecutionUrl = jobProps.getProperty(AZKABAN_LINK_JOBEXEC_URL, ""); + + metadataTags.add(new Tag<>(TimingEvent.FlowEventConstants.FLOW_GROUP_FIELD, + jobProps.getProperty(ConfigurationKeys.FLOW_GROUP_KEY, ""))); + metadataTags.add(new Tag<>(TimingEvent.FlowEventConstants.FLOW_NAME_FIELD, + jobProps.getProperty(ConfigurationKeys.FLOW_NAME_KEY))); + + // use job execution id if flow execution id is not present + metadataTags.add(new Tag<>(TimingEvent.FlowEventConstants.FLOW_EXECUTION_ID_FIELD, + jobProps.getProperty(ConfigurationKeys.FLOW_EXECUTION_ID_KEY, jobExecutionId))); + + //Use azkaban.flow.execid as the jobExecutionId + metadataTags.add(new Tag<>(TimingEvent.FlowEventConstants.JOB_EXECUTION_ID_FIELD, jobExecutionId)); + + metadataTags.add(new Tag<>(TimingEvent.FlowEventConstants.JOB_GROUP_FIELD, + jobProps.getProperty(ConfigurationKeys.JOB_GROUP_KEY, ""))); + metadataTags.add(new Tag<>(TimingEvent.FlowEventConstants.JOB_NAME_FIELD, + jobProps.getProperty(ConfigurationKeys.JOB_NAME_KEY, ""))); + metadataTags.add(new Tag<>(TimingEvent.METADATA_MESSAGE, jobExecutionUrl)); + + LOG.debug(String.format("AzkabanJobLauncher.addAdditionalMetadataTags: metadataTags %s", metadataTags)); + + return metadataTags; + } + } http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/58f00f01/gobblin-modules/gobblin-kafka-08/src/main/java/org/apache/gobblin/metrics/kafka/KafkaProducerPusher.java ---------------------------------------------------------------------- diff --git a/gobblin-modules/gobblin-kafka-08/src/main/java/org/apache/gobblin/metrics/kafka/KafkaProducerPusher.java b/gobblin-modules/gobblin-kafka-08/src/main/java/org/apache/gobblin/metrics/kafka/KafkaProducerPusher.java index d83cc36..2b98419 100644 --- 
a/gobblin-modules/gobblin-kafka-08/src/main/java/org/apache/gobblin/metrics/kafka/KafkaProducerPusher.java +++ b/gobblin-modules/gobblin-kafka-08/src/main/java/org/apache/gobblin/metrics/kafka/KafkaProducerPusher.java @@ -31,12 +31,15 @@ import com.google.common.base.Optional; import com.google.common.io.Closer; import com.typesafe.config.Config; +import lombok.extern.slf4j.Slf4j; + import org.apache.gobblin.util.ConfigUtils; /** * Establishes a connection to a Kafka cluster and push byte messages to a specified topic. */ +@Slf4j public class KafkaProducerPusher implements Pusher<byte[]> { private final String topic; @@ -52,7 +55,8 @@ public class KafkaProducerPusher implements Pusher<byte[]> { props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers); props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName()); - props.put(ProducerConfig.ACKS_CONFIG, "1"); + props.put(ProducerConfig.ACKS_CONFIG, "all"); + props.put(ProducerConfig.RETRIES_CONFIG, 3); // add the kafka scoped config. 
if any of the above are specified then they are overridden if (kafkaConfig.isPresent()) { @@ -72,7 +76,11 @@ public class KafkaProducerPusher implements Pusher<byte[]> { */ public void pushMessages(List<byte[]> messages) { for (byte[] message: messages) { - this.producer.send(new ProducerRecord<String, byte[]>(topic, message)); + producer.send(new ProducerRecord<>(topic, message), (recordMetadata, e) -> { + if (e != null) { + log.error("Failed to send message to topic {} due to exception: ", topic, e); + } + }); } } http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/58f00f01/gobblin-modules/gobblin-kafka-09/src/main/java/org/apache/gobblin/metrics/kafka/KafkaProducerPusher.java ---------------------------------------------------------------------- diff --git a/gobblin-modules/gobblin-kafka-09/src/main/java/org/apache/gobblin/metrics/kafka/KafkaProducerPusher.java b/gobblin-modules/gobblin-kafka-09/src/main/java/org/apache/gobblin/metrics/kafka/KafkaProducerPusher.java index 52d416b..1ef1063 100644 --- a/gobblin-modules/gobblin-kafka-09/src/main/java/org/apache/gobblin/metrics/kafka/KafkaProducerPusher.java +++ b/gobblin-modules/gobblin-kafka-09/src/main/java/org/apache/gobblin/metrics/kafka/KafkaProducerPusher.java @@ -31,12 +31,15 @@ import com.google.common.base.Optional; import com.google.common.io.Closer; import com.typesafe.config.Config; +import lombok.extern.slf4j.Slf4j; + import org.apache.gobblin.util.ConfigUtils; /** * Establish a connection to a Kafka cluster and push byte messages to a specified topic. 
*/ +@Slf4j public class KafkaProducerPusher implements Pusher<byte[]> { private final String topic; @@ -52,7 +55,8 @@ public class KafkaProducerPusher implements Pusher<byte[]> { props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers); props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName()); - props.put(ProducerConfig.ACKS_CONFIG, "1"); + props.put(ProducerConfig.ACKS_CONFIG, "all"); + props.put(ProducerConfig.RETRIES_CONFIG, 3); // add the kafka scoped config. if any of the above are specified then they are overridden if (kafkaConfig.isPresent()) { @@ -72,7 +76,11 @@ public class KafkaProducerPusher implements Pusher<byte[]> { */ public void pushMessages(List<byte[]> messages) { for (byte[] message: messages) { - this.producer.send(new ProducerRecord<String, byte[]>(topic, message)); + producer.send(new ProducerRecord<>(topic, message), (recordMetadata, e) -> { + if (e != null) { + log.error("Failed to send message to topic {} due to exception: ", topic, e); + } + }); } } http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/58f00f01/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/JobLauncherFactory.java ---------------------------------------------------------------------- diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/JobLauncherFactory.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/JobLauncherFactory.java index 970bd14..ec0a7e4 100644 --- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/JobLauncherFactory.java +++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/JobLauncherFactory.java @@ -17,6 +17,7 @@ package org.apache.gobblin.runtime; +import java.util.List; import java.util.Properties; import javax.annotation.Nonnull; @@ -24,10 +25,12 @@ import lombok.extern.slf4j.Slf4j; import com.google.common.base.Enums; import com.google.common.base.Optional; 
+import com.google.common.collect.ImmutableList; import org.apache.gobblin.broker.gobblin_scopes.GobblinScopeTypes; import org.apache.gobblin.broker.iface.SharedResourcesBroker; import org.apache.gobblin.configuration.ConfigurationKeys; +import org.apache.gobblin.metrics.Tag; import org.apache.gobblin.runtime.local.LocalJobLauncher; import org.apache.gobblin.runtime.mapreduce.MRJobLauncher; import org.apache.gobblin.util.JobConfigurationUtils; @@ -79,10 +82,29 @@ public class JobLauncherFactory { */ public static @Nonnull JobLauncher newJobLauncher(Properties sysProps, Properties jobProps, SharedResourcesBroker<GobblinScopeTypes> instanceBroker) throws Exception { + return newJobLauncher(sysProps, jobProps, instanceBroker, ImmutableList.of()); + } + + + /** + * Create a new {@link JobLauncher}. + * + * <p> + * This method will never return a {@code null}. + * </p> + * + * @param sysProps system configuration properties + * @param jobProps job configuration properties + * @param instanceBroker + * @param metadataTags + * @return newly created {@link JobLauncher} + */ + public static @Nonnull JobLauncher newJobLauncher(Properties sysProps, Properties jobProps, + SharedResourcesBroker<GobblinScopeTypes> instanceBroker, List<? 
extends Tag<?>> metadataTags) throws Exception { String launcherTypeValue = sysProps.getProperty(ConfigurationKeys.JOB_LAUNCHER_TYPE_KEY, JobLauncherType.LOCAL.name()); - return newJobLauncher(sysProps, jobProps, launcherTypeValue, instanceBroker); + return newJobLauncher(sysProps, jobProps, launcherTypeValue, instanceBroker, metadataTags); } /** @@ -97,15 +119,31 @@ public class JobLauncherFactory { */ public static JobLauncher newJobLauncher(Properties sysProps, Properties jobProps, String launcherTypeValue, SharedResourcesBroker<GobblinScopeTypes> instanceBroker) { + return newJobLauncher(sysProps, jobProps, launcherTypeValue, instanceBroker, ImmutableList.of()); + } + + /** + * Creates a new instance for a JobLauncher with a given type + * @param sysProps the system/environment properties + * @param jobProps the job properties + * @param launcherTypeValue the type of the launcher; either a {@link JobLauncherType} value or + * the name of the class that extends {@link AbstractJobLauncher} and has a constructor + * that has a single Properties parameter.. + * @param metadataTags additional metadata to be added to timing events + * @return the JobLauncher instance + * @throws RuntimeException if the instantiation fails + */ + public static JobLauncher newJobLauncher(Properties sysProps, Properties jobProps, + String launcherTypeValue, SharedResourcesBroker<GobblinScopeTypes> instanceBroker, List<? 
extends Tag<?>> metadataTags) { Optional<JobLauncherType> launcherType = Enums.getIfPresent(JobLauncherType.class, launcherTypeValue); try { if (launcherType.isPresent()) { switch (launcherType.get()) { case LOCAL: - return new LocalJobLauncher(JobConfigurationUtils.combineSysAndJobProperties(sysProps, jobProps), instanceBroker); + return new LocalJobLauncher(JobConfigurationUtils.combineSysAndJobProperties(sysProps, jobProps), instanceBroker, metadataTags); case MAPREDUCE: - return new MRJobLauncher(JobConfigurationUtils.combineSysAndJobProperties(sysProps, jobProps), instanceBroker); + return new MRJobLauncher(JobConfigurationUtils.combineSysAndJobProperties(sysProps, jobProps), instanceBroker, metadataTags); default: throw new RuntimeException("Unsupported job launcher type: " + launcherType.get().name()); } http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/58f00f01/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/local/LocalJobLauncher.java ---------------------------------------------------------------------- diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/local/LocalJobLauncher.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/local/LocalJobLauncher.java index ac3671b..fc0e197 100644 --- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/local/LocalJobLauncher.java +++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/local/LocalJobLauncher.java @@ -70,11 +70,15 @@ public class LocalJobLauncher extends AbstractJobLauncher { private final ServiceManager serviceManager; public LocalJobLauncher(Properties jobProps) throws Exception { - this(jobProps, null); + this(jobProps, null, ImmutableList.of()); } public LocalJobLauncher(Properties jobProps, SharedResourcesBroker<GobblinScopeTypes> instanceBroker) throws Exception { - super(jobProps, ImmutableList.<Tag<?>> of(), instanceBroker); + this(jobProps, instanceBroker, ImmutableList.of()); + } + + public LocalJobLauncher(Properties jobProps, 
SharedResourcesBroker<GobblinScopeTypes> instanceBroker, List<? extends Tag<?>> metadataTags) throws Exception { + super(jobProps, metadataTags, instanceBroker); log.debug("Local job launched with properties: {}", jobProps); TimingEvent jobLocalSetupTimer = this.eventSubmitter.getTimingEvent(TimingEvent.RunJobTimings.JOB_LOCAL_SETUP); http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/58f00f01/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/mapreduce/MRJobLauncher.java ---------------------------------------------------------------------- diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/mapreduce/MRJobLauncher.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/mapreduce/MRJobLauncher.java index addb0f0..1f53054 100644 --- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/mapreduce/MRJobLauncher.java +++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/mapreduce/MRJobLauncher.java @@ -157,9 +157,18 @@ public class MRJobLauncher extends AbstractJobLauncher { this(jobProps, new Configuration(), instanceBroker); } - public MRJobLauncher(Properties jobProps, Configuration conf, SharedResourcesBroker<GobblinScopeTypes> instanceBroker) + public MRJobLauncher(Properties jobProps, Configuration conf, SharedResourcesBroker<GobblinScopeTypes> instanceBroker) throws Exception { + this(jobProps, conf, instanceBroker, ImmutableList.of()); + } + + public MRJobLauncher(Properties jobProps, SharedResourcesBroker<GobblinScopeTypes> instanceBroker, List<? extends Tag<?>> metadataTags) throws Exception { + this(jobProps, new Configuration(), instanceBroker, metadataTags); + } + + + public MRJobLauncher(Properties jobProps, Configuration conf, SharedResourcesBroker<GobblinScopeTypes> instanceBroker,List<? 
extends Tag<?>> metadataTags) throws Exception { - super(jobProps, ImmutableList.<Tag<?>>of()); + super(jobProps, metadataTags); this.conf = conf; // Put job configuration properties into the Hadoop configuration so they are available in the mappers
