Repository: incubator-gobblin
Updated Branches:
  refs/heads/master 7526439ca -> 5abfb797a


[GOBBLIN-602] Allow AzkabanProducer to be customized

Closes #2468 from yukuai518/azkProducer


Project: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/commit/5abfb797
Tree: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/tree/5abfb797
Diff: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/diff/5abfb797

Branch: refs/heads/master
Commit: 5abfb797a0ca219b62273ef872eaf74f1d50f30a
Parents: 7526439
Author: Kuai Yu <[email protected]>
Authored: Mon Oct 8 09:42:12 2018 -0700
Committer: Hung Tran <[email protected]>
Committed: Mon Oct 8 09:42:12 2018 -0700

----------------------------------------------------------------------
 .../modules/orchestration/AzkabanSpecExecutor.java    | 14 +++++++++++++-
 .../orchestration/ServiceAzkabanConfigKeys.java       |  1 +
 2 files changed, 14 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/5abfb797/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/service/modules/orchestration/AzkabanSpecExecutor.java
----------------------------------------------------------------------
diff --git a/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/service/modules/orchestration/AzkabanSpecExecutor.java b/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/service/modules/orchestration/AzkabanSpecExecutor.java
index 17470c7..ba78904 100644
--- a/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/service/modules/orchestration/AzkabanSpecExecutor.java
+++ b/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/service/modules/orchestration/AzkabanSpecExecutor.java
@@ -22,6 +22,9 @@ import org.apache.gobblin.runtime.api.Spec;
 import org.apache.gobblin.runtime.api.SpecProducer;
 import org.apache.gobblin.runtime.spec_executorInstance.AbstractSpecExecutor;
 import org.apache.gobblin.util.CompletedFuture;
+import org.apache.gobblin.util.ConfigUtils;
+import org.apache.gobblin.util.reflection.GobblinConstructorUtils;
+
 import org.slf4j.Logger;
 
 import com.google.common.base.Optional;
@@ -44,7 +47,16 @@ public class AzkabanSpecExecutor extends AbstractSpecExecutor {
     super(config, log);
     Config defaultConfig = ConfigFactory.load(ServiceAzkabanConfigKeys.DEFAULT_AZKABAN_PROJECT_CONFIG_FILE);
     _config = config.withFallback(defaultConfig);
-    azkabanSpecProducer = new AzkabanSpecProducer(_config, log);
+
+    try {
+      Class<?> producerClass = Class.forName(ConfigUtils.getString(_config,
+          ServiceAzkabanConfigKeys.AZKABAN_PRODUCER_CLASS,
+          AzkabanSpecProducer.class.getName()));
+      azkabanSpecProducer = (SpecProducer<Spec>) GobblinConstructorUtils
+          .invokeLongestConstructor(producerClass, _config);
+    } catch (ReflectiveOperationException e) {
+      throw new RuntimeException("Could not instantiate kafka pusher", e);
+    }
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/5abfb797/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/service/modules/orchestration/ServiceAzkabanConfigKeys.java
----------------------------------------------------------------------
diff --git a/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/service/modules/orchestration/ServiceAzkabanConfigKeys.java b/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/service/modules/orchestration/ServiceAzkabanConfigKeys.java
index 4c24944..3fdf257 100644
--- a/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/service/modules/orchestration/ServiceAzkabanConfigKeys.java
+++ b/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/service/modules/orchestration/ServiceAzkabanConfigKeys.java
@@ -38,5 +38,6 @@ public class ServiceAzkabanConfigKeys {
   // Azkaban System Environment
   public static final String AZKABAN_PASSWORD_SYSTEM_KEY = "GOBBLIN_SERVICE_AZKABAN_PASSWORD";
   public static final String DEFAULT_AZKABAN_PROJECT_CONFIG_FILE = "default-service-azkaban.conf";
+  public static final String AZKABAN_PRODUCER_CLASS = GOBBLIN_SERVICE_AZKABAN_PREFIX + "producer.class";
 }
 

Reply via email to