This is an automated email from the ASF dual-hosted git repository.

zihanli58 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git


The following commit(s) were added to refs/heads/master by this push:
     new 3733d6028 [GOBBLIN-1704] Purge offline helix instances during startup (#3561)
3733d6028 is described below

commit 3733d6028c437e18eff349ba56d8264a56d4673f
Author: Matthew Ho <[email protected]>
AuthorDate: Thu Sep 22 17:13:54 2022 -0700

    [GOBBLIN-1704] Purge offline helix instances during startup (#3561)
    
    * [GOBBLIN-1704] Purge offline helix instances during startup
    
    To avoid accumulation of helix instances, we should clean up
    over time. But we can only perform the clean up at startup
    because it's unsafe to call the API while instances are being
    created / removed.
    
    * Emit Gobblin Tracking Event during failure / completion
---
 .../gobblin/cluster/GobblinHelixMultiManager.java  |  17 +++
 .../gobblin/metrics/event/GobblinEventBuilder.java |   2 +
 gobblin-yarn/build.gradle                          |   1 +
 .../gobblin/yarn/GobblinApplicationMaster.java     |   2 +-
 .../gobblin/yarn/GobblinYarnConfigurationKeys.java |  13 ++
 .../yarn/HelixInstancePurgerWithMetrics.java       | 140 +++++++++++++++++++++
 .../java/org/apache/gobblin/yarn/YarnService.java  |  36 +++++-
 .../gobblin/yarn/GobblinYarnAppLauncherTest.java   |   2 +-
 .../yarn/HelixInstancePurgerWithMetricsTest.java   | 134 ++++++++++++++++++++
 .../org/apache/gobblin/yarn/YarnServiceTest.java   |  14 ++-
 10 files changed, 353 insertions(+), 8 deletions(-)

diff --git a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixMultiManager.java b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixMultiManager.java
index 83a251e27..d9bb3416e 100644
--- a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixMultiManager.java
+++ b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixMultiManager.java
@@ -29,12 +29,14 @@ import java.util.concurrent.TimeUnit;
 import java.util.function.Function;
 
 import org.apache.helix.HelixDataAccessor;
+import org.apache.helix.HelixAdmin;
 import org.apache.helix.HelixManager;
 import org.apache.helix.HelixProperty;
 import org.apache.helix.InstanceType;
 import org.apache.helix.NotificationContext;
 import org.apache.helix.api.listeners.ControllerChangeListener;
 import org.apache.helix.api.listeners.LiveInstanceChangeListener;
+import org.apache.helix.manager.zk.ZKHelixAdmin;
 import org.apache.helix.messaging.handling.HelixTaskResult;
 import org.apache.helix.messaging.handling.MessageHandler;
 import org.apache.helix.messaging.handling.MultiTypeMessageHandlerFactory;
@@ -86,6 +88,9 @@ public class GobblinHelixMultiManager implements 
StandardMetricsBridge {
   @Getter
   private HelixManager jobClusterHelixManager = null;
 
+  @Getter
+  private HelixAdmin jobClusterHelixAdmin = null;
+
   /**
    * Helix manager to handle planning job distribution.
    * Corresponds to cluster with key name {@link 
GobblinClusterConfigurationKeys#HELIX_CLUSTER_NAME_KEY}.
@@ -170,6 +175,16 @@ public class GobblinHelixMultiManager implements 
StandardMetricsBridge {
         config.getString(clusterName), helixInstanceName, type, 
zkConnectionString);
   }
 
+  /**
+   * Creates the ZooKeeper-backed {@link org.apache.helix.HelixAdmin} for the AM.
+   *
+   * @param cfg configuration that supplies the ZooKeeper connection string under
+   *            {@link GobblinClusterConfigurationKeys#ZK_CONNECTION_STRING_KEY}
+   * @return a newly built {@link ZKHelixAdmin} pointed at that ZooKeeper address
+   */
+  protected static HelixAdmin buildHelixAdmin(Config cfg) {
+    return new ZKHelixAdmin.Builder()
+        .setZkAddress(cfg.getString(GobblinClusterConfigurationKeys.ZK_CONNECTION_STRING_KEY))
+        .build();
+  }
+
   public void initialize() {
     if (this.dedicatedManagerCluster) {
       
Preconditions.checkArgument(this.config.hasPath(GobblinClusterConfigurationKeys.MANAGER_CLUSTER_NAME_KEY));
@@ -183,6 +198,7 @@ public class GobblinHelixMultiManager implements 
StandardMetricsBridge {
       this.jobClusterHelixManager = buildHelixManager(this.config,
           GobblinClusterConfigurationKeys.HELIX_CLUSTER_NAME_KEY,
           InstanceType.ADMINISTRATOR);
+      this.jobClusterHelixAdmin = buildHelixAdmin(this.config);
 
       // This will create a dedicated controller for job distribution
       this.dedicatedJobClusterController = ConfigUtils.getBoolean(
@@ -225,6 +241,7 @@ public class GobblinHelixMultiManager implements 
StandardMetricsBridge {
           GobblinClusterConfigurationKeys.HELIX_CLUSTER_NAME_KEY,
           isHelixClusterManaged ? InstanceType.PARTICIPANT : 
InstanceType.CONTROLLER);
       this.jobClusterHelixManager = this.managerClusterHelixManager;
+      this.jobClusterHelixAdmin = buildHelixAdmin(this.config);
     }
   }
 
diff --git a/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/event/GobblinEventBuilder.java b/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/event/GobblinEventBuilder.java
index 6b4ae9470..a3c0a402d 100644
--- a/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/event/GobblinEventBuilder.java
+++ b/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/event/GobblinEventBuilder.java
@@ -24,6 +24,7 @@ import com.google.common.collect.Maps;
 
 import lombok.Getter;
 import lombok.Setter;
+import lombok.ToString;
 
 import org.apache.gobblin.metrics.GobblinTrackingEvent;
 import org.apache.gobblin.metrics.MetricContext;
@@ -37,6 +38,7 @@ import org.apache.gobblin.metrics.MetricContext;
  *
  * Note: a {@link GobblinEventBuilder} instance is not reusable
  */
+@ToString
 public class GobblinEventBuilder {
   public static final String NAMESPACE = "gobblin.event";
   public static final String EVENT_TYPE = "eventType";
diff --git a/gobblin-yarn/build.gradle b/gobblin-yarn/build.gradle
index 1c01e5585..820350213 100644
--- a/gobblin-yarn/build.gradle
+++ b/gobblin-yarn/build.gradle
@@ -67,6 +67,7 @@ dependencies {
   testCompile externalDependency.hadoopYarnMiniCluster
   testCompile externalDependency.curatorFramework
   testCompile externalDependency.curatorTest
+  testCompile externalDependency.powermock
 
   testCompile ('com.google.inject:guice:3.0') {
     force = true
diff --git a/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinApplicationMaster.java b/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinApplicationMaster.java
index e64bb2118..16cb95480 100644
--- a/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinApplicationMaster.java
+++ b/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinApplicationMaster.java
@@ -120,7 +120,7 @@ public class GobblinApplicationMaster extends 
GobblinClusterManager {
       YarnConfiguration yarnConfiguration, FileSystem fs)
       throws Exception {
     return new YarnService(config, applicationName, applicationId, 
yarnConfiguration, fs, this.eventBus,
-        this.getMultiManager().getJobClusterHelixManager());
+        this.multiManager.getJobClusterHelixManager(), 
this.multiManager.getJobClusterHelixAdmin());
   }
 
   /**
diff --git a/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinYarnConfigurationKeys.java b/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinYarnConfigurationKeys.java
index 6e374d320..9799395df 100644
--- a/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinYarnConfigurationKeys.java
+++ b/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinYarnConfigurationKeys.java
@@ -17,6 +17,9 @@
 
 package org.apache.gobblin.yarn;
 
+import java.time.Duration;
+
+
 /**
  * A central place for configuration related constants of Gobblin on Yarn.
  *
@@ -84,6 +87,16 @@ public class GobblinYarnConfigurationKeys {
   // Helix configuration properties.
   public static final String HELIX_INSTANCE_MAX_RETRIES = GOBBLIN_YARN_PREFIX 
+ "helix.instance.max.retries";
 
+  public static final String HELIX_PURGE_PREFIX = GOBBLIN_YARN_PREFIX + 
"helix.purgeOfflineHelixInstances.";
+  public static final String HELIX_PURGE_OFFLINE_INSTANCES_ENABLED = 
HELIX_PURGE_PREFIX + "enabled";
+  public static final boolean DEFAULT_HELIX_PURGE_OFFLINE_INSTANCES_ENABLED = 
true;
+
+  public static final String HELIX_PURGE_LAGGING_THRESHOLD_MILLIS = 
HELIX_PURGE_PREFIX + "laggingThresholdMs";
+  public static final long DEFAULT_HELIX_PURGE_LAGGING_THRESHOLD_MILLIS = 
Duration.ofMinutes(1).toMillis();
+
+  public static final String HELIX_PURGE_POLLING_RATE_MILLIS = 
HELIX_PURGE_PREFIX + "pollingRateMs";
+  public static final long DEFAULT_HELIX_PURGE_POLLING_RATE_MILLIS = 
Duration.ofSeconds(5).toMillis();
+
   // Security and authentication configuration properties.
   public static final String SECURITY_MANAGER_CLASS = GOBBLIN_YARN_PREFIX + 
"security.manager.class";
   public static final String DEFAULT_SECURITY_MANAGER_CLASS = 
"org.apache.gobblin.yarn.YarnAppSecurityManagerWithKeytabs";
diff --git a/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/HelixInstancePurgerWithMetrics.java b/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/HelixInstancePurgerWithMetrics.java
new file mode 100644
index 000000000..efb6644b4
--- /dev/null
+++ b/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/HelixInstancePurgerWithMetrics.java
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.yarn;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Stopwatch;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import lombok.AllArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.gobblin.metrics.event.EventSubmitter;
+import org.apache.gobblin.metrics.event.GobblinEventBuilder;
+import org.apache.helix.HelixAdmin;
+
+
+@Slf4j
+@AllArgsConstructor
+public class HelixInstancePurgerWithMetrics {
+  private final EventSubmitter eventSubmitter;
+  private final long pollingRateMs;
+  private static final String PREFIX = "HelixOfflineInstancePurge.";
+  public static final String PURGE_FAILURE_EVENT =  PREFIX + "Failure";
+  public static final String PURGE_LAGGING_EVENT = PREFIX + "Lagging";
+  public static final String PURGE_COMPLETED_EVENT = PREFIX + "Completed";
+
+
+  /**
+   * Blocking call for purging all offline helix instances. Provides boiler 
plate code for providing periodic updates
+   * and sending a GTE if it's an unexpected amount of time.
+   *
+   * All previous helix instances should be purged on startup. Gobblin task 
runners are stateless from helix
+   * perspective because all important state is persisted separately in 
Workunit State Store or Watermark store.
+   */
+  public void purgeAllOfflineInstances(HelixAdmin admin, String clusterName, 
long laggingThresholdMs, Map<String, String> gteMetadata) {
+    CompletableFuture<Void> purgeTask = CompletableFuture.supplyAsync(() -> {
+      long offlineDuration = 0; // 0 means all offline instance should be 
purged.
+      admin.purgeOfflineInstances(clusterName, offlineDuration);
+      return null;
+    });
+
+    long timeToPurgeMs = waitForPurgeCompletion(purgeTask, laggingThresholdMs, 
Stopwatch.createUnstarted(), gteMetadata);
+    log.info("Finished purging offline helix instances. It took 
timeToPurgeMs={}", timeToPurgeMs);
+  }
+
+  @VisibleForTesting
+  long waitForPurgeCompletion(CompletableFuture<Void> purgeTask, long 
laggingThresholdMs, Stopwatch watch,
+      Map<String, String> gteMetadata) {
+    watch.start();
+    try {
+      boolean haveSubmittedLaggingEvent = false; //
+      while (!purgeTask.isDone()) {
+        long elapsedTimeMs = watch.elapsed(TimeUnit.MILLISECONDS);
+        log.info("Waiting for helix to purge offline instances. Cannot proceed 
with execution because purging is a "
+            + "non-thread safe call. To disable purging offline instances 
during startup, change the flag {} "
+            + "elapsedTimeMs={}, laggingThresholdMs={}",
+            
GobblinYarnConfigurationKeys.HELIX_PURGE_OFFLINE_INSTANCES_ENABLED, 
elapsedTimeMs, laggingThresholdMs);
+        if (!haveSubmittedLaggingEvent && elapsedTimeMs > laggingThresholdMs) {
+          submitLaggingEvent(elapsedTimeMs, laggingThresholdMs, gteMetadata);
+          haveSubmittedLaggingEvent = true;
+        }
+        Thread.sleep(this.pollingRateMs);
+      }
+
+      long timeToPurgeMs = watch.elapsed(TimeUnit.MILLISECONDS);
+      if (!haveSubmittedLaggingEvent && timeToPurgeMs > laggingThresholdMs) {
+        submitLaggingEvent(timeToPurgeMs, laggingThresholdMs, gteMetadata);
+      }
+
+      purgeTask.get(); // check for exceptions
+      submitCompletedEvent(timeToPurgeMs, gteMetadata);
+      return timeToPurgeMs;
+    } catch (ExecutionException | InterruptedException e) {
+      log.warn("The call to purge offline helix instances failed. This is not 
a fatal error because it is not mandatory to "
+          + "clean up old helix instances. But repeated failure to purge 
offline helix instances will cause an accumulation"
+          + "of offline helix instances which may cause large delays in future 
helix calls.", e);
+      long timeToPurgeMs = watch.elapsed(TimeUnit.MILLISECONDS);
+      submitFailureEvent(timeToPurgeMs, gteMetadata);
+      return timeToPurgeMs;
+    }
+  }
+
+  private void submitFailureEvent(long elapsedTimeMs, Map<String, String> 
additionalMetadata) {
+    if (eventSubmitter != null) {
+      GobblinEventBuilder eventBuilder = new 
GobblinEventBuilder(PURGE_FAILURE_EVENT);
+      eventBuilder.addAdditionalMetadata(additionalMetadata);
+      eventBuilder.addMetadata("elapsedTimeMs", String.valueOf(elapsedTimeMs));
+
+      log.warn("Submitting GTE because purging offline instances has failed to 
complete. event={}", eventBuilder);
+      eventSubmitter.submit(eventBuilder);
+    } else {
+      log.warn("Cannot submit {} GTE because eventSubmitter is null", 
PURGE_FAILURE_EVENT);
+    }
+  }
+
+  private void submitCompletedEvent(long timeToPurgeMs, Map<String, String> 
additionalMetadata) {
+    if (eventSubmitter != null) {
+      GobblinEventBuilder eventBuilder = new 
GobblinEventBuilder(PURGE_COMPLETED_EVENT);
+      eventBuilder.addAdditionalMetadata(additionalMetadata);
+      eventBuilder.addMetadata("timeToPurgeMs", String.valueOf(timeToPurgeMs));
+
+      log.info("Submitting GTE because purging offline instances has completed 
successfully. event={}", eventBuilder);
+      eventSubmitter.submit(eventBuilder);
+    } else {
+      log.warn("Cannot submit {} GTE because eventSubmitter is null", 
PURGE_COMPLETED_EVENT);
+    }
+  }
+
+  private void submitLaggingEvent(long elapsedTimeMs, long laggingThresholdMs,
+      Map<String, String> additionalMetadata) {
+    if (eventSubmitter != null) {
+      GobblinEventBuilder eventBuilder = new 
GobblinEventBuilder(PURGE_LAGGING_EVENT);
+      eventBuilder.addAdditionalMetadata(additionalMetadata);
+      eventBuilder.addMetadata("elapsedTimeMs", String.valueOf(elapsedTimeMs));
+      eventBuilder.addMetadata("laggingThresholdMs", 
String.valueOf(laggingThresholdMs));
+
+      log.info("Submitting GTE because purging offline instances is lagging 
and has exceeded lagging threshold. event={}",
+          eventBuilder);
+      eventSubmitter.submit(eventBuilder);
+    } else {
+      log.warn("Cannot submit {} GTE because eventSubmitter is null", 
PURGE_LAGGING_EVENT);
+    }
+  }
+}
diff --git a/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/YarnService.java b/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/YarnService.java
index 2f8a06c88..8a12bb0ce 100644
--- a/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/YarnService.java
+++ b/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/YarnService.java
@@ -65,6 +65,7 @@ import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.exceptions.YarnException;
 import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
 import org.apache.hadoop.yarn.util.Records;
+import org.apache.helix.HelixAdmin;
 import org.apache.helix.HelixManager;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -157,6 +158,7 @@ public class YarnService extends AbstractIdleService {
   private final Optional<String> containerJvmArgs;
   private final String containerTimezone;
   private final HelixManager helixManager;
+  private final HelixAdmin helixAdmin;
 
   @Getter(AccessLevel.PROTECTED)
   private volatile Optional<Resource> maxResourceCapacity = Optional.absent();
@@ -194,6 +196,10 @@ public class YarnService extends AbstractIdleService {
   // The map from helix tag to allocated container count
   private final Map<String, Integer> allocatedContainerCountMap = 
Maps.newConcurrentMap();
 
+  private final boolean isPurgingOfflineHelixInstancesEnabled;
+  private final long helixPurgeLaggingThresholdMs;
+  private final long helixPurgeStatusPollingRateMs;
+
   private volatile YarnContainerRequestBundle yarnContainerRequest;
   private final AtomicInteger priorityNumGenerator = new AtomicInteger(0);
   private final Map<String, Integer> resourcePriorityMap = new HashMap<>();
@@ -201,7 +207,7 @@ public class YarnService extends AbstractIdleService {
   private volatile boolean shutdownInProgress = false;
 
   public YarnService(Config config, String applicationName, String 
applicationId, YarnConfiguration yarnConfiguration,
-      FileSystem fs, EventBus eventBus, HelixManager helixManager) throws 
Exception {
+      FileSystem fs, EventBus eventBus, HelixManager helixManager, HelixAdmin 
helixAdmin) throws Exception {
     this.applicationName = applicationName;
     this.applicationId = applicationId;
 
@@ -210,6 +216,7 @@ public class YarnService extends AbstractIdleService {
     this.eventBus = eventBus;
 
     this.helixManager = helixManager;
+    this.helixAdmin = helixAdmin;
 
     this.gobblinMetrics = 
config.getBoolean(ConfigurationKeys.METRICS_ENABLED_KEY) ?
         Optional.of(buildGobblinMetrics()) : Optional.<GobblinMetrics>absent();
@@ -237,6 +244,15 @@ public class YarnService extends AbstractIdleService {
     this.helixInstanceMaxRetries = 
config.getInt(GobblinYarnConfigurationKeys.HELIX_INSTANCE_MAX_RETRIES);
     this.helixInstanceTags = ConfigUtils.getString(config,
         GobblinClusterConfigurationKeys.HELIX_INSTANCE_TAGS_KEY, 
GobblinClusterConfigurationKeys.HELIX_DEFAULT_TAG);
+    this.isPurgingOfflineHelixInstancesEnabled = ConfigUtils.getBoolean(config,
+        GobblinYarnConfigurationKeys.HELIX_PURGE_OFFLINE_INSTANCES_ENABLED,
+        
GobblinYarnConfigurationKeys.DEFAULT_HELIX_PURGE_OFFLINE_INSTANCES_ENABLED);
+    this.helixPurgeLaggingThresholdMs = ConfigUtils.getLong(config,
+        GobblinYarnConfigurationKeys.HELIX_PURGE_LAGGING_THRESHOLD_MILLIS,
+        
GobblinYarnConfigurationKeys.DEFAULT_HELIX_PURGE_LAGGING_THRESHOLD_MILLIS);
+    this.helixPurgeStatusPollingRateMs = ConfigUtils.getLong(config,
+        GobblinYarnConfigurationKeys.HELIX_PURGE_POLLING_RATE_MILLIS,
+        GobblinYarnConfigurationKeys.DEFAULT_HELIX_PURGE_POLLING_RATE_MILLIS);
 
     this.containerJvmArgs = 
config.hasPath(GobblinYarnConfigurationKeys.CONTAINER_JVM_ARGS_KEY) ?
         
Optional.of(config.getString(GobblinYarnConfigurationKeys.CONTAINER_JVM_ARGS_KEY))
 :
@@ -339,10 +355,26 @@ public class YarnService extends AbstractIdleService {
     LOGGER.info("ApplicationMaster registration response: " + response);
     this.maxResourceCapacity = 
Optional.of(response.getMaximumResourceCapability());
 
+    if (this.isPurgingOfflineHelixInstancesEnabled) {
+      purgeHelixOfflineInstances(this.helixPurgeLaggingThresholdMs);
+    }
+
     LOGGER.info("Requesting initial containers");
     requestInitialContainers(this.initialContainers);
   }
 
+  /**
+   * Purges offline helix instances for this service's helix cluster, waiting for the purge to finish.
+   * The cluster name and metadata-store connection string are attached to the tracking events as metadata.
+   *
+   * @param laggingThresholdMs elapsed time (ms) after which the purge is reported as lagging
+   */
+  private void purgeHelixOfflineInstances(long laggingThresholdMs) {
+    LOGGER.info("Purging offline helix instances before allocating containers for helixClusterName={}, connectionString={}, helixPurgeStatusPollingRateMs={}",
+        this.helixManager.getClusterName(),
+        this.helixManager.getMetadataStoreConnectionString(),
+        this.helixPurgeStatusPollingRateMs);
+
+    HelixInstancePurgerWithMetrics offlineInstancePurger =
+        new HelixInstancePurgerWithMetrics(this.eventSubmitter.orNull(), this.helixPurgeStatusPollingRateMs);
+    Map<String, String> eventMetadata = ImmutableMap.of(
+        "connectionString", this.helixManager.getMetadataStoreConnectionString(),
+        "clusterName", this.helixManager.getClusterName());
+
+    offlineInstancePurger.purgeAllOfflineInstances(
+        this.helixAdmin, this.helixManager.getClusterName(), laggingThresholdMs, eventMetadata);
+  }
+
   @Override
   protected void shutDown() throws IOException {
     LOGGER.info("Stopping the YarnService");
@@ -982,4 +1014,4 @@ public class YarnService extends AbstractIdleService {
     private final String helixParticipantId;
     private final String helixTag;
   }
-}
\ No newline at end of file
+}
diff --git a/gobblin-yarn/src/test/java/org/apache/gobblin/yarn/GobblinYarnAppLauncherTest.java b/gobblin-yarn/src/test/java/org/apache/gobblin/yarn/GobblinYarnAppLauncherTest.java
index 99ee33a33..95c109cfe 100644
--- a/gobblin-yarn/src/test/java/org/apache/gobblin/yarn/GobblinYarnAppLauncherTest.java
+++ b/gobblin-yarn/src/test/java/org/apache/gobblin/yarn/GobblinYarnAppLauncherTest.java
@@ -527,7 +527,7 @@ public class GobblinYarnAppLauncherTest implements 
HelixMessageTestBase {
   private static class TestYarnService extends YarnService {
     public TestYarnService(Config config, String applicationName, String 
applicationId, YarnConfiguration yarnConfiguration,
         FileSystem fs, EventBus eventBus) throws Exception {
-      super(config, applicationName, applicationId, yarnConfiguration, fs, 
eventBus, null);
+      super(config, applicationName, applicationId, yarnConfiguration, fs, 
eventBus, null, null);
     }
 
     @Override
diff --git a/gobblin-yarn/src/test/java/org/apache/gobblin/yarn/HelixInstancePurgerWithMetricsTest.java b/gobblin-yarn/src/test/java/org/apache/gobblin/yarn/HelixInstancePurgerWithMetricsTest.java
new file mode 100644
index 000000000..f889a341f
--- /dev/null
+++ b/gobblin-yarn/src/test/java/org/apache/gobblin/yarn/HelixInstancePurgerWithMetricsTest.java
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.yarn;
+
+import com.google.common.base.Stopwatch;
+import java.util.Collections;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import org.apache.gobblin.metrics.event.EventSubmitter;
+import org.apache.gobblin.metrics.event.GobblinEventBuilder;
+import org.mockito.ArgumentCaptor;
+import org.mockito.Captor;
+import org.mockito.Mock;
+import org.mockito.Mockito;
+import org.mockito.MockitoAnnotations;
+import org.powermock.core.classloader.annotations.PowerMockIgnore;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.modules.testng.PowerMockTestCase;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+import static org.mockito.Mockito.*;
+import static org.testng.Assert.*;
+
+
+/**
+ * Unit tests for {@link HelixInstancePurgerWithMetrics#waitForPurgeCompletion}.
+ *
+ * Test class uses PowerMockito and Testng
+ * References:
+ * https://github.com/powermock/powermock/issues/434
+ * https://www.igorkromin.net/index.php/2018/10/04/how-to-fix-powermock-exception-linkageerror-loader-constraint-violation/
+ * https://github.com/powermock/powermock/wiki/MockFinal
+ */
+@PrepareForTest(Stopwatch.class)
+@PowerMockIgnore("javax.management.*")
+public class HelixInstancePurgerWithMetricsTest extends PowerMockTestCase {
+
+  @Mock EventSubmitter eventSubmitter;
+  @Mock Stopwatch stopwatch;
+  @Mock CompletableFuture<Void> mockTask;
+  @Captor ArgumentCaptor<GobblinEventBuilder> gteCaptor;
+  HelixInstancePurgerWithMetrics sut;
+
+  private static final long LAGGING_PURGE_THRESHOLD_MS = 100;
+  private static final long PURGE_STATUS_POLLING_RATE_MS = 10;
+
+
+  @BeforeMethod
+  private void init() {
+    MockitoAnnotations.initMocks(this);
+    sut = new HelixInstancePurgerWithMetrics(eventSubmitter, PURGE_STATUS_POLLING_RATE_MS);
+  }
+
+  /** Elapsed time equal to (not above) the threshold: only the completed event is expected. */
+  @Test
+  public void testPurgeOfflineInstances() throws ExecutionException, InterruptedException {
+    Mockito.when(stopwatch.start()).thenReturn(stopwatch);
+    Mockito.when(stopwatch.elapsed(TimeUnit.MILLISECONDS)).thenReturn(LAGGING_PURGE_THRESHOLD_MS);
+    Mockito.when(mockTask.isDone())
+        .thenReturn(false)
+        .thenReturn(true);
+    Mockito.when(mockTask.get()).thenReturn(null);
+    Mockito.doNothing().when(eventSubmitter).submit(Mockito.any(GobblinEventBuilder.class));
+
+    long elapsedTime = sut.waitForPurgeCompletion(mockTask, LAGGING_PURGE_THRESHOLD_MS, stopwatch, Collections.emptyMap());
+
+    assertEquals(elapsedTime, LAGGING_PURGE_THRESHOLD_MS);
+    Mockito.verify(stopwatch, times(1)).start();
+    Mockito.verify(mockTask, times(1)).get();
+    Mockito.verify(eventSubmitter, times(1)).submit(gteCaptor.capture());
+    assertEquals(gteCaptor.getValue().getName(), HelixInstancePurgerWithMetrics.PURGE_COMPLETED_EVENT);
+  }
+
+  /** The task is still running on the first poll, so the lagging event is sent from inside the wait loop. */
+  @Test
+  public void testPurgeOfflineInstancesSendsWarningEventWhenWaiting() throws ExecutionException, InterruptedException {
+    Mockito.when(mockTask.isDone()).thenReturn(false).thenReturn(true);
+    testPurgeOfflineInstancesSendsWarningEventHelper();
+  }
+
+  /** The task is already done on the first poll, so the lagging event is sent by the post-loop check. */
+  @Test
+  public void testPurgeOfflineInstancesSendsWarningEventIfTaskFinishedImmediately() throws ExecutionException, InterruptedException {
+    Mockito.when(mockTask.isDone()).thenReturn(true);
+    testPurgeOfflineInstancesSendsWarningEventHelper();
+  }
+
+  /**
+   * Shared body for the lagging-event tests. The caller must stub {@code mockTask.isDone()} before
+   * calling this helper; it is deliberately not re-stubbed here, because doing so would override the
+   * caller's stubbing and make both callers exercise the same code path.
+   */
+  private void testPurgeOfflineInstancesSendsWarningEventHelper() throws ExecutionException, InterruptedException {
+    Mockito.when(stopwatch.start()).thenReturn(stopwatch);
+    Mockito.when(stopwatch.elapsed(TimeUnit.MILLISECONDS)).thenReturn(LAGGING_PURGE_THRESHOLD_MS + 1);
+    Mockito.when(mockTask.get()).thenReturn(null);
+    Mockito.doNothing().when(eventSubmitter).submit(Mockito.any(GobblinEventBuilder.class));
+
+    long elapsedTime = sut.waitForPurgeCompletion(mockTask, LAGGING_PURGE_THRESHOLD_MS, stopwatch, Collections.emptyMap());
+    assertEquals(elapsedTime, LAGGING_PURGE_THRESHOLD_MS + 1);
+
+    Mockito.verify(stopwatch, times(1)).start();
+    Mockito.verify(mockTask, times(1)).get();
+    Mockito.verify(eventSubmitter, times(2)).submit(gteCaptor.capture());
+    assertEquals(gteCaptor.getAllValues().get(0).getName(), HelixInstancePurgerWithMetrics.PURGE_LAGGING_EVENT);
+    assertEquals(gteCaptor.getAllValues().get(1).getName(), HelixInstancePurgerWithMetrics.PURGE_COMPLETED_EVENT);
+  }
+
+  /** A failing purge that also exceeded the threshold: lagging event first, then the failure event. */
+  @Test
+  public void testPurgeOfflineInstancesSendsFailureEvent() throws ExecutionException, InterruptedException {
+    Mockito.when(stopwatch.start()).thenReturn(stopwatch);
+    Mockito.when(stopwatch.elapsed(TimeUnit.MILLISECONDS)).thenReturn(LAGGING_PURGE_THRESHOLD_MS + 1);
+    Mockito.when(mockTask.isDone()).thenReturn(true);
+    Mockito.when(mockTask.get()).thenThrow(new ExecutionException("Throwing exception to emulate helix failure", new RuntimeException()));
+    Mockito.doNothing().when(eventSubmitter).submit(Mockito.any(GobblinEventBuilder.class));
+
+    long elapsedTime = sut.waitForPurgeCompletion(mockTask, LAGGING_PURGE_THRESHOLD_MS, stopwatch, Collections.emptyMap());
+    assertEquals(elapsedTime, LAGGING_PURGE_THRESHOLD_MS + 1);
+
+    Mockito.verify(stopwatch, times(1)).start();
+    Mockito.verify(mockTask, times(1)).get();
+    Mockito.verify(eventSubmitter, times(2)).submit(gteCaptor.capture());
+    assertEquals(gteCaptor.getAllValues().get(0).getName(), HelixInstancePurgerWithMetrics.PURGE_LAGGING_EVENT);
+    assertEquals(gteCaptor.getAllValues().get(1).getName(), HelixInstancePurgerWithMetrics.PURGE_FAILURE_EVENT);
+  }
+}
diff --git a/gobblin-yarn/src/test/java/org/apache/gobblin/yarn/YarnServiceTest.java b/gobblin-yarn/src/test/java/org/apache/gobblin/yarn/YarnServiceTest.java
index 76c53330b..c01d91196 100644
--- a/gobblin-yarn/src/test/java/org/apache/gobblin/yarn/YarnServiceTest.java
+++ b/gobblin-yarn/src/test/java/org/apache/gobblin/yarn/YarnServiceTest.java
@@ -303,19 +303,18 @@ public class YarnServiceTest {
   static class TestYarnService extends YarnService {
     public TestYarnService(Config config, String applicationName, String 
applicationId, YarnConfiguration yarnConfiguration,
         FileSystem fs, EventBus eventBus) throws Exception {
-      super(config, applicationName, applicationId, yarnConfiguration, fs, 
eventBus, getMockHelixManager(config));
+      super(config, applicationName, applicationId, yarnConfiguration, fs, 
eventBus, getMockHelixManager(config), getMockHelixAdmin());
     }
 
     private static HelixManager getMockHelixManager(Config config) {
       HelixManager helixManager = Mockito.mock(HelixManager.class);
-      HelixAdmin helixAdmin = Mockito.mock(HelixAdmin.class);
       HelixDataAccessor helixDataAccessor = 
Mockito.mock(HelixDataAccessor.class);
       PropertyKey propertyKey = Mockito.mock(PropertyKey.class);
       PropertyKey.Builder propertyKeyBuilder = 
Mockito.mock(PropertyKey.Builder.class);
 
       
Mockito.when(helixManager.getInstanceName()).thenReturn("helixInstance1");
       
Mockito.when(helixManager.getClusterName()).thenReturn(config.getString(GobblinClusterConfigurationKeys.HELIX_CLUSTER_NAME_KEY));
-      Mockito.doNothing().when(helixAdmin).enableInstance(Mockito.anyString(), 
Mockito.anyString(), Mockito.anyBoolean());
+
       
Mockito.when(helixManager.getHelixDataAccessor()).thenReturn(helixDataAccessor);
       
Mockito.when(helixDataAccessor.keyBuilder()).thenReturn(propertyKeyBuilder);
       
Mockito.when(propertyKeyBuilder.liveInstance(Mockito.anyString())).thenReturn(propertyKey);
@@ -324,6 +323,13 @@ public class YarnServiceTest {
       return helixManager;
     }
 
+    /** Creates a Mockito {@link HelixAdmin} mock whose purge and enable-instance calls are stubbed to do nothing. */
+    private static HelixAdmin getMockHelixAdmin() {
+      HelixAdmin mockAdmin = Mockito.mock(HelixAdmin.class);
+      Mockito.doNothing()
+          .when(mockAdmin)
+          .purgeOfflineInstances(Mockito.anyString(), Mockito.anyLong());
+      Mockito.doNothing()
+          .when(mockAdmin)
+          .enableInstance(Mockito.anyString(), Mockito.anyString(), Mockito.anyBoolean());
+      return mockAdmin;
+    }
+
     protected ContainerLaunchContext newContainerLaunchContext(ContainerInfo 
containerInfo)
         throws IOException {
       return BuilderUtils.newContainerLaunchContext(Collections.emptyMap(), 
Collections.emptyMap(),
@@ -367,4 +373,4 @@ public class YarnServiceTest {
       return success;
     }
   }
-}
\ No newline at end of file
+}

Reply via email to