This is an automated email from the ASF dual-hosted git repository.
zihanli58 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git
The following commit(s) were added to refs/heads/master by this push:
new 3d14b86a5 [GOBBLIN-1841] Move disabling of current live instances to
the GobblinClusterManager startup (#3708)
3d14b86a5 is described below
commit 3d14b86a5bdb1b99d7fc13a5864a831ff9221b1c
Author: Peiying Ye <[email protected]>
AuthorDate: Fri Jun 30 14:41:04 2023 -0700
[GOBBLIN-1841] Move disabling of current live instances to the
GobblinClusterManager startup (#3708)
* [GOBBLIN-1841] Implement disableLiveHelixInstances and unit test
* [GOBBLIN-1841] clear commit history
* [GOBBLIN-1841] remove disableLiveHelixInstances from Yarn
* [GOBBLIN-1841] remove extra comments
* [GOBBLIN-1841] Implement TestGobblinClusterManager class
* [GOBBLIN-1841] Remove unnecessary imports
* [GOBBLIN-1841] Optimize imports
* [GOBBLIN-1841] Move disableTaskRunnersFromPreviousExecutions to
GobblinApplicationMaster
* [GOBBLIN-1841] Add edge cases check
* [GOBBLIN-1840] Fix checkstyle error
* [GOBBLIN-1841] Fix GobblinYarnAppLauncherTest testJobCleanup
* [GOBBLIN-1841] Add back javadoc to
disableTaskRunnersFromPreviousExecutions implementation
---
.../gobblin/cluster/GobblinClusterManager.java | 33 ++++++---
.../org/apache/gobblin/cluster/HelixUtils.java | 14 ++++
.../gobblin/yarn/GobblinApplicationMaster.java | 35 +++++++++
.../gobblin/yarn/GobblinYarnAppLauncher.java | 27 +------
.../gobblin/yarn/YarnAutoScalingManager.java | 15 +---
.../gobblin/yarn/GobblinApplicationMasterTest.java | 85 ++++++++++++++++++++++
.../gobblin/yarn/GobblinYarnAppLauncherTest.java | 25 +++++++
7 files changed, 185 insertions(+), 49 deletions(-)
diff --git
a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterManager.java
b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterManager.java
index c7b74fe92..05ec790e3 100644
---
a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterManager.java
+++
b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterManager.java
@@ -279,15 +279,7 @@ public class GobblinClusterManager implements
ApplicationLauncher, StandardMetri
LOGGER.info("Starting the Gobblin Cluster Manager");
this.eventBus.register(this);
- this.multiManager.connect();
-
- // Standalone mode registers a handler to clean up on manager leadership
change, so only clean up for non-standalone
- // mode, such as YARN mode
- if (!this.isStandaloneMode) {
- this.multiManager.cleanUpJobs();
- }
-
- configureHelixQuotaBasedTaskScheduling();
+ setupHelix();
if (this.isStandaloneMode) {
// standalone mode starts non-daemon threads later, so need to have this
thread to keep process up
@@ -316,6 +308,18 @@ public class GobblinClusterManager implements
ApplicationLauncher, StandardMetri
this.started = true;
}
+ public synchronized void setupHelix() {
+ this.multiManager.connect();
+
+ // Standalone mode registers a handler to clean up on manager leadership change, so only clean up for
+ // non-standalone mode, such as YARN mode
+ if (!this.isStandaloneMode) {
+ this.multiManager.cleanUpJobs();
+ }
+
+ configureHelixQuotaBasedTaskScheduling();
+ }
+
/**
* Stop the Gobblin Cluster Manager.
*/
@@ -427,11 +431,18 @@ public class GobblinClusterManager implements
ApplicationLauncher, StandardMetri
*/
@VisibleForTesting
void initializeHelixManager() {
- this.multiManager = new GobblinHelixMultiManager(
- this.config, aVoid ->
GobblinClusterManager.this.getUserDefinedMessageHandlerFactory(),
this.eventBus, stopStatus) ;
+ this.multiManager = createMultiManager();
this.multiManager.addLeadershipChangeAwareComponent(this);
}
+ /***
+ * Can be overridden to inject a mock GobblinHelixMultiManager
+ * @return a new GobblinHelixMultiManager
+ */
+ public GobblinHelixMultiManager createMultiManager() {
+ return new GobblinHelixMultiManager(this.config, aVoid ->
GobblinClusterManager.this.getUserDefinedMessageHandlerFactory(),
this.eventBus, stopStatus);
+ }
+
@VisibleForTesting
void sendShutdownRequest() {
Criteria criteria = new Criteria();
diff --git
a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/HelixUtils.java
b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/HelixUtils.java
index 688c8c7ba..45c3685d1 100644
--- a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/HelixUtils.java
+++ b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/HelixUtils.java
@@ -28,11 +28,13 @@ import java.util.Properties;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
+import java.util.stream.Collectors;
import org.apache.helix.HelixAdmin;
import org.apache.helix.HelixDataAccessor;
import org.apache.helix.HelixException;
import org.apache.helix.HelixManager;
+import org.apache.helix.HelixProperty;
import org.apache.helix.PropertyKey;
import org.apache.helix.manager.zk.ZKHelixManager;
import org.apache.helix.model.HelixConfigScope;
@@ -452,6 +454,18 @@ public class HelixUtils {
return accessor.getChildNames(liveInstancesKey);
}
+ /**
+ * Gets all live instances (Helix participants) in the cluster at this moment.
+ * Note that the raw result may contain the AppMaster node and the replanner node.
+ * @param filterString Helix instances whose names contain filterString will pass filtering.
+ */
+ public static Set<String> getParticipants(HelixDataAccessor
helixDataAccessor, String filterString) {
+ PropertyKey.Builder keyBuilder = helixDataAccessor.keyBuilder();
+ PropertyKey liveInstance = keyBuilder.liveInstances();
+ Map<String, HelixProperty> childValuesMap =
helixDataAccessor.getChildValuesMap(liveInstance);
+ return childValuesMap.keySet().stream().filter(x -> filterString.isEmpty()
|| x.contains(filterString)).collect(Collectors.toSet());
+ }
+
public static boolean isInstanceLive(HelixManager helixManager, String
instanceName) {
HelixDataAccessor accessor = helixManager.getHelixDataAccessor();
PropertyKey liveInstanceKey =
accessor.keyBuilder().liveInstance(instanceName);
diff --git
a/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinApplicationMaster.java
b/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinApplicationMaster.java
index 16cb95480..ff2cb0e1a 100644
---
a/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinApplicationMaster.java
+++
b/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinApplicationMaster.java
@@ -19,6 +19,7 @@ package org.apache.gobblin.yarn;
import java.util.Collections;
import java.util.List;
+import java.util.Set;
import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.DefaultParser;
@@ -32,6 +33,9 @@ import org.apache.hadoop.yarn.api.ApplicationConstants;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.util.ConverterUtils;
+import org.apache.helix.HelixAdmin;
+import org.apache.helix.HelixDataAccessor;
+import org.apache.helix.HelixManager;
import org.apache.helix.NotificationContext;
import org.apache.helix.messaging.handling.HelixTaskResult;
import org.apache.helix.messaging.handling.MessageHandler;
@@ -52,6 +56,8 @@ import org.apache.gobblin.annotation.Alpha;
import org.apache.gobblin.cluster.GobblinClusterConfigurationKeys;
import org.apache.gobblin.cluster.GobblinClusterManager;
import org.apache.gobblin.cluster.GobblinClusterUtils;
+import org.apache.gobblin.cluster.GobblinHelixMultiManager;
+import org.apache.gobblin.cluster.HelixUtils;
import org.apache.gobblin.util.ConfigUtils;
import org.apache.gobblin.util.JvmUtils;
import org.apache.gobblin.util.PathUtils;
@@ -135,6 +141,35 @@ public class GobblinApplicationMaster extends
GobblinClusterManager {
return new ControllerUserDefinedMessageHandlerFactory();
}
+ @Override
+ public synchronized void setupHelix() {
+ super.setupHelix();
+ this.disableTaskRunnersFromPreviousExecutions(this.multiManager);
+ }
+
+ /**
+ * A method to disable pre-existing live instances in a Helix cluster. This can happen when a previous Yarn application
+ * leaves behind orphaned Yarn worker processes. Since Helix does not provide an API to drop a live instance, we use
+ * the disable instance API to fence off these orphaned instances and prevent them from becoming participants in the
+ * new cluster.
+ *
+ * NOTE: this is a workaround for an existing YARN bug. Once YARN has a fix to guarantee container kills on application
+ * completion, this method should be removed.
+ */
+ public static void
disableTaskRunnersFromPreviousExecutions(GobblinHelixMultiManager multiManager)
{
+ HelixManager helixManager = multiManager.getJobClusterHelixManager();
+ HelixDataAccessor helixDataAccessor = helixManager.getHelixDataAccessor();
+ String clusterName = helixManager.getClusterName();
+ HelixAdmin helixAdmin = helixManager.getClusterManagmentTool();
+ Set<String> taskRunners = HelixUtils.getParticipants(helixDataAccessor,
+ GobblinYarnTaskRunner.HELIX_YARN_INSTANCE_NAME_PREFIX);
+ LOGGER.warn("Found {} task runners in the cluster.", taskRunners.size());
+ for (String taskRunner : taskRunners) {
+ LOGGER.warn("Disabling instance: {}", taskRunner);
+ helixAdmin.enableInstance(clusterName, taskRunner, false);
+ }
+ }
+
/**
* A custom {@link MultiTypeMessageHandlerFactory} for {@link
ControllerUserDefinedMessageHandler}s that
* handle messages of type {@link
org.apache.helix.model.Message.MessageType#USER_DEFINE_MSG}.
diff --git
a/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinYarnAppLauncher.java
b/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinYarnAppLauncher.java
index 99c4094df..48ac89479 100644
---
a/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinYarnAppLauncher.java
+++
b/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinYarnAppLauncher.java
@@ -38,8 +38,6 @@ import java.util.concurrent.atomic.AtomicInteger;
import org.apache.avro.Schema;
import org.apache.commons.io.FileUtils;
import org.apache.commons.mail.EmailException;
-import org.apache.gobblin.util.hadoop.TokenUtils;
-import org.apache.gobblin.util.reflection.GobblinConstructorUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
@@ -71,7 +69,6 @@ import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.util.Records;
import org.apache.helix.Criteria;
-import org.apache.helix.HelixAdmin;
import org.apache.helix.HelixManager;
import org.apache.helix.HelixManagerFactory;
import org.apache.helix.InstanceType;
@@ -117,8 +114,10 @@ import org.apache.gobblin.util.ConfigUtils;
import org.apache.gobblin.util.EmailUtils;
import org.apache.gobblin.util.ExecutorsUtils;
import org.apache.gobblin.util.JvmUtils;
+import org.apache.gobblin.util.hadoop.TokenUtils;
import org.apache.gobblin.util.io.StreamUtils;
import org.apache.gobblin.util.logs.LogCopier;
+import org.apache.gobblin.util.reflection.GobblinConstructorUtils;
import org.apache.gobblin.yarn.event.ApplicationReportArrivalEvent;
import org.apache.gobblin.yarn.event.GetApplicationReportFailureEvent;
@@ -371,7 +370,6 @@ public class GobblinYarnAppLauncher {
this.applicationId = getReconnectableApplicationId();
if (!this.applicationId.isPresent()) {
- disableLiveHelixInstances();
LOGGER.info("No reconnectable application found so submitting a new
application");
this.yarnClient = potentialYarnClients.get(this.originalYarnRMAddress);
this.applicationId = Optional.of(setupAndSubmitApplication());
@@ -454,7 +452,6 @@ public class GobblinYarnAppLauncher {
if (!this.detachOnExitEnabled) {
LOGGER.info("Disabling all live Helix instances..");
- disableLiveHelixInstances();
}
disconnectHelixManager();
@@ -540,26 +537,6 @@ public class GobblinYarnAppLauncher {
}
}
- /**
- * A method to disable pre-existing live instances in a Helix cluster. This
can happen when a previous Yarn application
- * leaves behind orphaned Yarn worker processes. Since Helix does not
provide an API to drop a live instance, we use
- * the disable instance API to fence off these orphaned instances and
prevent them from becoming participants in the
- * new cluster.
- *
- * NOTE: this is a workaround for an existing YARN bug. Once YARN has a fix
to guarantee container kills on application
- * completion, this method should be removed.
- */
- void disableLiveHelixInstances() {
- String clusterName = this.helixManager.getClusterName();
- HelixAdmin helixAdmin = this.helixManager.getClusterManagmentTool();
- List<String> liveInstances =
HelixUtils.getLiveInstances(this.helixManager);
- LOGGER.warn("Found {} live instances in the cluster.",
liveInstances.size());
- for (String instanceName: liveInstances) {
- LOGGER.warn("Disabling instance: {}", instanceName);
- helixAdmin.enableInstance(clusterName, instanceName, false);
- }
- }
-
@VisibleForTesting
void disconnectHelixManager() {
if (this.helixManager.isConnected()) {
diff --git
a/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/YarnAutoScalingManager.java
b/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/YarnAutoScalingManager.java
index a2f4d8372..c447af99e 100644
---
a/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/YarnAutoScalingManager.java
+++
b/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/YarnAutoScalingManager.java
@@ -34,7 +34,6 @@ import org.apache.commons.compress.utils.Sets;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.helix.HelixDataAccessor;
import org.apache.helix.HelixManager;
-import org.apache.helix.PropertyKey;
import org.apache.helix.task.JobConfig;
import org.apache.helix.task.JobContext;
import org.apache.helix.task.JobDag;
@@ -56,6 +55,7 @@ import lombok.AllArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.gobblin.cluster.GobblinClusterConfigurationKeys;
+import org.apache.gobblin.cluster.HelixUtils;
import org.apache.gobblin.util.ConfigUtils;
import org.apache.gobblin.util.ExecutorsUtils;
@@ -184,17 +184,6 @@ public class YarnAutoScalingManager extends
AbstractIdleService {
}
}
- /**
- * Getting all instances (Helix Participants) in cluster at this moment.
- * Note that the raw result could contains AppMaster node and replanner
node.
- * @param filterString Helix instances whose name containing fitlerString
will pass filtering.
- */
- private Set<String> getParticipants(String filterString) {
- PropertyKey.Builder keyBuilder = helixDataAccessor.keyBuilder();
- return helixDataAccessor.getChildValuesMap(keyBuilder.liveInstances())
- .keySet().stream().filter(x -> filterString.isEmpty() ||
x.contains(filterString)).collect(Collectors.toSet());
- }
-
private String getInuseParticipantForHelixPartition(JobContext jobContext,
int partition) {
if (jobContext.getPartitionNumAttempts(partition) >
THRESHOLD_NUMBER_OF_ATTEMPTS_FOR_LOGGING) {
log.warn("Helix task {} has been retried for {} times, please check
the config to see how we can handle this task better",
@@ -275,7 +264,7 @@ public class YarnAutoScalingManager extends
AbstractIdleService {
}
// Find all participants appearing in this cluster. Note that Helix
instances can contain cluster-manager
// and potentially replanner-instance.
- Set<String> allParticipants =
getParticipants(HELIX_YARN_INSTANCE_NAME_PREFIX);
+ Set<String> allParticipants =
HelixUtils.getParticipants(helixDataAccessor, HELIX_YARN_INSTANCE_NAME_PREFIX);
// Find all joined participants not in-use for this round of inspection.
// If idle time is beyond tolerance, mark the instance as unused by
assigning timestamp as -1.
diff --git
a/gobblin-yarn/src/test/java/org/apache/gobblin/yarn/GobblinApplicationMasterTest.java
b/gobblin-yarn/src/test/java/org/apache/gobblin/yarn/GobblinApplicationMasterTest.java
new file mode 100644
index 000000000..947021b04
--- /dev/null
+++
b/gobblin-yarn/src/test/java/org/apache/gobblin/yarn/GobblinApplicationMasterTest.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.yarn;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.helix.HelixAdmin;
+import org.apache.helix.HelixDataAccessor;
+import org.apache.helix.HelixManager;
+import org.apache.helix.HelixProperty;
+import org.apache.helix.PropertyKey;
+import org.mockito.Mockito;
+import org.testng.annotations.Test;
+
+import junit.framework.TestCase;
+
+import org.apache.gobblin.cluster.GobblinHelixMultiManager;
+
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.when;
+
+
+public class GobblinApplicationMasterTest extends TestCase {
+ @Test
+ public void testDisableTaskRunnersFromPreviousExecutions() {
+ GobblinHelixMultiManager mockMultiManager =
Mockito.mock(GobblinHelixMultiManager.class);
+
+ HelixManager mockHelixManager = Mockito.mock(HelixManager.class);
+
when(mockMultiManager.getJobClusterHelixManager()).thenReturn(mockHelixManager);
+
+ HelixAdmin mockHelixAdmin = Mockito.mock(HelixAdmin.class);
+
when(mockHelixManager.getClusterManagmentTool()).thenReturn(mockHelixAdmin);
+ when(mockHelixManager.getClusterName()).thenReturn("mockCluster");
+
+ HelixDataAccessor mockAccessor = Mockito.mock(HelixDataAccessor.class);
+ when(mockHelixManager.getHelixDataAccessor()).thenReturn(mockAccessor);
+
+ PropertyKey.Builder mockBuilder = Mockito.mock(PropertyKey.Builder.class);
+ when(mockAccessor.keyBuilder()).thenReturn(mockBuilder);
+
+ PropertyKey mockLiveInstancesKey = Mockito.mock(PropertyKey.class);
+ when(mockBuilder.liveInstances()).thenReturn(mockLiveInstancesKey);
+
+ int instanceCount = 3;
+
+ // Instances with the GobblinYarnTaskRunner prefix should be disabled, while those with the GobblinClusterManager prefix should not
+ ArrayList<String> gobblinYarnTaskRunnerPrefix = new ArrayList<String>();
+ ArrayList<String> gobblinClusterManagerPrefix = new ArrayList<String>();
+ for (int i = 0; i < instanceCount; i++) {
+ gobblinYarnTaskRunnerPrefix.add("GobblinYarnTaskRunner_TestInstance_" +
i);
+ gobblinClusterManagerPrefix.add("GobblinClusterManager_TestInstance_" +
i);
+ }
+
+ Map<String, HelixProperty> mockChildValues = new HashMap<>();
+ for (int i = 0; i < instanceCount; i++) {
+ mockChildValues.put(gobblinYarnTaskRunnerPrefix.get(i),
Mockito.mock(HelixProperty.class));
+ mockChildValues.put(gobblinClusterManagerPrefix.get(i),
Mockito.mock(HelixProperty.class));
+ }
+
when(mockAccessor.getChildValuesMap(mockLiveInstancesKey)).thenReturn(mockChildValues);
+
+
GobblinApplicationMaster.disableTaskRunnersFromPreviousExecutions(mockMultiManager);
+
+ for (int i = 0; i < instanceCount; i++) {
+ Mockito.verify(mockHelixAdmin).enableInstance("mockCluster",
gobblinYarnTaskRunnerPrefix.get(i), false);
+ Mockito.verify(mockHelixAdmin, times(0)).enableInstance("mockCluster",
gobblinClusterManagerPrefix.get(i), false);
+ }
+ }
+}
\ No newline at end of file
diff --git
a/gobblin-yarn/src/test/java/org/apache/gobblin/yarn/GobblinYarnAppLauncherTest.java
b/gobblin-yarn/src/test/java/org/apache/gobblin/yarn/GobblinYarnAppLauncherTest.java
index 2eb8c4f32..65e9dc269 100644
---
a/gobblin-yarn/src/test/java/org/apache/gobblin/yarn/GobblinYarnAppLauncherTest.java
+++
b/gobblin-yarn/src/test/java/org/apache/gobblin/yarn/GobblinYarnAppLauncherTest.java
@@ -27,6 +27,7 @@ import java.net.URL;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
+import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
@@ -44,9 +45,13 @@ import
org.apache.hadoop.yarn.api.records.YarnApplicationState;
import org.apache.hadoop.yarn.client.api.YarnClient;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.server.MiniYARNCluster;
+import org.apache.helix.HelixAdmin;
+import org.apache.helix.HelixDataAccessor;
import org.apache.helix.HelixManager;
import org.apache.helix.HelixManagerFactory;
+import org.apache.helix.HelixProperty;
import org.apache.helix.InstanceType;
+import org.apache.helix.PropertyKey;
import org.apache.helix.model.Message;
import org.mockito.Mockito;
import org.mockito.invocation.InvocationOnMock;
@@ -84,6 +89,7 @@ import org.apache.gobblin.runtime.app.ServiceBasedAppLauncher;
import org.apache.gobblin.testing.AssertWithBackoff;
import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.when;
/**
@@ -431,6 +437,25 @@ public class GobblinYarnAppLauncherTest implements
HelixMessageTestBase {
GobblinHelixMultiManager mockMultiManager =
Mockito.mock(GobblinHelixMultiManager.class);
appMaster.setMultiManager(mockMultiManager);
+
+ HelixManager mockHelixManager = Mockito.mock(HelixManager.class);
+
when(mockMultiManager.getJobClusterHelixManager()).thenReturn(mockHelixManager);
+
+ HelixAdmin mockHelixAdmin = Mockito.mock(HelixAdmin.class);
+
when(mockHelixManager.getClusterManagmentTool()).thenReturn(mockHelixAdmin);
+
+ HelixDataAccessor mockAccessor = Mockito.mock(HelixDataAccessor.class);
+ when(mockHelixManager.getHelixDataAccessor()).thenReturn(mockAccessor);
+
+ PropertyKey.Builder mockBuilder = Mockito.mock(PropertyKey.Builder.class);
+ when(mockAccessor.keyBuilder()).thenReturn(mockBuilder);
+
+ PropertyKey mockLiveInstancesKey = Mockito.mock(PropertyKey.class);
+ when(mockBuilder.liveInstances()).thenReturn(mockLiveInstancesKey);
+
+ Map<String, HelixProperty> mockChildValues = new HashMap<>();
+
when(mockAccessor.getChildValuesMap(mockLiveInstancesKey)).thenReturn(mockChildValues);
+
appMaster.start();
Mockito.verify(mockMultiManager, times(1)).cleanUpJobs();