Repository: incubator-gobblin Updated Branches: refs/heads/master 6f1a3aff6 -> f0b518b92
[GOBBLIN-354] Support DynamicConfig in AzkabanCompactionJobLauncher Closes #2227 from zxcware/ssl Project: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/commit/f0b518b9 Tree: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/tree/f0b518b9 Diff: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/diff/f0b518b9 Branch: refs/heads/master Commit: f0b518b927678ea3d42a08203c2407b9e8096fc4 Parents: 6f1a3af Author: zhchen <[email protected]> Authored: Tue Jan 2 17:04:11 2018 -0800 Committer: Hung Tran <[email protected]> Committed: Tue Jan 2 17:04:11 2018 -0800 ---------------------------------------------------------------------- .../azkaban/AzkabanCompactionJobLauncher.java | 20 ++++++++++++++++++++ 1 file changed, 20 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/f0b518b9/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/azkaban/AzkabanCompactionJobLauncher.java ---------------------------------------------------------------------- diff --git a/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/azkaban/AzkabanCompactionJobLauncher.java b/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/azkaban/AzkabanCompactionJobLauncher.java index c0b8bf3..a9b8e4d 100644 --- a/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/azkaban/AzkabanCompactionJobLauncher.java +++ b/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/azkaban/AzkabanCompactionJobLauncher.java @@ -18,11 +18,14 @@ package org.apache.gobblin.azkaban; import java.io.IOException; +import java.util.Map; import java.util.Properties; import azkaban.jobExecutor.AbstractJob; import com.google.common.base.Optional; +import com.typesafe.config.Config; +import com.typesafe.config.ConfigValue; import org.apache.log4j.Logger; @@ -34,12 +37,17 @@ import org.apache.gobblin.compaction.listeners.CompactorListenerCreationExceptio import org.apache.gobblin.compaction.listeners.CompactorListenerFactory; import org.apache.gobblin.compaction.ReflectionCompactorFactory; import org.apache.gobblin.compaction.listeners.ReflectionCompactorListenerFactory; +import org.apache.gobblin.configuration.DynamicConfigGenerator; import org.apache.gobblin.metrics.Tag; +import org.apache.gobblin.runtime.DynamicConfigGeneratorFactory; +import org.apache.gobblin.util.ConfigUtils; /** * A class for launching a Gobblin MR job for compaction through Azkaban. + * @deprecated use {@link AzkabanJobLauncher} and {@link org.apache.gobblin.compaction.source.CompactionSource} */ +@Deprecated public class AzkabanCompactionJobLauncher extends AbstractJob { private static final Logger LOG = Logger.getLogger(AzkabanCompactionJobLauncher.class); @@ -51,6 +59,18 @@ public class AzkabanCompactionJobLauncher extends AbstractJob { super(jobId, LOG); this.properties = new Properties(); this.properties.putAll(props); + + // load dynamic configuration and add them to the job properties + Config propsAsConfig = ConfigUtils.propertiesToConfig(props); + DynamicConfigGenerator dynamicConfigGenerator = + DynamicConfigGeneratorFactory.createDynamicConfigGenerator(propsAsConfig); + Config dynamicConfig = dynamicConfigGenerator.generateDynamicConfig(propsAsConfig); + + // add the dynamic config to the job config + for (Map.Entry<String, ConfigValue> entry : dynamicConfig.entrySet()) { + this.properties.put(entry.getKey(), entry.getValue().unwrapped().toString()); + } + this.compactor = getCompactor(getCompactorFactory(), getCompactorListener(getCompactorListenerFactory())); }
