This is an automated email from the ASF dual-hosted git repository.

wlo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git


The following commit(s) were added to refs/heads/master by this push:
     new 7deea30eb initialize yarn clients in yarn app launcher so that a child 
class can override the yarn client creation logic (#3679)
7deea30eb is described below

commit 7deea30eb5f758f9bbbd195cd376ad361e172dc6
Author: Arjun Singh Bora <[email protected]>
AuthorDate: Mon Apr 17 13:39:20 2023 -0700

    initialize yarn clients in yarn app launcher so that a child class can 
override the yarn client creation logic (#3679)
---
 .../apache/gobblin/azkaban/AzkabanGobblinYarnAppLauncher.java |  4 +++-
 .../java/org/apache/gobblin/yarn/GobblinYarnAppLauncher.java  | 11 ++++++++---
 .../org/apache/gobblin/yarn/GobblinYarnAppLauncherTest.java   |  2 ++
 3 files changed, 13 insertions(+), 4 deletions(-)

diff --git 
a/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/azkaban/AzkabanGobblinYarnAppLauncher.java
 
b/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/azkaban/AzkabanGobblinYarnAppLauncher.java
index 59e7a584a..8fe09e5a5 100644
--- 
a/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/azkaban/AzkabanGobblinYarnAppLauncher.java
+++ 
b/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/azkaban/AzkabanGobblinYarnAppLauncher.java
@@ -80,7 +80,9 @@ public class AzkabanGobblinYarnAppLauncher extends 
AbstractJob {
 
   protected GobblinYarnAppLauncher getYarnAppLauncher(Config gobblinConfig)
       throws IOException {
-    return new GobblinYarnAppLauncher(gobblinConfig, this.yarnConfiguration);
+    GobblinYarnAppLauncher gobblinYarnAppLauncher = new 
GobblinYarnAppLauncher(gobblinConfig, this.yarnConfiguration);
+    gobblinYarnAppLauncher.initializeYarnClients(gobblinConfig);
+    return gobblinYarnAppLauncher;
   }
 
   /**
diff --git 
a/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinYarnAppLauncher.java
 
b/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinYarnAppLauncher.java
index e4acb8682..99c4094df 100644
--- 
a/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinYarnAppLauncher.java
+++ 
b/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinYarnAppLauncher.java
@@ -157,6 +157,10 @@ import static 
org.apache.hadoop.security.UserGroupInformation.HADOOP_TOKEN_FILE_
  *   this count exceeds the maximum number allowed, it will initiate a 
shutdown.
  * </p>
  *
+ * <p>
+ *   Users of {@link GobblinYarnAppLauncher} need to call {@link 
#initializeYarnClients} which a child class can override.
+ * </p>
+ *
  * @author Yinan Li
  */
 public class GobblinYarnAppLauncher {
@@ -260,7 +264,6 @@ public class GobblinYarnAppLauncher {
     YarnHelixUtils.setAdditionalYarnClassPath(config, this.yarnConfiguration);
     this.yarnConfiguration.set("fs.automatic.close", "false");
     this.originalYarnRMAddress = 
this.yarnConfiguration.get(GobblinYarnConfigurationKeys.YARN_RESOURCE_MANAGER_ADDRESS);
-    createPotentialYarnClients(config, this.potentialYarnClients);
 
     this.fs = GobblinClusterUtils.buildFileSystem(config, 
this.yarnConfiguration);
     this.closer.register(this.fs);
@@ -323,14 +326,14 @@ public class GobblinYarnAppLauncher {
     }
   }
 
-  protected void createPotentialYarnClients(Config config, Map<String, 
YarnClient> potentialYarnClients) {
+  public void initializeYarnClients(Config config) {
     Set<String> potentialRMAddresses = new 
HashSet<>(ConfigUtils.getStringList(config, 
GobblinYarnConfigurationKeys.OTHER_YARN_RESOURCE_MANAGER_ADDRESSES));
     potentialRMAddresses.add(originalYarnRMAddress);
     for (String rmAddress : potentialRMAddresses) {
       YarnClient tmpYarnClient = YarnClient.createYarnClient();
       
this.yarnConfiguration.set(GobblinYarnConfigurationKeys.YARN_RESOURCE_MANAGER_ADDRESS,
 rmAddress);
       tmpYarnClient.init(new YarnConfiguration(this.yarnConfiguration));
-      potentialYarnClients.put(rmAddress, tmpYarnClient);
+      this.potentialYarnClients.put(rmAddress, tmpYarnClient);
     }
   }
 
@@ -1069,6 +1072,8 @@ public class GobblinYarnAppLauncher {
   public static void main(String[] args) throws Exception {
     final GobblinYarnAppLauncher gobblinYarnAppLauncher =
         new GobblinYarnAppLauncher(ConfigFactory.load(), new 
YarnConfiguration());
+    gobblinYarnAppLauncher.initializeYarnClients(ConfigFactory.load());
+
     Runtime.getRuntime().addShutdownHook(new Thread() {
 
       @Override
diff --git 
a/gobblin-yarn/src/test/java/org/apache/gobblin/yarn/GobblinYarnAppLauncherTest.java
 
b/gobblin-yarn/src/test/java/org/apache/gobblin/yarn/GobblinYarnAppLauncherTest.java
index 95c109cfe..2eb8c4f32 100644
--- 
a/gobblin-yarn/src/test/java/org/apache/gobblin/yarn/GobblinYarnAppLauncherTest.java
+++ 
b/gobblin-yarn/src/test/java/org/apache/gobblin/yarn/GobblinYarnAppLauncherTest.java
@@ -199,6 +199,7 @@ public class GobblinYarnAppLauncherTest implements 
HelixMessageTestBase {
         InstanceType.CONTROLLER, zkConnectionString);
 
     this.gobblinYarnAppLauncher = new GobblinYarnAppLauncher(this.config, 
clusterConf);
+    this.gobblinYarnAppLauncher.initializeYarnClients(this.config);
 
     this.configManagedHelix = ConfigFactory.parseURL(url)
         .withValue("gobblin.cluster.zk.connection.string",
@@ -213,6 +214,7 @@ public class GobblinYarnAppLauncherTest implements 
HelixMessageTestBase {
         InstanceType.PARTICIPANT, zkConnectionString);
 
     this.gobblinYarnAppLauncherManagedHelix = new 
GobblinYarnAppLauncher(this.configManagedHelix, clusterConf);
+    
this.gobblinYarnAppLauncherManagedHelix.initializeYarnClients(this.configManagedHelix);
   }
 
   @Test

Reply via email to