This is an automated email from the ASF dual-hosted git repository.

zihanli58 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git


The following commit(s) were added to refs/heads/master by this push:
     new 3d14b86a5 [GOBBLIN-1841] Move disabling of current live instances to 
the GobblinClusterManager startup (#3708)
3d14b86a5 is described below

commit 3d14b86a5bdb1b99d7fc13a5864a831ff9221b1c
Author: Peiying Ye <[email protected]>
AuthorDate: Fri Jun 30 14:41:04 2023 -0700

    [GOBBLIN-1841] Move disabling of current live instances to the 
GobblinClusterManager startup (#3708)
    
    * [GOBBLIN-1841] Implement disableLiveHelixInstances and unit test
    
    * [GOBBLIN-1841] clear commit history
    
    * [GOBBLIN-1841] remove disableLiveHelixInstances from Yarn
    
    * [GOBBLIN-1841] remove extra comments
    
    * [GOBBLIN-1841] Implement TestGobblinClusterManager class
    
    * [GOBBLIN-1841] Remove unnecessary imports
    
    * [GOBBLIN-1841] Optimize imports
    
    * [GOBBLIN-1841] Move disableTaskRunnersFromPreviousExecutions to 
GobblinApplicationMaster
    
    * [GOBBLIN-1841] Add edge cases check
    
    * [GOBBLIN-1840] Fix checkstyle error
    
    * [GOBBLIN-1841] Fix GobblinYarnAppLauncherTest testJobCleanup
    
    * [GOBBLIN-1841] Add back javadoc to 
disableTaskRunnersFromPreviousExecutions implementation
---
 .../gobblin/cluster/GobblinClusterManager.java     | 33 ++++++---
 .../org/apache/gobblin/cluster/HelixUtils.java     | 14 ++++
 .../gobblin/yarn/GobblinApplicationMaster.java     | 35 +++++++++
 .../gobblin/yarn/GobblinYarnAppLauncher.java       | 27 +------
 .../gobblin/yarn/YarnAutoScalingManager.java       | 15 +---
 .../gobblin/yarn/GobblinApplicationMasterTest.java | 85 ++++++++++++++++++++++
 .../gobblin/yarn/GobblinYarnAppLauncherTest.java   | 25 +++++++
 7 files changed, 185 insertions(+), 49 deletions(-)

diff --git 
a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterManager.java
 
b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterManager.java
index c7b74fe92..05ec790e3 100644
--- 
a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterManager.java
+++ 
b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterManager.java
@@ -279,15 +279,7 @@ public class GobblinClusterManager implements 
ApplicationLauncher, StandardMetri
     LOGGER.info("Starting the Gobblin Cluster Manager");
 
     this.eventBus.register(this);
-    this.multiManager.connect();
-
-    // Standalone mode registers a handler to clean up on manager leadership 
change, so only clean up for non-standalone
-    // mode, such as YARN mode
-    if (!this.isStandaloneMode) {
-      this.multiManager.cleanUpJobs();
-    }
-
-    configureHelixQuotaBasedTaskScheduling();
+    setupHelix();
 
     if (this.isStandaloneMode) {
       // standalone mode starts non-daemon threads later, so need to have this 
thread to keep process up
@@ -316,6 +308,18 @@ public class GobblinClusterManager implements 
ApplicationLauncher, StandardMetri
     this.started = true;
   }
 
+  public synchronized void setupHelix() {
+    this.multiManager.connect();
+
+    // Standalone mode registers a handler to clean up on manager leadership 
change, so only clean up for non-standalone
+    // mode, such as YARN mode
+    if (!this.isStandaloneMode) {
+      this.multiManager.cleanUpJobs();
+    }
+
+    configureHelixQuotaBasedTaskScheduling();
+  }
+
   /**
    * Stop the Gobblin Cluster Manager.
    */
@@ -427,11 +431,18 @@ public class GobblinClusterManager implements 
ApplicationLauncher, StandardMetri
    */
   @VisibleForTesting
   void initializeHelixManager() {
-    this.multiManager = new GobblinHelixMultiManager(
-        this.config, aVoid -> 
GobblinClusterManager.this.getUserDefinedMessageHandlerFactory(), 
this.eventBus, stopStatus) ;
+    this.multiManager = createMultiManager();
     this.multiManager.addLeadershipChangeAwareComponent(this);
   }
 
+  /***
+   * Can be overridden to inject mock GobblinHelixMultiManager
+   * @return a new GobblinHelixMultiManager
+   */
+  public GobblinHelixMultiManager createMultiManager() {
+    return new GobblinHelixMultiManager(this.config, aVoid -> 
GobblinClusterManager.this.getUserDefinedMessageHandlerFactory(), 
this.eventBus, stopStatus);
+  }
+
   @VisibleForTesting
   void sendShutdownRequest() {
     Criteria criteria = new Criteria();
diff --git 
a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/HelixUtils.java 
b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/HelixUtils.java
index 688c8c7ba..45c3685d1 100644
--- a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/HelixUtils.java
+++ b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/HelixUtils.java
@@ -28,11 +28,13 @@ import java.util.Properties;
 import java.util.Set;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
+import java.util.stream.Collectors;
 
 import org.apache.helix.HelixAdmin;
 import org.apache.helix.HelixDataAccessor;
 import org.apache.helix.HelixException;
 import org.apache.helix.HelixManager;
+import org.apache.helix.HelixProperty;
 import org.apache.helix.PropertyKey;
 import org.apache.helix.manager.zk.ZKHelixManager;
 import org.apache.helix.model.HelixConfigScope;
@@ -452,6 +454,18 @@ public class HelixUtils {
     return accessor.getChildNames(liveInstancesKey);
   }
 
+  /**
+   * Getting all instances (Helix Participants) in cluster at this moment.
+   * Note that the raw result could contain AppMaster node and replanner node.
+   * @param filterString Helix instances whose name contains filterString 
will pass filtering.
+   */
+  public static Set<String> getParticipants(HelixDataAccessor 
helixDataAccessor, String filterString) {
+    PropertyKey.Builder keyBuilder = helixDataAccessor.keyBuilder();
+    PropertyKey liveInstance = keyBuilder.liveInstances();
+    Map<String, HelixProperty> childValuesMap = 
helixDataAccessor.getChildValuesMap(liveInstance);
+    return childValuesMap.keySet().stream().filter(x -> filterString.isEmpty() 
|| x.contains(filterString)).collect(Collectors.toSet());
+  }
+
   public static boolean isInstanceLive(HelixManager helixManager, String 
instanceName) {
     HelixDataAccessor accessor = helixManager.getHelixDataAccessor();
     PropertyKey liveInstanceKey = 
accessor.keyBuilder().liveInstance(instanceName);
diff --git 
a/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinApplicationMaster.java
 
b/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinApplicationMaster.java
index 16cb95480..ff2cb0e1a 100644
--- 
a/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinApplicationMaster.java
+++ 
b/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinApplicationMaster.java
@@ -19,6 +19,7 @@ package org.apache.gobblin.yarn;
 
 import java.util.Collections;
 import java.util.List;
+import java.util.Set;
 
 import org.apache.commons.cli.CommandLine;
 import org.apache.commons.cli.DefaultParser;
@@ -32,6 +33,9 @@ import org.apache.hadoop.yarn.api.ApplicationConstants;
 import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.util.ConverterUtils;
+import org.apache.helix.HelixAdmin;
+import org.apache.helix.HelixDataAccessor;
+import org.apache.helix.HelixManager;
 import org.apache.helix.NotificationContext;
 import org.apache.helix.messaging.handling.HelixTaskResult;
 import org.apache.helix.messaging.handling.MessageHandler;
@@ -52,6 +56,8 @@ import org.apache.gobblin.annotation.Alpha;
 import org.apache.gobblin.cluster.GobblinClusterConfigurationKeys;
 import org.apache.gobblin.cluster.GobblinClusterManager;
 import org.apache.gobblin.cluster.GobblinClusterUtils;
+import org.apache.gobblin.cluster.GobblinHelixMultiManager;
+import org.apache.gobblin.cluster.HelixUtils;
 import org.apache.gobblin.util.ConfigUtils;
 import org.apache.gobblin.util.JvmUtils;
 import org.apache.gobblin.util.PathUtils;
@@ -135,6 +141,35 @@ public class GobblinApplicationMaster extends 
GobblinClusterManager {
     return new ControllerUserDefinedMessageHandlerFactory();
   }
 
+  @Override
+  public synchronized void setupHelix() {
+    super.setupHelix();
+    this.disableTaskRunnersFromPreviousExecutions(this.multiManager);
+  }
+
+  /**
+   * A method to disable pre-existing live instances in a Helix cluster. This 
can happen when a previous Yarn application
+   * leaves behind orphaned Yarn worker processes. Since Helix does not 
provide an API to drop a live instance, we use
+   * the disable instance API to fence off these orphaned instances and 
prevent them from becoming participants in the
+   * new cluster.
+   *
+   * NOTE: this is a workaround for an existing YARN bug. Once YARN has a fix 
to guarantee container kills on application
+   * completion, this method should be removed.
+   */
+  public static void 
disableTaskRunnersFromPreviousExecutions(GobblinHelixMultiManager multiManager) 
{
+    HelixManager helixManager = multiManager.getJobClusterHelixManager();
+    HelixDataAccessor helixDataAccessor = helixManager.getHelixDataAccessor();
+    String clusterName = helixManager.getClusterName();
+    HelixAdmin helixAdmin = helixManager.getClusterManagmentTool();
+    Set<String> taskRunners = HelixUtils.getParticipants(helixDataAccessor,
+        GobblinYarnTaskRunner.HELIX_YARN_INSTANCE_NAME_PREFIX);
+    LOGGER.warn("Found {} task runners in the cluster.", taskRunners.size());
+    for (String taskRunner : taskRunners) {
+      LOGGER.warn("Disabling instance: {}", taskRunner);
+      helixAdmin.enableInstance(clusterName, taskRunner, false);
+    }
+  }
+
   /**
    * A custom {@link MultiTypeMessageHandlerFactory} for {@link 
ControllerUserDefinedMessageHandler}s that
    * handle messages of type {@link 
org.apache.helix.model.Message.MessageType#USER_DEFINE_MSG}.
diff --git 
a/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinYarnAppLauncher.java
 
b/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinYarnAppLauncher.java
index 99c4094df..48ac89479 100644
--- 
a/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinYarnAppLauncher.java
+++ 
b/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinYarnAppLauncher.java
@@ -38,8 +38,6 @@ import java.util.concurrent.atomic.AtomicInteger;
 import org.apache.avro.Schema;
 import org.apache.commons.io.FileUtils;
 import org.apache.commons.mail.EmailException;
-import org.apache.gobblin.util.hadoop.TokenUtils;
-import org.apache.gobblin.util.reflection.GobblinConstructorUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
@@ -71,7 +69,6 @@ import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.exceptions.YarnException;
 import org.apache.hadoop.yarn.util.Records;
 import org.apache.helix.Criteria;
-import org.apache.helix.HelixAdmin;
 import org.apache.helix.HelixManager;
 import org.apache.helix.HelixManagerFactory;
 import org.apache.helix.InstanceType;
@@ -117,8 +114,10 @@ import org.apache.gobblin.util.ConfigUtils;
 import org.apache.gobblin.util.EmailUtils;
 import org.apache.gobblin.util.ExecutorsUtils;
 import org.apache.gobblin.util.JvmUtils;
+import org.apache.gobblin.util.hadoop.TokenUtils;
 import org.apache.gobblin.util.io.StreamUtils;
 import org.apache.gobblin.util.logs.LogCopier;
+import org.apache.gobblin.util.reflection.GobblinConstructorUtils;
 import org.apache.gobblin.yarn.event.ApplicationReportArrivalEvent;
 import org.apache.gobblin.yarn.event.GetApplicationReportFailureEvent;
 
@@ -371,7 +370,6 @@ public class GobblinYarnAppLauncher {
     this.applicationId = getReconnectableApplicationId();
 
     if (!this.applicationId.isPresent()) {
-      disableLiveHelixInstances();
       LOGGER.info("No reconnectable application found so submitting a new 
application");
       this.yarnClient = potentialYarnClients.get(this.originalYarnRMAddress);
       this.applicationId = Optional.of(setupAndSubmitApplication());
@@ -454,7 +452,6 @@ public class GobblinYarnAppLauncher {
 
       if (!this.detachOnExitEnabled) {
         LOGGER.info("Disabling all live Helix instances..");
-        disableLiveHelixInstances();
       }
 
       disconnectHelixManager();
@@ -540,26 +537,6 @@ public class GobblinYarnAppLauncher {
     }
   }
 
-  /**
-   * A method to disable pre-existing live instances in a Helix cluster. This 
can happen when a previous Yarn application
-   * leaves behind orphaned Yarn worker processes. Since Helix does not 
provide an API to drop a live instance, we use
-   * the disable instance API to fence off these orphaned instances and 
prevent them from becoming participants in the
-   * new cluster.
-   *
-   * NOTE: this is a workaround for an existing YARN bug. Once YARN has a fix 
to guarantee container kills on application
-   * completion, this method should be removed.
-   */
-  void disableLiveHelixInstances() {
-    String clusterName = this.helixManager.getClusterName();
-    HelixAdmin helixAdmin = this.helixManager.getClusterManagmentTool();
-    List<String> liveInstances = 
HelixUtils.getLiveInstances(this.helixManager);
-    LOGGER.warn("Found {} live instances in the cluster.", 
liveInstances.size());
-    for (String instanceName: liveInstances) {
-      LOGGER.warn("Disabling instance: {}", instanceName);
-      helixAdmin.enableInstance(clusterName, instanceName, false);
-    }
-  }
-
   @VisibleForTesting
   void disconnectHelixManager() {
     if (this.helixManager.isConnected()) {
diff --git 
a/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/YarnAutoScalingManager.java
 
b/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/YarnAutoScalingManager.java
index a2f4d8372..c447af99e 100644
--- 
a/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/YarnAutoScalingManager.java
+++ 
b/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/YarnAutoScalingManager.java
@@ -34,7 +34,6 @@ import org.apache.commons.compress.utils.Sets;
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.helix.HelixDataAccessor;
 import org.apache.helix.HelixManager;
-import org.apache.helix.PropertyKey;
 import org.apache.helix.task.JobConfig;
 import org.apache.helix.task.JobContext;
 import org.apache.helix.task.JobDag;
@@ -56,6 +55,7 @@ import lombok.AllArgsConstructor;
 import lombok.extern.slf4j.Slf4j;
 
 import org.apache.gobblin.cluster.GobblinClusterConfigurationKeys;
+import org.apache.gobblin.cluster.HelixUtils;
 import org.apache.gobblin.util.ConfigUtils;
 import org.apache.gobblin.util.ExecutorsUtils;
 
@@ -184,17 +184,6 @@ public class YarnAutoScalingManager extends 
AbstractIdleService {
       }
     }
 
-    /**
-     * Getting all instances (Helix Participants) in cluster at this moment.
-     * Note that the raw result could contains AppMaster node and replanner 
node.
-     * @param filterString Helix instances whose name containing fitlerString 
will pass filtering.
-     */
-    private Set<String> getParticipants(String filterString) {
-      PropertyKey.Builder keyBuilder = helixDataAccessor.keyBuilder();
-      return helixDataAccessor.getChildValuesMap(keyBuilder.liveInstances())
-          .keySet().stream().filter(x -> filterString.isEmpty() || 
x.contains(filterString)).collect(Collectors.toSet());
-    }
-
     private String getInuseParticipantForHelixPartition(JobContext jobContext, 
int partition) {
       if (jobContext.getPartitionNumAttempts(partition) > 
THRESHOLD_NUMBER_OF_ATTEMPTS_FOR_LOGGING) {
         log.warn("Helix task {} has been retried for {} times, please check 
the config to see how we can handle this task better",
@@ -275,7 +264,7 @@ public class YarnAutoScalingManager extends 
AbstractIdleService {
       }
       // Find all participants appearing in this cluster. Note that Helix 
instances can contain cluster-manager
       // and potentially replanner-instance.
-      Set<String> allParticipants = 
getParticipants(HELIX_YARN_INSTANCE_NAME_PREFIX);
+      Set<String> allParticipants = 
HelixUtils.getParticipants(helixDataAccessor, HELIX_YARN_INSTANCE_NAME_PREFIX);
 
       // Find all joined participants not in-use for this round of inspection.
       // If idle time is beyond tolerance, mark the instance as unused by 
assigning timestamp as -1.
diff --git 
a/gobblin-yarn/src/test/java/org/apache/gobblin/yarn/GobblinApplicationMasterTest.java
 
b/gobblin-yarn/src/test/java/org/apache/gobblin/yarn/GobblinApplicationMasterTest.java
new file mode 100644
index 000000000..947021b04
--- /dev/null
+++ 
b/gobblin-yarn/src/test/java/org/apache/gobblin/yarn/GobblinApplicationMasterTest.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.yarn;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.helix.HelixAdmin;
+import org.apache.helix.HelixDataAccessor;
+import org.apache.helix.HelixManager;
+import org.apache.helix.HelixProperty;
+import org.apache.helix.PropertyKey;
+import org.mockito.Mockito;
+import org.testng.annotations.Test;
+
+import junit.framework.TestCase;
+
+import org.apache.gobblin.cluster.GobblinHelixMultiManager;
+
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.when;
+
+
+public class GobblinApplicationMasterTest extends TestCase {
+  @Test
+  public void testDisableTaskRunnersFromPreviousExecutions() {
+    GobblinHelixMultiManager mockMultiManager = 
Mockito.mock(GobblinHelixMultiManager.class);
+
+    HelixManager mockHelixManager = Mockito.mock(HelixManager.class);
+    
when(mockMultiManager.getJobClusterHelixManager()).thenReturn(mockHelixManager);
+
+    HelixAdmin mockHelixAdmin = Mockito.mock(HelixAdmin.class);
+    
when(mockHelixManager.getClusterManagmentTool()).thenReturn(mockHelixAdmin);
+    when(mockHelixManager.getClusterName()).thenReturn("mockCluster");
+
+    HelixDataAccessor mockAccessor = Mockito.mock(HelixDataAccessor.class);
+    when(mockHelixManager.getHelixDataAccessor()).thenReturn(mockAccessor);
+
+    PropertyKey.Builder mockBuilder = Mockito.mock(PropertyKey.Builder.class);
+    when(mockAccessor.keyBuilder()).thenReturn(mockBuilder);
+
+    PropertyKey mockLiveInstancesKey = Mockito.mock(PropertyKey.class);
+    when(mockBuilder.liveInstances()).thenReturn(mockLiveInstancesKey);
+
+    int instanceCount = 3;
+
+    // Instances with the GobblinYarnTaskRunner prefix should be disabled, while 
those with the GobblinClusterManager prefix should not
+    ArrayList<String> gobblinYarnTaskRunnerPrefix = new ArrayList<String>();
+    ArrayList<String> gobblinClusterManagerPrefix = new ArrayList<String>();
+    for (int i = 0; i < instanceCount; i++) {
+      gobblinYarnTaskRunnerPrefix.add("GobblinYarnTaskRunner_TestInstance_" + 
i);
+      gobblinClusterManagerPrefix.add("GobblinClusterManager_TestInstance_" + 
i);
+    }
+
+    Map<String, HelixProperty> mockChildValues = new HashMap<>();
+    for (int i = 0; i < instanceCount; i++) {
+      mockChildValues.put(gobblinYarnTaskRunnerPrefix.get(i), 
Mockito.mock(HelixProperty.class));
+      mockChildValues.put(gobblinClusterManagerPrefix.get(i), 
Mockito.mock(HelixProperty.class));
+    }
+    
when(mockAccessor.getChildValuesMap(mockLiveInstancesKey)).thenReturn(mockChildValues);
+
+    
GobblinApplicationMaster.disableTaskRunnersFromPreviousExecutions(mockMultiManager);
+
+    for (int i = 0; i < instanceCount; i++) {
+      Mockito.verify(mockHelixAdmin).enableInstance("mockCluster", 
gobblinYarnTaskRunnerPrefix.get(i), false);
+      Mockito.verify(mockHelixAdmin, times(0)).enableInstance("mockCluster", 
gobblinClusterManagerPrefix.get(i), false);
+    }
+  }
+}
\ No newline at end of file
diff --git 
a/gobblin-yarn/src/test/java/org/apache/gobblin/yarn/GobblinYarnAppLauncherTest.java
 
b/gobblin-yarn/src/test/java/org/apache/gobblin/yarn/GobblinYarnAppLauncherTest.java
index 2eb8c4f32..65e9dc269 100644
--- 
a/gobblin-yarn/src/test/java/org/apache/gobblin/yarn/GobblinYarnAppLauncherTest.java
+++ 
b/gobblin-yarn/src/test/java/org/apache/gobblin/yarn/GobblinYarnAppLauncherTest.java
@@ -27,6 +27,7 @@ import java.net.URL;
 import java.nio.file.Files;
 import java.nio.file.Path;
 import java.nio.file.Paths;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
@@ -44,9 +45,13 @@ import 
org.apache.hadoop.yarn.api.records.YarnApplicationState;
 import org.apache.hadoop.yarn.client.api.YarnClient;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.server.MiniYARNCluster;
+import org.apache.helix.HelixAdmin;
+import org.apache.helix.HelixDataAccessor;
 import org.apache.helix.HelixManager;
 import org.apache.helix.HelixManagerFactory;
+import org.apache.helix.HelixProperty;
 import org.apache.helix.InstanceType;
+import org.apache.helix.PropertyKey;
 import org.apache.helix.model.Message;
 import org.mockito.Mockito;
 import org.mockito.invocation.InvocationOnMock;
@@ -84,6 +89,7 @@ import org.apache.gobblin.runtime.app.ServiceBasedAppLauncher;
 import org.apache.gobblin.testing.AssertWithBackoff;
 
 import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.when;
 
 
 /**
@@ -431,6 +437,25 @@ public class GobblinYarnAppLauncherTest implements 
HelixMessageTestBase {
     GobblinHelixMultiManager mockMultiManager = 
Mockito.mock(GobblinHelixMultiManager.class);
 
     appMaster.setMultiManager(mockMultiManager);
+
+    HelixManager mockHelixManager = Mockito.mock(HelixManager.class);
+    
when(mockMultiManager.getJobClusterHelixManager()).thenReturn(mockHelixManager);
+
+    HelixAdmin mockHelixAdmin = Mockito.mock(HelixAdmin.class);
+    
when(mockHelixManager.getClusterManagmentTool()).thenReturn(mockHelixAdmin);
+
+    HelixDataAccessor mockAccessor = Mockito.mock(HelixDataAccessor.class);
+    when(mockHelixManager.getHelixDataAccessor()).thenReturn(mockAccessor);
+
+    PropertyKey.Builder mockBuilder = Mockito.mock(PropertyKey.Builder.class);
+    when(mockAccessor.keyBuilder()).thenReturn(mockBuilder);
+
+    PropertyKey mockLiveInstancesKey = Mockito.mock(PropertyKey.class);
+    when(mockBuilder.liveInstances()).thenReturn(mockLiveInstancesKey);
+
+    Map<String, HelixProperty> mockChildValues = new HashMap<>();
+    
when(mockAccessor.getChildValuesMap(mockLiveInstancesKey)).thenReturn(mockChildValues);
+
     appMaster.start();
 
     Mockito.verify(mockMultiManager, times(1)).cleanUpJobs();

Reply via email to