This is an automated email from the ASF dual-hosted git repository.

hapylestat pushed a commit to branch branch-2.7
in repository https://gitbox.apache.org/repos/asf/ambari.git


The following commit(s) were added to refs/heads/branch-2.7 by this push:
     new 9938407  AMBARI-25635 Clear Cluster and METRIC_AGGREGATORS MBeans upon 
shutdown (#3304) (payert via dgrinenko)
9938407 is described below

commit 99384078da451f65ed670fa8a378d91ce46f81bf
Author: Tamas Payer <35402259+pay...@users.noreply.github.com>
AuthorDate: Wed Apr 28 08:28:08 2021 +0200

    AMBARI-25635 Clear Cluster and METRIC_AGGREGATORS MBeans upon shutdown 
(#3304) (payert via dgrinenko)
    
    Refactor HelixController inner class to LiveInstanceTracker.
    
    The HelixController was derived from the GenericHelixController class. This 
caused a problem because
    practically two Controllers were in use due to this bad practice.
    The HelixController class is not intended to be a real Controller, only a 
tracker of the live nodes,
    which is basically just a LiveInstanceChangeListener implementation.
---
 .../availability/MetricCollectorHAController.java  | 175 ++++++++++-----------
 .../MetricCollectorHAControllerTest.java           |   5 +-
 2 files changed, 84 insertions(+), 96 deletions(-)

diff --git 
a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/availability/MetricCollectorHAController.java
 
b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/availability/MetricCollectorHAController.java
index adbe8e7..b4ea111 100644
--- 
a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/availability/MetricCollectorHAController.java
+++ 
b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/availability/MetricCollectorHAController.java
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -31,7 +31,6 @@ import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
 
 import org.I0Itec.zkclient.exception.ZkNoNodeException;
-import org.apache.commons.collections.CollectionUtils;
 import org.apache.commons.lang.StringUtils;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -42,51 +41,55 @@ import org.apache.helix.HelixException;
 import org.apache.helix.HelixManager;
 import org.apache.helix.HelixManagerFactory;
 import org.apache.helix.InstanceType;
+import org.apache.helix.LiveInstanceChangeListener;
 import org.apache.helix.NotificationContext;
-import org.apache.helix.controller.GenericHelixController;
+import org.apache.helix.PropertyKey;
 import org.apache.helix.manager.zk.ZKHelixAdmin;
 import org.apache.helix.model.ExternalView;
 import org.apache.helix.model.InstanceConfig;
 import org.apache.helix.model.LiveInstance;
 import org.apache.helix.model.OnlineOfflineSMD;
-import org.apache.helix.model.StateModelDefinition;
-import org.apache.helix.tools.StateModelConfigGenerator;
 
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Joiner;
 
-;
 
 public class MetricCollectorHAController {
   private static final Log LOG = 
LogFactory.getLog(MetricCollectorHAController.class);
 
+  @VisibleForTesting
   static final String CLUSTER_NAME = "ambari-metrics-cluster";
+  @VisibleForTesting
   static final String METRIC_AGGREGATORS = "METRIC_AGGREGATORS";
+  @VisibleForTesting
   static final String DEFAULT_STATE_MODEL = OnlineOfflineSMD.name;
-  static final String INSTANCE_NAME_DELIMITER = "_";
+  private static final String INSTANCE_NAME_DELIMITER = "_";
+  private static final int PARTITION_NUMBER = 2;
+  private static final int REPLICATION_FACTOR = 1;
 
+  @VisibleForTesting
   final String zkConnectUrl;
-  final String instanceHostname;
-  final InstanceConfig instanceConfig;
-  final AggregationTaskRunner aggregationTaskRunner;
-  final TimelineMetricConfiguration configuration;
+  private final String instanceHostname;
+  private final InstanceConfig instanceConfig;
+  private final AggregationTaskRunner aggregationTaskRunner;
 
   // Cache list of known live instances
-  final List<String> liveInstanceNames = new ArrayList<>();
+  private final List<String> liveInstanceNames = new ArrayList<>(2);
+  private final LiveInstanceTracker liveInstanceTracker = new 
LiveInstanceTracker();
 
   // Helix Admin
+  @VisibleForTesting
   HelixAdmin admin;
   // Helix Manager
-  HelixManager manager;
+  private HelixManager manager;
 
   private volatile boolean isInitialized = false;
 
   public MetricCollectorHAController(TimelineMetricConfiguration 
configuration) {
-    this.configuration = configuration;
     String instancePort;
     try {
       instanceHostname = configuration.getInstanceHostnameFromEnv();
       instancePort = configuration.getInstancePort();
-
     } catch (Exception e) {
       LOG.error("Error reading configs from classpath, will resort to 
defaults.", e);
       throw new MetricsSystemInitializationException(e.getMessage());
@@ -97,42 +100,33 @@ public class MetricCollectorHAController {
       String zkQuorum = configuration.getClusterZKQuorum();
 
       if (StringUtils.isEmpty(zkClientPort) || StringUtils.isEmpty(zkQuorum)) {
-        throw new Exception("Unable to parse zookeeper quorum. clientPort = "
-          + zkClientPort +", quorum = " + zkQuorum);
+        throw new Exception(String.format("Unable to parse zookeeper quorum. 
clientPort = %s, quorum = %s", zkClientPort, zkQuorum));
       }
 
       zkConnectUrl = configuration.getZkConnectionUrl(zkClientPort, zkQuorum);
-
     } catch (Exception e) {
       LOG.error("Unable to load hbase-site from classpath.", e);
-      throw new MetricsSystemInitializationException(e.getMessage());
+      throw new MetricsSystemInitializationException(e.getMessage(), e);
     }
 
     instanceConfig = new InstanceConfig(instanceHostname + 
INSTANCE_NAME_DELIMITER + instancePort);
     instanceConfig.setHostName(instanceHostname);
     instanceConfig.setPort(instancePort);
     instanceConfig.setInstanceEnabled(true);
-    aggregationTaskRunner = new AggregationTaskRunner(
-      instanceConfig.getInstanceName(), zkConnectUrl, getClusterName());
-  }
 
-  /**
-   * Name of Helix znode
-   */
-  public String getClusterName() {
-    return CLUSTER_NAME;
+    aggregationTaskRunner = new 
AggregationTaskRunner(instanceConfig.getInstanceName(), zkConnectUrl, 
CLUSTER_NAME);
   }
 
   /**
    * Initialize the instance with zookeeper via Helix
    */
   public void initializeHAController() throws Exception {
-    String clusterName = getClusterName();
+    // Create setup tool instance
     admin = new ZKHelixAdmin(zkConnectUrl);
-    // create cluster
-    LOG.info("Creating zookeeper cluster node: " + clusterName);
-    boolean clusterAdded = admin.addCluster(clusterName, false);
-    LOG.info("Was cluster added successfully? " + clusterAdded);
+    // Create cluster namespace in zookeeper. Don't recreate if exists.
+    LOG.info(String.format("Creating zookeeper cluster node: %s", 
CLUSTER_NAME));
+    boolean clusterAdded = admin.addCluster(CLUSTER_NAME, false);
+    LOG.info(String.format("Was cluster added successfully? %s", 
clusterAdded));
 
     // Adding host to the cluster
     boolean success = false;
@@ -141,16 +135,16 @@ public class MetricCollectorHAController {
 
     for (int i = 0; i < tries && !success; i++) {
       try {
-        List<String> nodes = admin.getInstancesInCluster(clusterName);
-        if (CollectionUtils.isEmpty(nodes) || 
!nodes.contains(instanceConfig.getInstanceName())) {
-          LOG.info("Adding participant instance " + instanceConfig);
-          admin.addInstance(clusterName, instanceConfig);
+        List<String> nodes = admin.getInstancesInCluster(CLUSTER_NAME);
+        if (!nodes.contains(instanceConfig.getInstanceName())) {
+          LOG.info(String.format("Adding participant instance %s", 
instanceConfig));
+          admin.addInstance(CLUSTER_NAME, instanceConfig);
         }
         success = true;
       } catch (HelixException | ZkNoNodeException ex) {
         LOG.warn("Helix Cluster not yet setup fully.");
         if (i < tries - 1) {
-          LOG.info("Waiting for " + sleepTimeInSeconds + " seconds and 
retrying.");
+          LOG.info(String.format("Waiting for %d seconds and retrying.", 
sleepTimeInSeconds));
           TimeUnit.SECONDS.sleep(sleepTimeInSeconds);
         } else {
           LOG.error(ex);
@@ -159,34 +153,32 @@ public class MetricCollectorHAController {
     }
 
     if (!success) {
-      LOG.info("Trying to create " + clusterName + " again since waiting for 
the creation did not help.");
-      admin.addCluster(clusterName, true);
-      List<String> nodes = admin.getInstancesInCluster(clusterName);
-      if (CollectionUtils.isEmpty(nodes) || 
!nodes.contains(instanceConfig.getInstanceName())) {
-        LOG.info("Adding participant instance " + instanceConfig);
-        admin.addInstance(clusterName, instanceConfig);
+      LOG.info(String.format("Trying to create %s again since waiting for the 
creation did not help.", CLUSTER_NAME));
+      admin.addCluster(CLUSTER_NAME, true);
+      List<String> nodes = admin.getInstancesInCluster(CLUSTER_NAME);
+      if (!nodes.contains(instanceConfig.getInstanceName())) {
+        LOG.info(String.format("Adding participant instance %s", 
instanceConfig));
+        admin.addInstance(CLUSTER_NAME, instanceConfig);
       }
     }
 
-    // Add a state model
-    if (admin.getStateModelDef(clusterName, DEFAULT_STATE_MODEL) == null) {
+    // Add an ONLINE-OFFLINE state model
+    if (admin.getStateModelDef(CLUSTER_NAME, DEFAULT_STATE_MODEL) == null) {
       LOG.info("Adding ONLINE-OFFLINE state model to the cluster");
-      admin.addStateModelDef(clusterName, DEFAULT_STATE_MODEL, new 
StateModelDefinition(
-        StateModelConfigGenerator.generateConfigForOnlineOffline()));
+      admin.addStateModelDef(CLUSTER_NAME, DEFAULT_STATE_MODEL, 
OnlineOfflineSMD.build());
     }
 
     // Add resources with 1 cluster-wide replica
     // Since our aggregators are unbalanced in terms of work distribution we
     // only need to distribute writes to METRIC_AGGREGATE and
-    // METRIC_RECORD_MINUTE
-    List<String> resources = admin.getResourcesInCluster(clusterName);
+    // METRIC_RECORD_MINUTE, i.e. the Host level and Cluster level aggregations
+    List<String> resources = admin.getResourcesInCluster(CLUSTER_NAME);
     if (!resources.contains(METRIC_AGGREGATORS)) {
-      LOG.info("Adding resource " + METRIC_AGGREGATORS + " with 2 partitions 
and 1 replicas");
-      admin.addResource(clusterName, METRIC_AGGREGATORS, 2, 
DEFAULT_STATE_MODEL, FULL_AUTO.toString());
+      LOG.info(String.format("Adding resource %s with %d partitions and %d 
replicas", METRIC_AGGREGATORS, PARTITION_NUMBER, REPLICATION_FACTOR));
+      admin.addResource(CLUSTER_NAME, METRIC_AGGREGATORS, PARTITION_NUMBER, 
DEFAULT_STATE_MODEL, FULL_AUTO.toString());
     }
-    // this will set up the ideal state, it calculates the preference list for
-    // each partition similar to consistent hashing
-    admin.rebalance(clusterName, METRIC_AGGREGATORS, 1);
+    // This will set up the ideal state, it calculates the preference list for 
each partition similar to consistent hashing.
+    admin.rebalance(CLUSTER_NAME, METRIC_AGGREGATORS, REPLICATION_FACTOR);
 
     // Start participant
     startAggregators();
@@ -194,13 +186,9 @@ public class MetricCollectorHAController {
     // Start controller
     startController();
 
-    Runtime.getRuntime().addShutdownHook(new Thread() {
-      @Override
-      public void run() {
-        aggregationTaskRunner.stop();
-        manager.disconnect();
-      }
-    });
+    Runtime.getRuntime().addShutdownHook(new Thread(() -> {
+      shutdownHAController();
+    }));
 
     isInitialized = true;
   }
@@ -215,24 +203,33 @@ public class MetricCollectorHAController {
   private void startAggregators() {
     try {
       aggregationTaskRunner.initialize();
-
     } catch (Exception e) {
       LOG.error("Unable to start aggregators.", e);
-      throw new MetricsSystemInitializationException(e.getMessage());
+      throw new MetricsSystemInitializationException(e.getMessage(), e);
     }
   }
 
   private void startController() throws Exception {
-    manager = HelixManagerFactory.getZKHelixManager(
-      getClusterName(),
-      instanceHostname,
-      InstanceType.CONTROLLER,
-      zkConnectUrl
-    );
+    manager = HelixManagerFactory.getZKHelixManager(CLUSTER_NAME, 
instanceHostname, InstanceType.CONTROLLER, zkConnectUrl);
 
     manager.connect();
-    HelixController controller = new HelixController();
-    manager.addLiveInstanceChangeListener(controller);
+    manager.addLiveInstanceChangeListener(liveInstanceTracker);
+  }
+
+  public void shutdownHAController() {
+    if (isInitialized) {
+      LOG.info("Shooting down Metrics Collector's HAController.");
+
+      PropertyKey.Builder keyBuilder = new PropertyKey.Builder(CLUSTER_NAME);
+      manager.removeListener(keyBuilder.liveInstances(), liveInstanceTracker);
+      liveInstanceTracker.shutdown();
+      aggregationTaskRunner.stop();
+      manager.disconnect();
+      admin.close();
+
+      isInitialized = false;
+      LOG.info("Shutdown of Metrics Collector's HAController finished.");
+    }
   }
 
   public AggregationTaskRunner getAggregationTaskRunner() {
@@ -240,7 +237,7 @@ public class MetricCollectorHAController {
   }
 
   public List<String> getLiveInstanceHostNames() {
-    List<String> liveInstanceHostNames = new ArrayList<>();
+    List<String> liveInstanceHostNames = new ArrayList<>(2);
 
     for (String instance : liveInstanceNames) {
       liveInstanceHostNames.add(instance.split(INSTANCE_NAME_DELIMITER)[0]);
@@ -249,52 +246,44 @@ public class MetricCollectorHAController {
     return liveInstanceHostNames;
   }
 
-  public class HelixController extends GenericHelixController {
-    ScheduledExecutorService executorService = 
Executors.newSingleThreadScheduledExecutor();
-    Joiner joiner = Joiner.on(", ").skipNulls();
+  public final class LiveInstanceTracker implements LiveInstanceChangeListener 
{
+    private final ScheduledExecutorService executorService = 
Executors.newSingleThreadScheduledExecutor();
+    private final Joiner joiner = Joiner.on(", ").skipNulls();
 
     @Override
     public void onLiveInstanceChange(List<LiveInstance> liveInstances, 
NotificationContext changeContext) {
-      super.onLiveInstanceChange(liveInstances, changeContext);
-
       liveInstanceNames.clear();
       for (LiveInstance instance : liveInstances) {
         liveInstanceNames.add(instance.getInstanceName());
       }
 
-      LOG.info("Detected change in liveliness of Collector instances. " +
-        "LiveIsntances = " + joiner.join(liveInstanceNames));
+      LOG.info(String.format("Detected change in liveliness of Collector 
instances. LiveInstances = %s", joiner.join(liveInstanceNames)));
       // Print HA state - after some delay
-      executorService.schedule(new Runnable() {
-        @Override
-        public void run() {
-          printClusterState();
-        }
-      }, 30, TimeUnit.SECONDS);
-
+      executorService.schedule(() -> printClusterState(), 30, 
TimeUnit.SECONDS);
+    }
 
+    public void shutdown() {
+      executorService.shutdown();
     }
   }
 
   public void printClusterState() {
     StringBuilder sb = new StringBuilder("\n######################### Cluster 
HA state ########################");
 
-    ExternalView resourceExternalView = 
admin.getResourceExternalView(getClusterName(), METRIC_AGGREGATORS);
+    ExternalView resourceExternalView = 
admin.getResourceExternalView(CLUSTER_NAME, METRIC_AGGREGATORS);
     if (resourceExternalView != null) {
-      getPrintableResourceState(resourceExternalView, METRIC_AGGREGATORS, sb);
+      getPrintableResourceState(resourceExternalView, sb);
     }
     sb.append("\n##################################################");
     LOG.info(sb.toString());
   }
 
-  private void getPrintableResourceState(ExternalView resourceExternalView,
-                                         String resourceName,
-                                         StringBuilder sb) {
+  private void getPrintableResourceState(ExternalView resourceExternalView, 
StringBuilder sb) {
     TreeSet<String> sortedSet = new 
TreeSet<>(resourceExternalView.getPartitionSet());
     sb.append("\nCLUSTER: ");
-    sb.append(getClusterName());
+    sb.append(CLUSTER_NAME);
     sb.append("\nRESOURCE: ");
-    sb.append(resourceName);
+    sb.append(MetricCollectorHAController.METRIC_AGGREGATORS);
     for (String partitionName : sortedSet) {
       sb.append("\nPARTITION: ");
       sb.append(partitionName).append("\t");
diff --git 
a/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/ambari/metrics/core/timeline/availability/MetricCollectorHAControllerTest.java
 
b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/ambari/metrics/core/timeline/availability/MetricCollectorHAControllerTest.java
index 385a5a1..e495d33 100644
--- 
a/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/ambari/metrics/core/timeline/availability/MetricCollectorHAControllerTest.java
+++ 
b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/ambari/metrics/core/timeline/availability/MetricCollectorHAControllerTest.java
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -100,7 +100,6 @@ public class MetricCollectorHAControllerTest extends 
AbstractMiniHBaseClusterTes
     // Re-assigned partitions
     Assert.assertEquals(2, partitionInstanceMap.size());
 
-    haController.getAggregationTaskRunner().stop();
-    haController.manager.disconnect();
+    haController.shutdownHAController();
   }
 }

Reply via email to