Repository: incubator-gobblin Updated Branches: refs/heads/master c8707eded -> 5c03b11b5
[GOBBLIN-317] Add dynamic configuration injection in the mappers Closes #2170 from htran1/dynamic_config Project: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/commit/5c03b11b Tree: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/tree/5c03b11b Diff: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/diff/5c03b11b Branch: refs/heads/master Commit: 5c03b11b5b5a62a611fa2a1042bfe64c283c6a5e Parents: c8707ed Author: Hung Tran <[email protected]> Authored: Mon Nov 20 19:51:48 2017 -0800 Committer: Hung Tran <[email protected]> Committed: Mon Nov 20 19:51:48 2017 -0800 ---------------------------------------------------------------------- .../configuration/ConfigurationKeys.java | 8 ++- .../configuration/DynamicConfigGenerator.java | 33 ++++++++++++ .../NoopDynamicConfigGenerator.java | 36 +++++++++++++ .../gobblin/azkaban/AzkabanJobLauncher.java | 15 ++++++ .../kafka/client/Kafka09ConsumerClient.java | 7 ++- .../metrics/kafka/KafkaAvroReporter.java | 6 +-- .../gobblin/metrics/kafka/KafkaReporter.java | 16 +++++- .../metrics/kafka/KafkaReporterFactory.java | 9 +++- .../runtime/DynamicConfigGeneratorFactory.java | 53 ++++++++++++++++++++ .../runtime/mapreduce/MRJobLauncher.java | 26 ++++++++-- .../gobblin/runtime/JobLauncherTestHelper.java | 2 + .../runtime/mapreduce/MRJobLauncherTest.java | 24 +++++++++ 12 files changed, 220 insertions(+), 15 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/5c03b11b/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java ---------------------------------------------------------------------- diff --git a/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java b/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java index a563b43..3e576ce 100644 --- a/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java +++ b/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java @@ -205,6 +205,12 @@ public class ConfigurationKeys { public static final String DEFAULT_EVENT_METADATA_GENERATOR_CLASS_KEY = "noop"; /** + * Configuration for dynamic configuration generation + */ + public static final String DYNAMIC_CONFIG_GENERATOR_CLASS_KEY = "dynamicConfigGenerator.class"; + public static final String DEFAULT_DYNAMIC_CONFIG_GENERATOR_CLASS_KEY = "noop"; + + /** * Configuration properties used internally. */ public static final String JOB_ID_KEY = "job.id"; @@ -734,7 +740,7 @@ public class ConfigurationKeys { public static final boolean DEFAULT_KAFKA_SOURCE_SHARE_CONSUMER_CLIENT = false; public static final String KAFKA_SOURCE_AVG_FETCH_TIME_CAP = "kakfa.source.avgFetchTimeCap"; public static final int DEFAULT_KAFKA_SOURCE_AVG_FETCH_TIME_CAP = 100; - + public static final String SHARED_KAFKA_CONFIG_PREFIX = "gobblin.kafka.sharedConfig"; /** * Job execution info server and history store configuration properties. http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/5c03b11b/gobblin-api/src/main/java/org/apache/gobblin/configuration/DynamicConfigGenerator.java ---------------------------------------------------------------------- diff --git a/gobblin-api/src/main/java/org/apache/gobblin/configuration/DynamicConfigGenerator.java b/gobblin-api/src/main/java/org/apache/gobblin/configuration/DynamicConfigGenerator.java new file mode 100644 index 0000000..9783fab --- /dev/null +++ b/gobblin-api/src/main/java/org/apache/gobblin/configuration/DynamicConfigGenerator.java @@ -0,0 +1,33 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.gobblin.configuration; + +import com.typesafe.config.Config; + +/** + * For generating dynamic configuration that gets added to the job configuration. + * These are configuration values that cannot be determined statically at job specification time. + * One example is the SSL certificate location of a certificate that is fetched at runtime. + */ +public interface DynamicConfigGenerator { + /** + * Generate dynamic configuration that should be added to the job configuration. + * @param config configuration + * @return config object with the dynamic configuration + */ + Config generateDynamicConfig(Config config); +} http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/5c03b11b/gobblin-api/src/main/java/org/apache/gobblin/configuration/NoopDynamicConfigGenerator.java ---------------------------------------------------------------------- diff --git a/gobblin-api/src/main/java/org/apache/gobblin/configuration/NoopDynamicConfigGenerator.java b/gobblin-api/src/main/java/org/apache/gobblin/configuration/NoopDynamicConfigGenerator.java new file mode 100644 index 0000000..c990690 --- /dev/null +++ b/gobblin-api/src/main/java/org/apache/gobblin/configuration/NoopDynamicConfigGenerator.java @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.gobblin.configuration; + +import com.typesafe.config.Config; +import com.typesafe.config.ConfigFactory; +import org.apache.gobblin.annotation.Alias; + + +/** + * NoOp dynamic config generator that returns an empty {@link Config} + */ +@Alias("noop") +public class NoopDynamicConfigGenerator implements DynamicConfigGenerator { + + public NoopDynamicConfigGenerator() { + } + + public Config generateDynamicConfig(Config config) { + return ConfigFactory.empty(); + } +} http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/5c03b11b/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/azkaban/AzkabanJobLauncher.java ---------------------------------------------------------------------- diff --git a/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/azkaban/AzkabanJobLauncher.java b/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/azkaban/AzkabanJobLauncher.java index 20b630b..5c9fc1d 100644 --- a/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/azkaban/AzkabanJobLauncher.java +++ b/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/azkaban/AzkabanJobLauncher.java @@ -21,6 +21,7 @@ import java.io.File; import java.io.IOException; import java.net.URI; import java.util.List; +import java.util.Map; import java.util.Properties; import java.util.Set; import java.util.UUID; @@ -46,12 +47,15 @@ import com.google.common.collect.Lists; import com.google.common.collect.Sets; import com.google.common.io.Closer; import com.typesafe.config.Config; +import com.typesafe.config.ConfigValue; import org.apache.gobblin.configuration.ConfigurationKeys; +import org.apache.gobblin.configuration.DynamicConfigGenerator; import org.apache.gobblin.configuration.State; import org.apache.gobblin.metrics.GobblinMetrics; import org.apache.gobblin.metrics.RootMetricContext; import org.apache.gobblin.metrics.Tag; +import org.apache.gobblin.runtime.DynamicConfigGeneratorFactory; import org.apache.gobblin.runtime.JobException; import org.apache.gobblin.runtime.JobLauncher; import org.apache.gobblin.runtime.JobLauncherFactory; @@ -128,6 +132,17 @@ public class AzkabanJobLauncher extends AbstractJob implements ApplicationLaunch this.props = new Properties(); this.props.putAll(props); + // load dynamic configuration and add them to the job properties + Config propsAsConfig = ConfigUtils.propertiesToConfig(props); + DynamicConfigGenerator dynamicConfigGenerator = + DynamicConfigGeneratorFactory.createDynamicConfigGenerator(propsAsConfig); + Config dynamicConfig = dynamicConfigGenerator.generateDynamicConfig(propsAsConfig); + + // add the dynamic config to the job config + for (Map.Entry<String, ConfigValue> entry : dynamicConfig.entrySet()) { + this.props.put(entry.getKey(), entry.getValue().unwrapped().toString()); + } + Configuration conf = new Configuration(); String fsUri = conf.get(HADOOP_FS_DEFAULT_NAME); http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/5c03b11b/gobblin-modules/gobblin-kafka-09/src/main/java/org/apache/gobblin/kafka/client/Kafka09ConsumerClient.java ---------------------------------------------------------------------- diff --git a/gobblin-modules/gobblin-kafka-09/src/main/java/org/apache/gobblin/kafka/client/Kafka09ConsumerClient.java b/gobblin-modules/gobblin-kafka-09/src/main/java/org/apache/gobblin/kafka/client/Kafka09ConsumerClient.java index 7f83192..5ab27e0 100644 --- a/gobblin-modules/gobblin-kafka-09/src/main/java/org/apache/gobblin/kafka/client/Kafka09ConsumerClient.java +++ b/gobblin-modules/gobblin-kafka-09/src/main/java/org/apache/gobblin/kafka/client/Kafka09ConsumerClient.java @@ -44,6 +44,7 @@ import javax.annotation.Nonnull; import lombok.EqualsAndHashCode; import lombok.ToString; +import org.apache.gobblin.configuration.ConfigurationKeys; import org.apache.gobblin.source.extractor.extract.kafka.KafkaOffsetRetrievalFailureException; import org.apache.gobblin.source.extractor.extract.kafka.KafkaPartition; import org.apache.gobblin.source.extractor.extract.kafka.KafkaTopic; @@ -96,8 +97,10 @@ public class Kafka09ConsumerClient<K, V> extends AbstractBaseKafkaConsumerClient // grab all the config under "source.kafka" and add the defaults as fallback. Config baseConfig = ConfigUtils.getConfigOrEmpty(config, CONFIG_NAMESPACE).withFallback(FALLBACK); - // get the "source.kafka.consumerConfig" config for extra config to pass along to Kafka - Config specificConfig = ConfigUtils.getConfigOrEmpty(baseConfig, CONSUMER_CONFIG); + // get the "source.kafka.consumerConfig" config for extra config to pass along to Kafka with a fallback to the + // shared config that start with "gobblin.kafka.sharedConfig" + Config specificConfig = ConfigUtils.getConfigOrEmpty(baseConfig, CONSUMER_CONFIG).withFallback( + ConfigUtils.getConfigOrEmpty(config, ConfigurationKeys.SHARED_KAFKA_CONFIG_PREFIX)); // The specific config overrides settings in the base config Config scopedConfig = specificConfig.withFallback(baseConfig.withoutPath(CONSUMER_CONFIG)); props.putAll(ConfigUtils.configToProperties(scopedConfig)); http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/5c03b11b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/metrics/kafka/KafkaAvroReporter.java ---------------------------------------------------------------------- diff --git a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/metrics/kafka/KafkaAvroReporter.java b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/metrics/kafka/KafkaAvroReporter.java index 4b6399b..5cb3ce8 100644 --- a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/metrics/kafka/KafkaAvroReporter.java +++ b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/metrics/kafka/KafkaAvroReporter.java @@ -25,13 +25,11 @@ import org.apache.avro.Schema; import com.google.common.base.Optional; import com.typesafe.config.Config; -import org.apache.gobblin.configuration.ConfigurationKeys; import org.apache.gobblin.metrics.MetricReport; import org.apache.gobblin.metrics.reporter.util.AvroBinarySerializer; import org.apache.gobblin.metrics.reporter.util.AvroSerializer; import org.apache.gobblin.metrics.reporter.util.SchemaRegistryVersionWriter; import org.apache.gobblin.metrics.reporter.util.SchemaVersionWriter; -import org.apache.gobblin.util.ConfigUtils; /** @@ -99,7 +97,9 @@ public class KafkaAvroReporter extends KafkaReporter { public KafkaAvroReporter build(String brokers, String topic, Properties props) throws IOException { this.brokers = brokers; this.topic = topic; - return new KafkaAvroReporter(this, ConfigUtils.propertiesToConfig(props, Optional.of(ConfigurationKeys.METRICS_CONFIGURATIONS_PREFIX))); + + // create a KafkaAvroReporter with metrics.* and gobblin.kafka.sharedConfig.* keys + return new KafkaAvroReporter(this, KafkaReporter.getKafkaAndMetricsConfigFromProperties(props)); } } } http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/5c03b11b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/metrics/kafka/KafkaReporter.java ---------------------------------------------------------------------- diff --git a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/metrics/kafka/KafkaReporter.java b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/metrics/kafka/KafkaReporter.java index 1c935b4..40a9fed 100644 --- a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/metrics/kafka/KafkaReporter.java +++ b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/metrics/kafka/KafkaReporter.java @@ -72,7 +72,9 @@ public class KafkaReporter extends MetricReportReporter { if (builder.kafkaPusher.isPresent()) { this.kafkaPusher = builder.kafkaPusher.get(); } else { - Config kafkaConfig = ConfigUtils.getConfigOrEmpty(config, PusherUtils.METRICS_REPORTING_KAFKA_CONFIG_PREFIX); + Config kafkaConfig = ConfigUtils.getConfigOrEmpty(config, PusherUtils.METRICS_REPORTING_KAFKA_CONFIG_PREFIX) + .withFallback(ConfigUtils.getConfigOrEmpty(config, ConfigurationKeys.SHARED_KAFKA_CONFIG_PREFIX)); + String pusherClassName = ConfigUtils.getString(config, PusherUtils.KAFKA_PUSHER_CLASS_NAME_KEY, PusherUtils.DEFAULT_KAFKA_PUSHER_CLASS_NAME); @@ -85,6 +87,15 @@ public class KafkaReporter extends MetricReportReporter { } /** + * Get config with metrics configuration and shared kafka configuration + */ + public static Config getKafkaAndMetricsConfigFromProperties(Properties props) { + return ConfigUtils.propertiesToConfig(props, Optional.of(ConfigurationKeys.METRICS_CONFIGURATIONS_PREFIX)) + .withFallback(ConfigUtils.propertiesToConfig(props, + Optional.of(ConfigurationKeys.SHARED_KAFKA_CONFIG_PREFIX))); + } + + /** * A static factory class for obtaining new {@link Builder}s * * @see Builder @@ -139,7 +150,8 @@ public class KafkaReporter extends MetricReportReporter { this.brokers = brokers; this.topic = topic; - return new KafkaReporter(this, ConfigUtils.propertiesToConfig(props, Optional.of(ConfigurationKeys.METRICS_CONFIGURATIONS_PREFIX))); + // create a KafkaReporter with metrics.* and gobblin.kafka.sharedConfig.* keys + return new KafkaReporter(this, KafkaReporter.getKafkaAndMetricsConfigFromProperties(props)); } } http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/5c03b11b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/metrics/kafka/KafkaReporterFactory.java ---------------------------------------------------------------------- diff --git a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/metrics/kafka/KafkaReporterFactory.java b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/metrics/kafka/KafkaReporterFactory.java index 9faac33..4dcf717 100644 --- a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/metrics/kafka/KafkaReporterFactory.java +++ b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/metrics/kafka/KafkaReporterFactory.java @@ -94,8 +94,13 @@ public class KafkaReporterFactory implements CustomCodahaleReporterFactory { KafkaEventReporter.Builder<?> builder = formatEnum.eventReporterBuilder(RootMetricContext.get(), properties); - Config kafkaConfig = ConfigUtils.getConfigOrEmpty(ConfigUtils.propertiesToConfig(properties), - PusherUtils.METRICS_REPORTING_KAFKA_CONFIG_PREFIX); + Config allConfig = ConfigUtils.propertiesToConfig(properties); + // the kafka configuration is composed of the metrics reporting specific keys with a fallback to the shared + // kafka config + Config kafkaConfig = ConfigUtils.getConfigOrEmpty(allConfig, + PusherUtils.METRICS_REPORTING_KAFKA_CONFIG_PREFIX).withFallback(ConfigUtils.getConfigOrEmpty(allConfig, + ConfigurationKeys.SHARED_KAFKA_CONFIG_PREFIX)); + builder.withConfig(kafkaConfig); builder.withPusherClassName(properties.getProperty(PusherUtils.KAFKA_PUSHER_CLASS_NAME_KEY, http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/5c03b11b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/DynamicConfigGeneratorFactory.java ---------------------------------------------------------------------- diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/DynamicConfigGeneratorFactory.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/DynamicConfigGeneratorFactory.java new file mode 100644 index 0000000..c424a9c --- /dev/null +++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/DynamicConfigGeneratorFactory.java @@ -0,0 +1,53 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.gobblin.runtime; + +import com.typesafe.config.Config; +import com.typesafe.config.ConfigFactory; +import java.util.Collections; + +import org.apache.gobblin.configuration.ConfigurationKeys; +import org.apache.gobblin.configuration.DynamicConfigGenerator; +import org.apache.gobblin.util.ClassAliasResolver; +import org.apache.gobblin.util.ConfigUtils; +import org.apache.gobblin.util.reflection.GobblinConstructorUtils; + + +/** + * For getting an instance of a {@link DynamicConfigGenerator} + */ +public class DynamicConfigGeneratorFactory { + /** + * Get an instance of a {@link DynamicConfigGenerator} + * @param config {@link Config} to pass to the constructor + * @return an instance of {@link DynamicConfigGenerator} + */ + public static DynamicConfigGenerator createDynamicConfigGenerator(Config config) { + String dynamicConfigGeneratorClassName = + ConfigUtils.getString(config, ConfigurationKeys.DYNAMIC_CONFIG_GENERATOR_CLASS_KEY, + ConfigurationKeys.DEFAULT_DYNAMIC_CONFIG_GENERATOR_CLASS_KEY); + + try { + ClassAliasResolver<DynamicConfigGenerator> aliasResolver = + new ClassAliasResolver<>(DynamicConfigGenerator.class); + return aliasResolver.resolveClass(dynamicConfigGeneratorClassName).newInstance(); + } catch (ReflectiveOperationException e) { + throw new RuntimeException("Could not construct DynamicConfigGenerator " + + dynamicConfigGeneratorClassName, e); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/5c03b11b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/mapreduce/MRJobLauncher.java ---------------------------------------------------------------------- diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/mapreduce/MRJobLauncher.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/mapreduce/MRJobLauncher.java index 9f17db1..306b4ef 100644 --- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/mapreduce/MRJobLauncher.java +++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/mapreduce/MRJobLauncher.java @@ -21,10 +21,9 @@ import java.io.DataOutputStream; import java.io.FileInputStream; import java.io.IOException; import java.net.URI; -import java.util.HashSet; import java.util.List; +import java.util.Map; import java.util.Properties; -import java.util.Set; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; @@ -56,7 +55,9 @@ import com.google.common.collect.ImmutableMap; import com.google.common.collect.Lists; import com.google.common.io.Closer; import com.google.common.util.concurrent.ServiceManager; +import com.typesafe.config.Config; import com.typesafe.config.ConfigFactory; +import com.typesafe.config.ConfigValue; import org.apache.gobblin.broker.SharedResourcesBrokerFactory; import org.apache.gobblin.broker.gobblin_scopes.GobblinScopeTypes; @@ -64,6 +65,7 @@ import org.apache.gobblin.broker.gobblin_scopes.JobScopeInstance; import org.apache.gobblin.broker.iface.SharedResourcesBroker; import org.apache.gobblin.commit.CommitStep; import org.apache.gobblin.configuration.ConfigurationKeys; +import org.apache.gobblin.configuration.DynamicConfigGenerator; import org.apache.gobblin.metastore.FsStateStore; import org.apache.gobblin.metastore.StateStore; import org.apache.gobblin.metrics.GobblinMetrics; @@ -71,6 +73,7 @@ import org.apache.gobblin.metrics.Tag; import org.apache.gobblin.metrics.event.TimingEvent; import org.apache.gobblin.password.PasswordManager; import org.apache.gobblin.runtime.AbstractJobLauncher; +import org.apache.gobblin.runtime.DynamicConfigGeneratorFactory; import org.apache.gobblin.runtime.GobblinMultiTaskAttempt; import org.apache.gobblin.runtime.JobLauncher; import org.apache.gobblin.runtime.JobState; @@ -83,6 +86,7 @@ import org.apache.gobblin.runtime.util.JobMetrics; import org.apache.gobblin.runtime.util.MetricGroup; import org.apache.gobblin.source.workunit.MultiWorkUnit; import org.apache.gobblin.source.workunit.WorkUnit; +import org.apache.gobblin.util.ConfigUtils; import org.apache.gobblin.util.HadoopUtils; import org.apache.gobblin.util.JobConfigurationUtils; import org.apache.gobblin.util.JobLauncherUtils; @@ -626,7 +630,21 @@ public class MRJobLauncher extends AbstractJobLauncher { throw new RuntimeException("Failed to setup the mapper task", ioe); } - this.taskExecutor = new TaskExecutor(context.getConfiguration()); + + // load dynamic configuration to add to the job configuration + Configuration configuration = context.getConfiguration(); + Config jobStateAsConfig = ConfigUtils.propertiesToConfig(this.jobState.getProperties()); + DynamicConfigGenerator dynamicConfigGenerator = DynamicConfigGeneratorFactory.createDynamicConfigGenerator( + jobStateAsConfig); + Config dynamicConfig = dynamicConfigGenerator.generateDynamicConfig(jobStateAsConfig); + + // add the dynamic config to the job config + for (Map.Entry<String, ConfigValue> entry : dynamicConfig.entrySet()) { + this.jobState.setProp(entry.getKey(), entry.getValue().unwrapped().toString()); + configuration.set(entry.getKey(), entry.getValue().unwrapped().toString()); + } + + this.taskExecutor = new TaskExecutor(configuration); this.taskStateTracker = new MRTaskStateTracker(context); this.serviceManager = new ServiceManager(Lists.newArrayList(this.taskExecutor, this.taskStateTracker)); try { @@ -636,8 +654,6 @@ public class MRJobLauncher extends AbstractJobLauncher { throw new RuntimeException(te); } - Configuration configuration = context.getConfiguration(); - // Setup and start metrics reporting if metric reporting is enabled if (Boolean.valueOf( configuration.get(ConfigurationKeys.METRICS_ENABLED_KEY, ConfigurationKeys.DEFAULT_METRICS_ENABLED))) { http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/5c03b11b/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/JobLauncherTestHelper.java ---------------------------------------------------------------------- diff --git a/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/JobLauncherTestHelper.java b/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/JobLauncherTestHelper.java index 497fd88..c947dfc 100644 --- a/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/JobLauncherTestHelper.java +++ b/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/JobLauncherTestHelper.java @@ -50,6 +50,8 @@ import org.apache.gobblin.util.JobLauncherUtils; public class JobLauncherTestHelper { public static final String SOURCE_FILE_LIST_KEY = "source.files"; + public static final String DYNAMIC_KEY1 = "DynamicKey1"; + public static final String DYNAMIC_VALUE1 = "DynamicValue1"; private final StateStore<JobState.DatasetState> datasetStateStore; private final Properties launcherProps; http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/5c03b11b/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/mapreduce/MRJobLauncherTest.java ---------------------------------------------------------------------- diff --git a/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/mapreduce/MRJobLauncherTest.java b/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/mapreduce/MRJobLauncherTest.java index 60e71a7..8d4f308 100644 --- a/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/mapreduce/MRJobLauncherTest.java +++ b/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/mapreduce/MRJobLauncherTest.java @@ -33,7 +33,12 @@ import org.testng.annotations.AfterClass; import org.testng.annotations.BeforeClass; import org.testng.annotations.Test; +import com.google.common.collect.ImmutableMap; +import com.typesafe.config.Config; +import com.typesafe.config.ConfigFactory; + import org.apache.gobblin.configuration.ConfigurationKeys; +import org.apache.gobblin.configuration.DynamicConfigGenerator; import org.apache.gobblin.configuration.WorkUnitState; import org.apache.gobblin.metastore.FsStateStore; import org.apache.gobblin.metastore.StateStore; @@ -271,6 +276,10 @@ public class MRJobLauncherTest extends BMNGRunner { @BMRule(name = "saveSuccessCount", targetClass = "org.apache.gobblin.metastore.FsStateStore", targetMethod = "put", targetLocation = "AT ENTRY", condition = "$2.endsWith(\".suc\")", action = "org.apache.gobblin.runtime.mapreduce.MRJobLauncherTest.sucCount2 = org.apache.gobblin.runtime.mapreduce.MRJobLauncherTest.sucCount2 + 1"), + @BMRule(name = "checkProp", targetClass = "org.apache.gobblin.runtime.mapreduce.MRJobLauncher$TaskRunner", + targetMethod = "setup", targetLocation = "AT EXIT", + condition = "!$0.jobState.getProp(\"DynamicKey1\").equals(\"DynamicValue1\")", + action = "throw new RuntimeException(\"could not find key\")"), @BMRule(name = "writeSuccessFile", targetClass = "org.apache.gobblin.runtime.GobblinMultiTaskAttempt", targetMethod = "taskSuccessfulInPriorAttempt", targetLocation = "AFTER WRITE $taskStateStore", condition = "$1.endsWith(\"_1\")", @@ -281,6 +290,10 @@ public class MRJobLauncherTest extends BMNGRunner { jobProps.setProperty(ConfigurationKeys.JOB_NAME_KEY, jobProps.getProperty(ConfigurationKeys.JOB_NAME_KEY) + "-testLaunchJobWithMultiWorkUnitAndSucFile"); jobProps.setProperty("use.multiworkunit", Boolean.toString(true)); + + jobProps.setProperty("dynamicConfigGenerator.class", + "org.apache.gobblin.runtime.mapreduce.MRJobLauncherTest$TestDynamicConfigGenerator"); + try { this.jobLauncherTestHelper.runTestWithSkippedTask(jobProps, "_1"); @@ -366,4 +379,15 @@ public class MRJobLauncherTest extends BMNGRunner { return jobProps; } + + public static class TestDynamicConfigGenerator implements DynamicConfigGenerator { + public TestDynamicConfigGenerator() { + } + + @Override + public Config generateDynamicConfig(Config config) { + return ConfigFactory.parseMap(ImmutableMap.of(JobLauncherTestHelper.DYNAMIC_KEY1, + JobLauncherTestHelper.DYNAMIC_VALUE1)); + } + } }
