This is an automated email from the ASF dual-hosted git repository. hapylestat pushed a commit to branch branch-2.7 in repository https://gitbox.apache.org/repos/asf/ambari.git
The following commit(s) were added to refs/heads/branch-2.7 by this push: new 9938407 AMBARI-25635 Clear Cluster and METRIC_AGGREGATORS MBeans upon shutdown (#3304) (payert via dgrinenko) 9938407 is described below commit 99384078da451f65ed670fa8a378d91ce46f81bf Author: Tamas Payer <35402259+pay...@users.noreply.github.com> AuthorDate: Wed Apr 28 08:28:08 2021 +0200 AMBARI-25635 Clear Cluster and METRIC_AGGREGATORS MBeans upon shutdown (#3304) (payert via dgrinenko) Refactor HelixController inner class to LiveInstanceTracker. The HelixController was derived from GenericHelixController class. This caused a problem because practically two Controllers were in use due to this bad practice. The HelixController class is not intended to be a real Controller, only a tracker of the live nodes, which is basically just a LiveInstanceChangeListener implementation. --- .../availability/MetricCollectorHAController.java | 175 ++++++++++----------- .../MetricCollectorHAControllerTest.java | 5 +- 2 files changed, 84 insertions(+), 96 deletions(-) diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/availability/MetricCollectorHAController.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/availability/MetricCollectorHAController.java index adbe8e7..b4ea111 100644 --- a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/availability/MetricCollectorHAController.java +++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/availability/MetricCollectorHAController.java @@ -1,4 +1,4 @@ -/** +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. 
See the NOTICE file * distributed with this work for additional information @@ -31,7 +31,6 @@ import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; import org.I0Itec.zkclient.exception.ZkNoNodeException; -import org.apache.commons.collections.CollectionUtils; import org.apache.commons.lang.StringUtils; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -42,51 +41,55 @@ import org.apache.helix.HelixException; import org.apache.helix.HelixManager; import org.apache.helix.HelixManagerFactory; import org.apache.helix.InstanceType; +import org.apache.helix.LiveInstanceChangeListener; import org.apache.helix.NotificationContext; -import org.apache.helix.controller.GenericHelixController; +import org.apache.helix.PropertyKey; import org.apache.helix.manager.zk.ZKHelixAdmin; import org.apache.helix.model.ExternalView; import org.apache.helix.model.InstanceConfig; import org.apache.helix.model.LiveInstance; import org.apache.helix.model.OnlineOfflineSMD; -import org.apache.helix.model.StateModelDefinition; -import org.apache.helix.tools.StateModelConfigGenerator; +import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Joiner; -; public class MetricCollectorHAController { private static final Log LOG = LogFactory.getLog(MetricCollectorHAController.class); + @VisibleForTesting static final String CLUSTER_NAME = "ambari-metrics-cluster"; + @VisibleForTesting static final String METRIC_AGGREGATORS = "METRIC_AGGREGATORS"; + @VisibleForTesting static final String DEFAULT_STATE_MODEL = OnlineOfflineSMD.name; - static final String INSTANCE_NAME_DELIMITER = "_"; + private static final String INSTANCE_NAME_DELIMITER = "_"; + private static final int PARTITION_NUMBER = 2; + private static final int REPLICATION_FACTOR = 1; + @VisibleForTesting final String zkConnectUrl; - final String instanceHostname; - final InstanceConfig instanceConfig; - final AggregationTaskRunner 
aggregationTaskRunner; - final TimelineMetricConfiguration configuration; + private final String instanceHostname; + private final InstanceConfig instanceConfig; + private final AggregationTaskRunner aggregationTaskRunner; // Cache list of known live instances - final List<String> liveInstanceNames = new ArrayList<>(); + private final List<String> liveInstanceNames = new ArrayList<>(2); + private final LiveInstanceTracker liveInstanceTracker = new LiveInstanceTracker(); // Helix Admin + @VisibleForTesting HelixAdmin admin; // Helix Manager - HelixManager manager; + private HelixManager manager; private volatile boolean isInitialized = false; public MetricCollectorHAController(TimelineMetricConfiguration configuration) { - this.configuration = configuration; String instancePort; try { instanceHostname = configuration.getInstanceHostnameFromEnv(); instancePort = configuration.getInstancePort(); - } catch (Exception e) { LOG.error("Error reading configs from classpath, will resort to defaults.", e); throw new MetricsSystemInitializationException(e.getMessage()); @@ -97,42 +100,33 @@ public class MetricCollectorHAController { String zkQuorum = configuration.getClusterZKQuorum(); if (StringUtils.isEmpty(zkClientPort) || StringUtils.isEmpty(zkQuorum)) { - throw new Exception("Unable to parse zookeeper quorum. clientPort = " - + zkClientPort +", quorum = " + zkQuorum); + throw new Exception(String.format("Unable to parse zookeeper quorum. 
clientPort = %s, quorum = %s", zkClientPort, zkQuorum)); } zkConnectUrl = configuration.getZkConnectionUrl(zkClientPort, zkQuorum); - } catch (Exception e) { LOG.error("Unable to load hbase-site from classpath.", e); - throw new MetricsSystemInitializationException(e.getMessage()); + throw new MetricsSystemInitializationException(e.getMessage(), e); } instanceConfig = new InstanceConfig(instanceHostname + INSTANCE_NAME_DELIMITER + instancePort); instanceConfig.setHostName(instanceHostname); instanceConfig.setPort(instancePort); instanceConfig.setInstanceEnabled(true); - aggregationTaskRunner = new AggregationTaskRunner( - instanceConfig.getInstanceName(), zkConnectUrl, getClusterName()); - } - /** - * Name of Helix znode - */ - public String getClusterName() { - return CLUSTER_NAME; + aggregationTaskRunner = new AggregationTaskRunner(instanceConfig.getInstanceName(), zkConnectUrl, CLUSTER_NAME); } /** * Initialize the instance with zookeeper via Helix */ public void initializeHAController() throws Exception { - String clusterName = getClusterName(); + // Create setup tool instance admin = new ZKHelixAdmin(zkConnectUrl); - // create cluster - LOG.info("Creating zookeeper cluster node: " + clusterName); - boolean clusterAdded = admin.addCluster(clusterName, false); - LOG.info("Was cluster added successfully? " + clusterAdded); + // Create cluster namespace in zookeeper. Don't recreate if exists. + LOG.info(String.format("Creating zookeeper cluster node: %s", CLUSTER_NAME)); + boolean clusterAdded = admin.addCluster(CLUSTER_NAME, false); + LOG.info(String.format("Was cluster added successfully? 
%s", clusterAdded)); // Adding host to the cluster boolean success = false; @@ -141,16 +135,16 @@ public class MetricCollectorHAController { for (int i = 0; i < tries && !success; i++) { try { - List<String> nodes = admin.getInstancesInCluster(clusterName); - if (CollectionUtils.isEmpty(nodes) || !nodes.contains(instanceConfig.getInstanceName())) { - LOG.info("Adding participant instance " + instanceConfig); - admin.addInstance(clusterName, instanceConfig); + List<String> nodes = admin.getInstancesInCluster(CLUSTER_NAME); + if (!nodes.contains(instanceConfig.getInstanceName())) { + LOG.info(String.format("Adding participant instance %s", instanceConfig)); + admin.addInstance(CLUSTER_NAME, instanceConfig); } success = true; } catch (HelixException | ZkNoNodeException ex) { LOG.warn("Helix Cluster not yet setup fully."); if (i < tries - 1) { - LOG.info("Waiting for " + sleepTimeInSeconds + " seconds and retrying."); + LOG.info(String.format("Waiting for %d seconds and retrying.", sleepTimeInSeconds)); TimeUnit.SECONDS.sleep(sleepTimeInSeconds); } else { LOG.error(ex); @@ -159,34 +153,32 @@ public class MetricCollectorHAController { } if (!success) { - LOG.info("Trying to create " + clusterName + " again since waiting for the creation did not help."); - admin.addCluster(clusterName, true); - List<String> nodes = admin.getInstancesInCluster(clusterName); - if (CollectionUtils.isEmpty(nodes) || !nodes.contains(instanceConfig.getInstanceName())) { - LOG.info("Adding participant instance " + instanceConfig); - admin.addInstance(clusterName, instanceConfig); + LOG.info(String.format("Trying to create %s again since waiting for the creation did not help.", CLUSTER_NAME)); + admin.addCluster(CLUSTER_NAME, true); + List<String> nodes = admin.getInstancesInCluster(CLUSTER_NAME); + if (!nodes.contains(instanceConfig.getInstanceName())) { + LOG.info(String.format("Adding participant instance %s", instanceConfig)); + admin.addInstance(CLUSTER_NAME, instanceConfig); } } - // Add a 
state model - if (admin.getStateModelDef(clusterName, DEFAULT_STATE_MODEL) == null) { + // Add an ONLINE-OFFLINE state model + if (admin.getStateModelDef(CLUSTER_NAME, DEFAULT_STATE_MODEL) == null) { LOG.info("Adding ONLINE-OFFLINE state model to the cluster"); - admin.addStateModelDef(clusterName, DEFAULT_STATE_MODEL, new StateModelDefinition( - StateModelConfigGenerator.generateConfigForOnlineOffline())); + admin.addStateModelDef(CLUSTER_NAME, DEFAULT_STATE_MODEL, OnlineOfflineSMD.build()); } // Add resources with 1 cluster-wide replica // Since our aggregators are unbalanced in terms of work distribution we // only need to distribute writes to METRIC_AGGREGATE and - // METRIC_RECORD_MINUTE - List<String> resources = admin.getResourcesInCluster(clusterName); + // METRIC_RECORD_MINUTE, i.e. the Host level and Cluster level aggregations + List<String> resources = admin.getResourcesInCluster(CLUSTER_NAME); if (!resources.contains(METRIC_AGGREGATORS)) { - LOG.info("Adding resource " + METRIC_AGGREGATORS + " with 2 partitions and 1 replicas"); - admin.addResource(clusterName, METRIC_AGGREGATORS, 2, DEFAULT_STATE_MODEL, FULL_AUTO.toString()); + LOG.info(String.format("Adding resource %s with %d partitions and %d replicas", METRIC_AGGREGATORS, PARTITION_NUMBER, REPLICATION_FACTOR)); + admin.addResource(CLUSTER_NAME, METRIC_AGGREGATORS, PARTITION_NUMBER, DEFAULT_STATE_MODEL, FULL_AUTO.toString()); } - // this will set up the ideal state, it calculates the preference list for - // each partition similar to consistent hashing - admin.rebalance(clusterName, METRIC_AGGREGATORS, 1); + // This will set up the ideal state, it calculates the preference list for each partition similar to consistent hashing. 
+ admin.rebalance(CLUSTER_NAME, METRIC_AGGREGATORS, REPLICATION_FACTOR); // Start participant startAggregators(); @@ -194,13 +186,9 @@ public class MetricCollectorHAController { // Start controller startController(); - Runtime.getRuntime().addShutdownHook(new Thread() { - @Override - public void run() { - aggregationTaskRunner.stop(); - manager.disconnect(); - } - }); + Runtime.getRuntime().addShutdownHook(new Thread(() -> { + shutdownHAController(); + })); isInitialized = true; } @@ -215,24 +203,33 @@ public class MetricCollectorHAController { private void startAggregators() { try { aggregationTaskRunner.initialize(); - } catch (Exception e) { LOG.error("Unable to start aggregators.", e); - throw new MetricsSystemInitializationException(e.getMessage()); + throw new MetricsSystemInitializationException(e.getMessage(), e); } } private void startController() throws Exception { - manager = HelixManagerFactory.getZKHelixManager( - getClusterName(), - instanceHostname, - InstanceType.CONTROLLER, - zkConnectUrl - ); + manager = HelixManagerFactory.getZKHelixManager(CLUSTER_NAME, instanceHostname, InstanceType.CONTROLLER, zkConnectUrl); manager.connect(); - HelixController controller = new HelixController(); - manager.addLiveInstanceChangeListener(controller); + manager.addLiveInstanceChangeListener(liveInstanceTracker); + } + + public void shutdownHAController() { + if (isInitialized) { + LOG.info("Shooting down Metrics Collector's HAController."); + + PropertyKey.Builder keyBuilder = new PropertyKey.Builder(CLUSTER_NAME); + manager.removeListener(keyBuilder.liveInstances(), liveInstanceTracker); + liveInstanceTracker.shutdown(); + aggregationTaskRunner.stop(); + manager.disconnect(); + admin.close(); + + isInitialized = false; + LOG.info("Shutdown of Metrics Collector's HAController finished."); + } } public AggregationTaskRunner getAggregationTaskRunner() { @@ -240,7 +237,7 @@ public class MetricCollectorHAController { } public List<String> getLiveInstanceHostNames() { 
- List<String> liveInstanceHostNames = new ArrayList<>(); + List<String> liveInstanceHostNames = new ArrayList<>(2); for (String instance : liveInstanceNames) { liveInstanceHostNames.add(instance.split(INSTANCE_NAME_DELIMITER)[0]); @@ -249,52 +246,44 @@ public class MetricCollectorHAController { return liveInstanceHostNames; } - public class HelixController extends GenericHelixController { - ScheduledExecutorService executorService = Executors.newSingleThreadScheduledExecutor(); - Joiner joiner = Joiner.on(", ").skipNulls(); + public final class LiveInstanceTracker implements LiveInstanceChangeListener { + private final ScheduledExecutorService executorService = Executors.newSingleThreadScheduledExecutor(); + private final Joiner joiner = Joiner.on(", ").skipNulls(); @Override public void onLiveInstanceChange(List<LiveInstance> liveInstances, NotificationContext changeContext) { - super.onLiveInstanceChange(liveInstances, changeContext); - liveInstanceNames.clear(); for (LiveInstance instance : liveInstances) { liveInstanceNames.add(instance.getInstanceName()); } - LOG.info("Detected change in liveliness of Collector instances. " + - "LiveIsntances = " + joiner.join(liveInstanceNames)); + LOG.info(String.format("Detected change in liveliness of Collector instances. 
LiveInstances = %s", joiner.join(liveInstanceNames))); // Print HA state - after some delay - executorService.schedule(new Runnable() { - @Override - public void run() { - printClusterState(); - } - }, 30, TimeUnit.SECONDS); - + executorService.schedule(() -> printClusterState(), 30, TimeUnit.SECONDS); + } + public void shutdown() { + executorService.shutdown(); } } public void printClusterState() { StringBuilder sb = new StringBuilder("\n######################### Cluster HA state ########################"); - ExternalView resourceExternalView = admin.getResourceExternalView(getClusterName(), METRIC_AGGREGATORS); + ExternalView resourceExternalView = admin.getResourceExternalView(CLUSTER_NAME, METRIC_AGGREGATORS); if (resourceExternalView != null) { - getPrintableResourceState(resourceExternalView, METRIC_AGGREGATORS, sb); + getPrintableResourceState(resourceExternalView, sb); } sb.append("\n##################################################"); LOG.info(sb.toString()); } - private void getPrintableResourceState(ExternalView resourceExternalView, - String resourceName, - StringBuilder sb) { + private void getPrintableResourceState(ExternalView resourceExternalView, StringBuilder sb) { TreeSet<String> sortedSet = new TreeSet<>(resourceExternalView.getPartitionSet()); sb.append("\nCLUSTER: "); - sb.append(getClusterName()); + sb.append(CLUSTER_NAME); sb.append("\nRESOURCE: "); - sb.append(resourceName); + sb.append(MetricCollectorHAController.METRIC_AGGREGATORS); for (String partitionName : sortedSet) { sb.append("\nPARTITION: "); sb.append(partitionName).append("\t"); diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/ambari/metrics/core/timeline/availability/MetricCollectorHAControllerTest.java b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/ambari/metrics/core/timeline/availability/MetricCollectorHAControllerTest.java index 385a5a1..e495d33 100644 --- 
a/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/ambari/metrics/core/timeline/availability/MetricCollectorHAControllerTest.java +++ b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/ambari/metrics/core/timeline/availability/MetricCollectorHAControllerTest.java @@ -1,4 +1,4 @@ -/** +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -100,7 +100,6 @@ public class MetricCollectorHAControllerTest extends AbstractMiniHBaseClusterTes // Re-assigned partitions Assert.assertEquals(2, partitionInstanceMap.size()); - haController.getAggregationTaskRunner().stop(); - haController.manager.disconnect(); + haController.shutdownHAController(); } }