Repository: incubator-gobblin Updated Branches: refs/heads/master 7526439ca -> 5abfb797a
[GOBBLIN-602] Allow AzkabanProducer to be customized Closes #2468 from yukuai518/azkProducer Project: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/commit/5abfb797 Tree: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/tree/5abfb797 Diff: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/diff/5abfb797 Branch: refs/heads/master Commit: 5abfb797a0ca219b62273ef872eaf74f1d50f30a Parents: 7526439 Author: Kuai Yu <[email protected]> Authored: Mon Oct 8 09:42:12 2018 -0700 Committer: Hung Tran <[email protected]> Committed: Mon Oct 8 09:42:12 2018 -0700 ---------------------------------------------------------------------- .../modules/orchestration/AzkabanSpecExecutor.java | 14 +++++++++++++- .../orchestration/ServiceAzkabanConfigKeys.java | 1 + 2 files changed, 14 insertions(+), 1 deletion(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/5abfb797/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/service/modules/orchestration/AzkabanSpecExecutor.java ---------------------------------------------------------------------- diff --git a/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/service/modules/orchestration/AzkabanSpecExecutor.java b/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/service/modules/orchestration/AzkabanSpecExecutor.java index 17470c7..ba78904 100644 --- a/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/service/modules/orchestration/AzkabanSpecExecutor.java +++ b/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/service/modules/orchestration/AzkabanSpecExecutor.java @@ -22,6 +22,9 @@ import org.apache.gobblin.runtime.api.Spec; import org.apache.gobblin.runtime.api.SpecProducer; import org.apache.gobblin.runtime.spec_executorInstance.AbstractSpecExecutor; import org.apache.gobblin.util.CompletedFuture; +import org.apache.gobblin.util.ConfigUtils; +import org.apache.gobblin.util.reflection.GobblinConstructorUtils; + import org.slf4j.Logger; import com.google.common.base.Optional; @@ -44,7 +47,16 @@ public class AzkabanSpecExecutor extends AbstractSpecExecutor { super(config, log); Config defaultConfig = ConfigFactory.load(ServiceAzkabanConfigKeys.DEFAULT_AZKABAN_PROJECT_CONFIG_FILE); _config = config.withFallback(defaultConfig); - azkabanSpecProducer = new AzkabanSpecProducer(_config, log); + + try { + Class<?> producerClass = Class.forName(ConfigUtils.getString(_config, + ServiceAzkabanConfigKeys.AZKABAN_PRODUCER_CLASS, + AzkabanSpecProducer.class.getName())); + azkabanSpecProducer = (SpecProducer<Spec>) GobblinConstructorUtils + .invokeLongestConstructor(producerClass, _config); + } catch (ReflectiveOperationException e) { + throw new RuntimeException("Could not instantiate kafka pusher", e); + } } @Override http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/5abfb797/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/service/modules/orchestration/ServiceAzkabanConfigKeys.java ---------------------------------------------------------------------- diff --git a/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/service/modules/orchestration/ServiceAzkabanConfigKeys.java b/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/service/modules/orchestration/ServiceAzkabanConfigKeys.java index 4c24944..3fdf257 100644 --- a/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/service/modules/orchestration/ServiceAzkabanConfigKeys.java +++ b/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/service/modules/orchestration/ServiceAzkabanConfigKeys.java @@ -38,5 +38,6 @@ public class ServiceAzkabanConfigKeys { // Azkaban System Environment public static final String AZKABAN_PASSWORD_SYSTEM_KEY = "GOBBLIN_SERVICE_AZKABAN_PASSWORD"; public static final String DEFAULT_AZKABAN_PROJECT_CONFIG_FILE = "default-service-azkaban.conf"; + public static final String AZKABAN_PRODUCER_CLASS = GOBBLIN_SERVICE_AZKABAN_PREFIX + "producer.class"; }
