http://git-wip-us.apache.org/repos/asf/ambari/blob/42112e28/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/availability/MetricCollectorHAController.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/availability/MetricCollectorHAController.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/availability/MetricCollectorHAController.java deleted file mode 100644 index d74f253..0000000 --- a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/availability/MetricCollectorHAController.java +++ /dev/null @@ -1,307 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * <p/> - * http://www.apache.org/licenses/LICENSE-2.0 - * <p/> - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.availability; - -import static org.apache.helix.model.IdealState.RebalanceMode.FULL_AUTO; - -import java.util.ArrayList; -import java.util.List; -import java.util.Map; -import java.util.TreeSet; -import java.util.concurrent.Executors; -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.TimeUnit; - -import org.I0Itec.zkclient.exception.ZkNoNodeException; -import org.apache.commons.collections.CollectionUtils; -import org.apache.commons.lang.StringUtils; -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.MetricsSystemInitializationException; -import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration; -import org.apache.helix.HelixAdmin; -import org.apache.helix.HelixException; -import org.apache.helix.HelixManager; -import org.apache.helix.HelixManagerFactory; -import org.apache.helix.InstanceType; -import org.apache.helix.NotificationContext; -import org.apache.helix.controller.GenericHelixController; -import org.apache.helix.manager.zk.ZKHelixAdmin; -import org.apache.helix.model.ExternalView; -import org.apache.helix.model.InstanceConfig; -import org.apache.helix.model.LiveInstance; -import org.apache.helix.model.OnlineOfflineSMD; -import org.apache.helix.model.StateModelDefinition; -import org.apache.helix.tools.StateModelConfigGenerator; - -import com.google.common.base.Joiner; - -; - -public class MetricCollectorHAController { - private static final Log LOG = LogFactory.getLog(MetricCollectorHAController.class); - - static final String CLUSTER_NAME = "ambari-metrics-cluster"; - static final String METRIC_AGGREGATORS = "METRIC_AGGREGATORS"; - static final String DEFAULT_STATE_MODEL = OnlineOfflineSMD.name; - static final String INSTANCE_NAME_DELIMITER = "_"; - - final String 
zkConnectUrl; - final String instanceHostname; - final InstanceConfig instanceConfig; - final AggregationTaskRunner aggregationTaskRunner; - final TimelineMetricConfiguration configuration; - - // Cache list of known live instances - final List<String> liveInstanceNames = new ArrayList<>(); - - // Helix Admin - HelixAdmin admin; - // Helix Manager - HelixManager manager; - - private volatile boolean isInitialized = false; - - public MetricCollectorHAController(TimelineMetricConfiguration configuration) { - this.configuration = configuration; - String instancePort; - try { - instanceHostname = configuration.getInstanceHostnameFromEnv(); - instancePort = configuration.getInstancePort(); - - } catch (Exception e) { - LOG.error("Error reading configs from classpath, will resort to defaults.", e); - throw new MetricsSystemInitializationException(e.getMessage()); - } - - try { - String zkClientPort = configuration.getClusterZKClientPort(); - String zkQuorum = configuration.getClusterZKQuorum(); - - if (StringUtils.isEmpty(zkClientPort) || StringUtils.isEmpty(zkQuorum)) { - throw new Exception("Unable to parse zookeeper quorum. 
clientPort = " - + zkClientPort +", quorum = " + zkQuorum); - } - - zkConnectUrl = configuration.getZkConnectionUrl(zkClientPort, zkQuorum); - - } catch (Exception e) { - LOG.error("Unable to load hbase-site from classpath.", e); - throw new MetricsSystemInitializationException(e.getMessage()); - } - - instanceConfig = new InstanceConfig(instanceHostname + INSTANCE_NAME_DELIMITER + instancePort); - instanceConfig.setHostName(instanceHostname); - instanceConfig.setPort(instancePort); - instanceConfig.setInstanceEnabled(true); - aggregationTaskRunner = new AggregationTaskRunner( - instanceConfig.getInstanceName(), zkConnectUrl, getClusterName()); - } - - /** - * Name of Helix znode - */ - public String getClusterName() { - return CLUSTER_NAME; - } - - /** - * Initialize the instance with zookeeper via Helix - */ - public void initializeHAController() throws Exception { - String clusterName = getClusterName(); - admin = new ZKHelixAdmin(zkConnectUrl); - // create cluster - LOG.info("Creating zookeeper cluster node: " + clusterName); - boolean clusterAdded = admin.addCluster(clusterName, false); - LOG.info("Was cluster added successfully? 
" + clusterAdded); - - // Adding host to the cluster - boolean success = false; - int tries = 5; - int sleepTimeInSeconds = 5; - - for (int i = 0; i < tries && !success; i++) { - try { - List<String> nodes = admin.getInstancesInCluster(clusterName); - if (CollectionUtils.isEmpty(nodes) || !nodes.contains(instanceConfig.getInstanceName())) { - LOG.info("Adding participant instance " + instanceConfig); - admin.addInstance(clusterName, instanceConfig); - } - success = true; - } catch (HelixException | ZkNoNodeException ex) { - LOG.warn("Helix Cluster not yet setup fully."); - if (i < tries - 1) { - LOG.info("Waiting for " + sleepTimeInSeconds + " seconds and retrying."); - TimeUnit.SECONDS.sleep(sleepTimeInSeconds); - } else { - LOG.error(ex); - } - } - } - - if (!success) { - LOG.info("Trying to create " + clusterName + " again since waiting for the creation did not help."); - admin.addCluster(clusterName, true); - List<String> nodes = admin.getInstancesInCluster(clusterName); - if (CollectionUtils.isEmpty(nodes) || !nodes.contains(instanceConfig.getInstanceName())) { - LOG.info("Adding participant instance " + instanceConfig); - admin.addInstance(clusterName, instanceConfig); - } - } - - // Add a state model - if (admin.getStateModelDef(clusterName, DEFAULT_STATE_MODEL) == null) { - LOG.info("Adding ONLINE-OFFLINE state model to the cluster"); - admin.addStateModelDef(clusterName, DEFAULT_STATE_MODEL, new StateModelDefinition( - StateModelConfigGenerator.generateConfigForOnlineOffline())); - } - - // Add resources with 1 cluster-wide replica - // Since our aggregators are unbalanced in terms of work distribution we - // only need to distribute writes to METRIC_AGGREGATE and - // METRIC_RECORD_MINUTE - List<String> resources = admin.getResourcesInCluster(clusterName); - if (!resources.contains(METRIC_AGGREGATORS)) { - LOG.info("Adding resource " + METRIC_AGGREGATORS + " with 2 partitions and 1 replicas"); - admin.addResource(clusterName, METRIC_AGGREGATORS, 2, 
DEFAULT_STATE_MODEL, FULL_AUTO.toString()); - } - // this will set up the ideal state, it calculates the preference list for - // each partition similar to consistent hashing - admin.rebalance(clusterName, METRIC_AGGREGATORS, 1); - - // Start participant - startAggregators(); - - // Start controller - startController(); - - Runtime.getRuntime().addShutdownHook(new Thread() { - @Override - public void run() { - aggregationTaskRunner.stop(); - manager.disconnect(); - } - }); - - isInitialized = true; - } - - /** - * Return true if HA controller is enabled. - */ - public boolean isInitialized() { - return isInitialized; - } - - private void startAggregators() { - try { - aggregationTaskRunner.initialize(); - - } catch (Exception e) { - LOG.error("Unable to start aggregators.", e); - throw new MetricsSystemInitializationException(e.getMessage()); - } - } - - private void startController() throws Exception { - manager = HelixManagerFactory.getZKHelixManager( - getClusterName(), - instanceHostname, - InstanceType.CONTROLLER, - zkConnectUrl - ); - - manager.connect(); - HelixController controller = new HelixController(); - manager.addLiveInstanceChangeListener(controller); - } - - public AggregationTaskRunner getAggregationTaskRunner() { - return aggregationTaskRunner; - } - - public List<String> getLiveInstanceHostNames() { - List<String> liveInstanceHostNames = new ArrayList<>(); - - for (String instance : liveInstanceNames) { - liveInstanceHostNames.add(instance.split(INSTANCE_NAME_DELIMITER)[0]); - } - - return liveInstanceHostNames; - } - - public class HelixController extends GenericHelixController { - ScheduledExecutorService executorService = Executors.newSingleThreadScheduledExecutor(); - Joiner joiner = Joiner.on(", ").skipNulls(); - - @Override - public void onLiveInstanceChange(List<LiveInstance> liveInstances, NotificationContext changeContext) { - super.onLiveInstanceChange(liveInstances, changeContext); - - liveInstanceNames.clear(); - for (LiveInstance 
instance : liveInstances) { - liveInstanceNames.add(instance.getInstanceName()); - } - - LOG.info("Detected change in liveliness of Collector instances. " + - "LiveIsntances = " + joiner.join(liveInstanceNames)); - // Print HA state - after some delay - executorService.schedule(new Runnable() { - @Override - public void run() { - printClusterState(); - } - }, 30, TimeUnit.SECONDS); - - - } - } - - public void printClusterState() { - StringBuilder sb = new StringBuilder("\n######################### Cluster HA state ########################"); - - ExternalView resourceExternalView = admin.getResourceExternalView(getClusterName(), METRIC_AGGREGATORS); - if (resourceExternalView != null) { - getPrintableResourceState(resourceExternalView, METRIC_AGGREGATORS, sb); - } - sb.append("\n##################################################"); - LOG.info(sb.toString()); - } - - private void getPrintableResourceState(ExternalView resourceExternalView, - String resourceName, - StringBuilder sb) { - TreeSet<String> sortedSet = new TreeSet<>(resourceExternalView.getPartitionSet()); - sb.append("\nCLUSTER: "); - sb.append(getClusterName()); - sb.append("\nRESOURCE: "); - sb.append(resourceName); - for (String partitionName : sortedSet) { - sb.append("\nPARTITION: "); - sb.append(partitionName).append("\t"); - Map<String, String> states = resourceExternalView.getStateMap(partitionName); - for (Map.Entry<String, String> stateEntry : states.entrySet()) { - sb.append("\t"); - sb.append(stateEntry.getKey()); - sb.append("\t"); - sb.append(stateEntry.getValue()); - } - } - } -}
http://git-wip-us.apache.org/repos/asf/ambari/blob/42112e28/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/availability/OnlineOfflineStateModelFactory.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/availability/OnlineOfflineStateModelFactory.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/availability/OnlineOfflineStateModelFactory.java deleted file mode 100644 index a53dc3b..0000000 --- a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/availability/OnlineOfflineStateModelFactory.java +++ /dev/null @@ -1,69 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * <p/> - * http://www.apache.org/licenses/LICENSE-2.0 - * <p/> - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.availability; - -import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.availability.AggregationTaskRunner.PARTITION_AGGREGATION_TYPES; - -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.TimelineMetricAggregator.AGGREGATOR_TYPE; -import org.apache.helix.NotificationContext; -import org.apache.helix.model.Message; -import org.apache.helix.participant.statemachine.StateModel; -import org.apache.helix.participant.statemachine.StateModelFactory; - -public class OnlineOfflineStateModelFactory extends StateModelFactory<StateModel> { - private static final Log LOG = LogFactory.getLog(OnlineOfflineStateModelFactory.class); - private final String instanceName; - private final AggregationTaskRunner taskRunner; - - public OnlineOfflineStateModelFactory(String instanceName, AggregationTaskRunner taskRunner) { - this.instanceName = instanceName; - this.taskRunner = taskRunner; - } - - @Override - public StateModel createNewStateModel(String resourceName, String partition) { - LOG.info("Received request to process partition = " + partition + ", for " + - "resource = " + resourceName + ", at " + instanceName); - return new OnlineOfflineStateModel(); - } - - public class OnlineOfflineStateModel extends StateModel { - public void onBecomeOnlineFromOffline(Message message, NotificationContext context) { - String partitionName = message.getPartitionName(); - LOG.info("Received transition to Online from Offline for partition: " + partitionName); - AGGREGATOR_TYPE type = PARTITION_AGGREGATION_TYPES.get(partitionName); - taskRunner.setPartitionAggregationFunction(type); - } - - public void onBecomeOfflineFromOnline(Message message, NotificationContext context) { - String partitionName = message.getPartitionName(); - LOG.info("Received 
transition to Offline from Online for partition: " + partitionName); - AGGREGATOR_TYPE type = PARTITION_AGGREGATION_TYPES.get(partitionName); - taskRunner.unsetPartitionAggregationFunction(type); - } - - public void onBecomeDroppedFromOffline(Message message, NotificationContext context) { - String partitionName = message.getPartitionName(); - LOG.info("Received transition to Dropped from Offline for partition: " + partitionName); - AGGREGATOR_TYPE type = PARTITION_AGGREGATION_TYPES.get(partitionName); - taskRunner.unsetPartitionAggregationFunction(type); - } - } -} http://git-wip-us.apache.org/repos/asf/ambari/blob/42112e28/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/discovery/TimelineMetricHostMetadata.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/discovery/TimelineMetricHostMetadata.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/discovery/TimelineMetricHostMetadata.java deleted file mode 100644 index 37c6394..0000000 --- a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/discovery/TimelineMetricHostMetadata.java +++ /dev/null @@ -1,60 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * <p/> - * http://www.apache.org/licenses/LICENSE-2.0 - * <p/> - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.discovery; - -import java.util.Set; -import java.util.concurrent.ConcurrentHashMap; - -public class TimelineMetricHostMetadata { - //need concurrent data structure, only keys are used. - private ConcurrentHashMap<String, String> hostedApps = new ConcurrentHashMap<>(); - private byte[] uuid; - - // Default constructor - public TimelineMetricHostMetadata() { - } - - public TimelineMetricHostMetadata(ConcurrentHashMap<String, String> hostedApps) { - this.hostedApps = hostedApps; - } - - public TimelineMetricHostMetadata(Set<String> hostedApps) { - ConcurrentHashMap<String, String> appIdsMap = new ConcurrentHashMap<>(); - for (String appId : hostedApps) { - appIdsMap.put(appId, appId); - } - this.hostedApps = appIdsMap; - } - - public ConcurrentHashMap<String, String> getHostedApps() { - return hostedApps; - } - - public void setHostedApps(ConcurrentHashMap<String, String> hostedApps) { - this.hostedApps = hostedApps; - } - - public byte[] getUuid() { - return uuid; - } - - public void setUuid(byte[] uuid) { - this.uuid = uuid; - } -} http://git-wip-us.apache.org/repos/asf/ambari/blob/42112e28/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/discovery/TimelineMetricMetadataKey.java ---------------------------------------------------------------------- diff --git 
a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/discovery/TimelineMetricMetadataKey.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/discovery/TimelineMetricMetadataKey.java deleted file mode 100644 index 0c0ee5b..0000000 --- a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/discovery/TimelineMetricMetadataKey.java +++ /dev/null @@ -1,80 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * <p/> - * http://www.apache.org/licenses/LICENSE-2.0 - * <p/> - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.discovery; - -import javax.xml.bind.annotation.XmlRootElement; - -import org.apache.commons.lang3.StringUtils; - -@XmlRootElement -public class TimelineMetricMetadataKey { - String metricName; - String appId; - String instanceId; - - public TimelineMetricMetadataKey(String metricName, String appId, String instanceId) { - this.metricName = metricName; - this.appId = appId; - this.instanceId = instanceId; - } - - public String getMetricName() { - return metricName; - } - - public String getAppId() { - return appId; - } - - public String getInstanceId() { - return instanceId; - } - - public void setAppId(String appId) { - this.appId = appId; - } - - @Override - public boolean equals(Object o) { - if (this == o) return true; - if (o == null || getClass() != o.getClass()) return false; - - TimelineMetricMetadataKey that = (TimelineMetricMetadataKey) o; - - if (!metricName.equals(that.metricName)) return false; - if (!appId.equals(that.appId)) return false; - return (StringUtils.isNotEmpty(instanceId) ? instanceId.equals(that.instanceId) : StringUtils.isEmpty(that.instanceId)); - } - - @Override - public int hashCode() { - int result = metricName.hashCode(); - result = 31 * result + (appId != null ? appId.hashCode() : 0); - result = 31 * result + (instanceId != null ? 
instanceId.hashCode() : 0); - return result; - } - - @Override - public String toString() { - return "TimelineMetricMetadataKey{" + - "metricName='" + metricName + '\'' + - ", appId='" + appId + '\'' + - ", instanceId='" + instanceId + '\'' + - '}'; - } -} http://git-wip-us.apache.org/repos/asf/ambari/blob/42112e28/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/discovery/TimelineMetricMetadataManager.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/discovery/TimelineMetricMetadataManager.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/discovery/TimelineMetricMetadataManager.java deleted file mode 100644 index beac866..0000000 --- a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/discovery/TimelineMetricMetadataManager.java +++ /dev/null @@ -1,612 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * <p/> - * http://www.apache.org/licenses/LICENSE-2.0 - * <p/> - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
- * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.discovery; - -import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.DISABLE_METRIC_METADATA_MGMT; -import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.METRICS_METADATA_SYNC_INIT_DELAY; -import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.METRICS_METADATA_SYNC_SCHEDULE_DELAY; -import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.TIMELINE_METRICS_UUID_GEN_STRATEGY; -import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.TIMELINE_METRIC_METADATA_FILTERS; - -import java.net.MalformedURLException; -import java.net.URISyntaxException; -import java.sql.SQLException; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Collection; -import java.util.HashSet; -import java.util.List; -import java.util.Map; -import java.util.Set; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.Executors; -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicBoolean; - -import org.apache.commons.collections.CollectionUtils; -import org.apache.commons.collections.MapUtils; -import org.apache.commons.lang.ArrayUtils; -import org.apache.commons.lang.StringUtils; -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.metrics2.sink.timeline.MetadataException; -import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric; -import 
org.apache.hadoop.metrics2.sink.timeline.TimelineMetricMetadata; -import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixHBaseAccessor; -import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration; -import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.TimelineClusterMetric; -import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.uuid.HashBasedUuidGenStrategy; -import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.uuid.MetricUuidGenStrategy; -import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.uuid.RandomUuidGenStrategy; - -public class TimelineMetricMetadataManager { - private static final Log LOG = LogFactory.getLog(TimelineMetricMetadataManager.class); - private boolean isDisabled = false; - // Cache all metadata on retrieval - private final Map<TimelineMetricMetadataKey, TimelineMetricMetadata> METADATA_CACHE = new ConcurrentHashMap<>(); - private final Map<String, TimelineMetricMetadataKey> uuidKeyMap = new ConcurrentHashMap<>(); - // Map to lookup apps on a host - private final Map<String, TimelineMetricHostMetadata> HOSTED_APPS_MAP = new ConcurrentHashMap<>(); - private final Map<String, String> uuidHostMap = new ConcurrentHashMap<>(); - private final Map<String, Set<String>> INSTANCE_HOST_MAP = new ConcurrentHashMap<>(); - // Sync only when needed - AtomicBoolean SYNC_HOSTED_APPS_METADATA = new AtomicBoolean(false); - AtomicBoolean SYNC_HOSTED_INSTANCES_METADATA = new AtomicBoolean(false); - private MetricUuidGenStrategy uuidGenStrategy = new HashBasedUuidGenStrategy(); - public static final int TIMELINE_METRIC_UUID_LENGTH = 16; - public static final int HOSTNAME_UUID_LENGTH = 4; - - // Single thread to sync back new writes to the store - private final ScheduledExecutorService executorService = Executors.newSingleThreadScheduledExecutor(); - - private 
PhoenixHBaseAccessor hBaseAccessor; - private Configuration metricsConf; - - TimelineMetricMetadataSync metricMetadataSync; - // Filter metrics names matching given patterns, from metadata - final List<String> metricNameFilters = new ArrayList<>(); - - // Test friendly construction since mock instrumentation is difficult to get - // working with hadoop mini cluster - public TimelineMetricMetadataManager(Configuration metricsConf, PhoenixHBaseAccessor hBaseAccessor) { - this.metricsConf = metricsConf; - this.hBaseAccessor = hBaseAccessor; - String patternStrings = metricsConf.get(TIMELINE_METRIC_METADATA_FILTERS); - if (!StringUtils.isEmpty(patternStrings)) { - metricNameFilters.addAll(Arrays.asList(patternStrings.split(","))); - } - - uuidGenStrategy = getUuidStrategy(metricsConf); - } - - public TimelineMetricMetadataManager(PhoenixHBaseAccessor hBaseAccessor) throws MalformedURLException, URISyntaxException { - this(TimelineMetricConfiguration.getInstance().getMetricsConf(), hBaseAccessor); - } - - /** - * Initialize Metadata from the store - */ - public void initializeMetadata() { - if (metricsConf.getBoolean(DISABLE_METRIC_METADATA_MGMT, false)) { - isDisabled = true; - } else { - metricMetadataSync = new TimelineMetricMetadataSync(this); - // Schedule the executor to sync to store - executorService.scheduleWithFixedDelay(metricMetadataSync, - metricsConf.getInt(METRICS_METADATA_SYNC_INIT_DELAY, 120), // 2 minutes - metricsConf.getInt(METRICS_METADATA_SYNC_SCHEDULE_DELAY, 300), // 5 minutes - TimeUnit.SECONDS); - // Read from store and initialize map - try { - Map<TimelineMetricMetadataKey, TimelineMetricMetadata> metadata = getMetadataFromStore(); - - LOG.info("Retrieved " + metadata.size() + ", metadata objects from store."); - // Store in the cache - METADATA_CACHE.putAll(metadata); - - Map<String, TimelineMetricHostMetadata> hostedAppData = getHostedAppsFromStore(); - - LOG.info("Retrieved " + hostedAppData.size() + " host objects from store."); - 
HOSTED_APPS_MAP.putAll(hostedAppData); - - loadUuidMapsOnInit(); - - hBaseAccessor.setMetadataInstance(this); - } catch (SQLException e) { - LOG.warn("Exception loading metric metadata", e); - } - } - } - - public Map<TimelineMetricMetadataKey, TimelineMetricMetadata> getMetadataCache() { - return METADATA_CACHE; - } - - public TimelineMetricMetadata getMetadataCacheValue(TimelineMetricMetadataKey key) { - return METADATA_CACHE.get(key); - } - - public Map<String, TimelineMetricHostMetadata> getHostedAppsCache() { - return HOSTED_APPS_MAP; - } - - public Map<String, Set<String>> getHostedInstanceCache() { - return INSTANCE_HOST_MAP; - } - - public boolean syncHostedAppsMetadata() { - return SYNC_HOSTED_APPS_METADATA.get(); - } - - public boolean syncHostedInstanceMetadata() { - return SYNC_HOSTED_INSTANCES_METADATA.get(); - } - - public void markSuccessOnSyncHostedAppsMetadata() { - SYNC_HOSTED_APPS_METADATA.set(false); - } - - public void markSuccessOnSyncHostedInstanceMetadata() { - SYNC_HOSTED_INSTANCES_METADATA.set(false); - } - /** - * Test metric name for valid patterns and return true/false - */ - boolean skipMetadataCache(String metricName) { - for (String pattern : metricNameFilters) { - if (metricName.contains(pattern)) { - return true; - } - } - return false; - } - - /** - * Update value in metadata cache - * @param metadata @TimelineMetricMetadata - */ - public void putIfModifiedTimelineMetricMetadata(TimelineMetricMetadata metadata) { - if (skipMetadataCache(metadata.getMetricName())) { - return; - } - - TimelineMetricMetadataKey key = new TimelineMetricMetadataKey( - metadata.getMetricName(), metadata.getAppId(), metadata.getInstanceId()); - - TimelineMetricMetadata metadataFromCache = METADATA_CACHE.get(key); - - if (metadataFromCache != null) { - try { - if (metadataFromCache.needsToBeSynced(metadata)) { - metadata.setIsPersisted(false); // Set the flag to ensure sync to store on next run - METADATA_CACHE.put(key, metadata); - } - } catch 
(MetadataException e) { - LOG.warn("Error inserting Metadata in cache.", e); - } - - } else { - METADATA_CACHE.put(key, metadata); - } - } - - /** - * Update value in hosted apps cache - * @param hostname Host name - * @param appId Application Id - */ - public void putIfModifiedHostedAppsMetadata(String hostname, String appId) { - TimelineMetricHostMetadata timelineMetricHostMetadata = HOSTED_APPS_MAP.get(hostname); - ConcurrentHashMap<String, String> apps = (timelineMetricHostMetadata != null) ? timelineMetricHostMetadata.getHostedApps() : null; - if (apps == null) { - apps = new ConcurrentHashMap<>(); - if (timelineMetricHostMetadata == null) { - HOSTED_APPS_MAP.put(hostname, new TimelineMetricHostMetadata(apps)); - } else { - HOSTED_APPS_MAP.get(hostname).setHostedApps(apps); - } - } - - if (!apps.containsKey(appId)) { - apps.put(appId, appId); - SYNC_HOSTED_APPS_METADATA.set(true); - } - } - - public void putIfModifiedHostedInstanceMetadata(String instanceId, String hostname) { - if (StringUtils.isEmpty(instanceId)) { - return; - } - - Set<String> hosts = INSTANCE_HOST_MAP.get(instanceId); - if (hosts == null) { - hosts = new HashSet<>(); - INSTANCE_HOST_MAP.put(instanceId, hosts); - } - - if (!hosts.contains(hostname)) { - hosts.add(hostname); - SYNC_HOSTED_INSTANCES_METADATA.set(true); - } - } - - public void persistMetadata(Collection<TimelineMetricMetadata> metadata) throws SQLException { - hBaseAccessor.saveMetricMetadata(metadata); - } - - public void persistHostedAppsMetadata(Map<String, TimelineMetricHostMetadata> hostedApps) throws SQLException { - hBaseAccessor.saveHostAppsMetadata(hostedApps); - } - - public void persistHostedInstanceMetadata(Map<String, Set<String>> hostedInstancesMetadata) throws SQLException { - hBaseAccessor.saveInstanceHostsMetadata(hostedInstancesMetadata); - } - - public TimelineMetricMetadata getTimelineMetricMetadata(TimelineMetric timelineMetric, boolean isWhitelisted) { - return new TimelineMetricMetadata( - 
timelineMetric.getMetricName(), - timelineMetric.getAppId(), - timelineMetric.getInstanceId(), - timelineMetric.getUnits(), - timelineMetric.getType(), - timelineMetric.getStartTime(), - supportAggregates(timelineMetric), - isWhitelisted - ); - } - - public boolean isDisabled() { - return isDisabled; - } - - boolean isDistributedModeEnabled() { - return metricsConf.get("timeline.metrics.service.operation.mode").equals("distributed"); - } - - /** - * Fetch metrics metadata from store - * @throws SQLException - */ - Map<TimelineMetricMetadataKey, TimelineMetricMetadata> getMetadataFromStore() throws SQLException { - return hBaseAccessor.getTimelineMetricMetadata(); - } - - /** - * Fetch hosted apps from store - * @throws SQLException - */ - Map<String, TimelineMetricHostMetadata> getHostedAppsFromStore() throws SQLException { - return hBaseAccessor.getHostedAppsMetadata(); - } - - Map<String, Set<String>> getHostedInstancesFromStore() throws SQLException { - return hBaseAccessor.getInstanceHostsMetdata(); - } - - private boolean supportAggregates(TimelineMetric metric) { - return MapUtils.isEmpty(metric.getMetadata()) || - !(String.valueOf(true).equals(metric.getMetadata().get("skipAggregation"))); - } - - ////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// - // UUID Management - ////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// - - - /** - * Load the UUID mappings from the UUID table on startup. 
- */ - private void loadUuidMapsOnInit() { - - for (TimelineMetricMetadataKey key : METADATA_CACHE.keySet()) { - TimelineMetricMetadata timelineMetricMetadata = METADATA_CACHE.get(key); - if (timelineMetricMetadata != null && timelineMetricMetadata.getUuid() != null) { - uuidKeyMap.put(new String(timelineMetricMetadata.getUuid()), key); - } - } - - for (String host : HOSTED_APPS_MAP.keySet()) { - TimelineMetricHostMetadata timelineMetricHostMetadata = HOSTED_APPS_MAP.get(host); - if (timelineMetricHostMetadata != null && timelineMetricHostMetadata.getUuid() != null) { - uuidHostMap.put(new String(timelineMetricHostMetadata.getUuid()), host); - } - } - } - - /** - * Returns the UUID gen strategy. - * @param configuration - * @return - */ - private MetricUuidGenStrategy getUuidStrategy(Configuration configuration) { - String strategy = configuration.get(TIMELINE_METRICS_UUID_GEN_STRATEGY, ""); - if ("random".equalsIgnoreCase(strategy)) { - return new RandomUuidGenStrategy(); - } else { - return new HashBasedUuidGenStrategy(); - } - } - - /** - * Given the hostname, generates a byte array of length 'HOSTNAME_UUID_LENGTH' - * @param hostname - * @return uuid byte array of length 'HOSTNAME_UUID_LENGTH' - */ - private byte[] getUuidForHostname(String hostname) { - - TimelineMetricHostMetadata timelineMetricHostMetadata = HOSTED_APPS_MAP.get(hostname); - if (timelineMetricHostMetadata != null) { - byte[] uuid = timelineMetricHostMetadata.getUuid(); - if (uuid != null) { - return uuid; - } - } - - byte[] uuid = uuidGenStrategy.computeUuid(hostname, HOSTNAME_UUID_LENGTH); - - String uuidStr = new String(uuid); - if (uuidHostMap.containsKey(uuidStr)) { - //TODO fix the collisions - LOG.error("Duplicate key computed for " + hostname +", Collides with " + uuidHostMap.get(uuidStr)); - return uuid; - } - - if (timelineMetricHostMetadata == null) { - timelineMetricHostMetadata = new TimelineMetricHostMetadata(); - HOSTED_APPS_MAP.put(hostname, timelineMetricHostMetadata); - } - 
timelineMetricHostMetadata.setUuid(uuid); - uuidHostMap.put(uuidStr, hostname); - - return uuid; - } - - /** - * Given a timelineClusterMetric instance, generates a UUID for Metric-App-Instance combination. - * @param timelineClusterMetric - * @return uuid byte array of length 'TIMELINE_METRIC_UUID_LENGTH' - */ - public byte[] getUuid(TimelineClusterMetric timelineClusterMetric) { - TimelineMetricMetadataKey key = new TimelineMetricMetadataKey(timelineClusterMetric.getMetricName(), - timelineClusterMetric.getAppId(), timelineClusterMetric.getInstanceId()); - - TimelineMetricMetadata timelineMetricMetadata = METADATA_CACHE.get(key); - if (timelineMetricMetadata != null) { - byte[] uuid = timelineMetricMetadata.getUuid(); - if (uuid != null) { - return uuid; - } - } - - byte[] uuid = uuidGenStrategy.computeUuid(timelineClusterMetric, TIMELINE_METRIC_UUID_LENGTH); - - String uuidStr = new String(uuid); - if (uuidKeyMap.containsKey(uuidStr) && !uuidKeyMap.get(uuidStr).equals(key)) { - TimelineMetricMetadataKey collidingKey = (TimelineMetricMetadataKey)uuidKeyMap.get(uuidStr); - //TODO fix the collisions - /** - * 2017-08-23 14:12:35,922 ERROR org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.discovery.TimelineMetricMetadataManager: - * Duplicate key [52, 50, 51, 53, 50, 53, 53, 53, 49, 54, 57, 50, 50, 54, 0, 0]([B@278a93f9) computed for - * TimelineClusterMetric{metricName='sdisk_dm-11_write_count', appId='hbase', instanceId='', timestamp=1503497400000}, Collides with - * TimelineMetricMetadataKey{metricName='sdisk_dm-20_write_count', appId='hbase', instanceId=''} - */ - LOG.error("Duplicate key " + Arrays.toString(uuid) + "(" + uuid + ") computed for " + timelineClusterMetric.toString() + ", Collides with " + collidingKey.toString()); - return uuid; - } - - if (timelineMetricMetadata == null) { - timelineMetricMetadata = new TimelineMetricMetadata(); - timelineMetricMetadata.setMetricName(timelineClusterMetric.getMetricName()); - 
timelineMetricMetadata.setAppId(timelineClusterMetric.getAppId()); - timelineMetricMetadata.setInstanceId(timelineClusterMetric.getInstanceId()); - METADATA_CACHE.put(key, timelineMetricMetadata); - } - - timelineMetricMetadata.setUuid(uuid); - timelineMetricMetadata.setIsPersisted(false); - uuidKeyMap.put(uuidStr, key); - return uuid; - } - - /** - * Given a timelineMetric instance, generates a UUID for Metric-App-Instance combination. - * @param timelineMetric - * @return uuid byte array of length 'TIMELINE_METRIC_UUID_LENGTH' + 'HOSTNAME_UUID_LENGTH' - */ - public byte[] getUuid(TimelineMetric timelineMetric) { - - byte[] metricUuid = getUuid(new TimelineClusterMetric(timelineMetric.getMetricName(), timelineMetric.getAppId(), - timelineMetric.getInstanceId(), -1l)); - byte[] hostUuid = getUuidForHostname(timelineMetric.getHostName()); - - return ArrayUtils.addAll(metricUuid, hostUuid); - } - - public byte[] getUuid(String metricName, String appId, String instanceId, String hostname) { - - byte[] metricUuid = getUuid(new TimelineClusterMetric(metricName, appId, instanceId, -1l)); - if (StringUtils.isNotEmpty(hostname)) { - byte[] hostUuid = getUuidForHostname(hostname); - return ArrayUtils.addAll(metricUuid, hostUuid); - } - return metricUuid; - } - - public String getMetricNameFromUuid(byte[] uuid) { - - byte[] metricUuid = uuid; - if (uuid.length == TIMELINE_METRIC_UUID_LENGTH + HOSTNAME_UUID_LENGTH) { - metricUuid = ArrayUtils.subarray(uuid, 0, TIMELINE_METRIC_UUID_LENGTH); - } - - TimelineMetricMetadataKey key = uuidKeyMap.get(new String(metricUuid)); - return key != null ? key.getMetricName() : null; - } - - public TimelineMetric getMetricFromUuid(byte[] uuid) { - if (uuid == null) { - return null; - } - - if (uuid.length == TIMELINE_METRIC_UUID_LENGTH) { - TimelineMetricMetadataKey key = uuidKeyMap.get(new String(uuid)); - return key != null ? 
new TimelineMetric(key.metricName, null, key.appId, key.instanceId) : null; - } else { - byte[] metricUuid = ArrayUtils.subarray(uuid, 0, TIMELINE_METRIC_UUID_LENGTH); - TimelineMetricMetadataKey key = uuidKeyMap.get(new String(metricUuid)); - if (key == null) { - LOG.error("TimelineMetricMetadataKey is null for : " + Arrays.toString(uuid)); - return null; - } - TimelineMetric timelineMetric = new TimelineMetric(); - timelineMetric.setMetricName(key.metricName); - timelineMetric.setAppId(key.appId); - timelineMetric.setInstanceId(key.instanceId); - - byte[] hostUuid = ArrayUtils.subarray(uuid, TIMELINE_METRIC_UUID_LENGTH, HOSTNAME_UUID_LENGTH + TIMELINE_METRIC_UUID_LENGTH); - timelineMetric.setHostName(uuidHostMap.get(new String(hostUuid))); - return timelineMetric; - } - } - - /** - * Returns the set of UUIDs for a given GET request. If there are wildcards (%), resolves them based on UUID map. - * @param metricNames - * @param hostnames - * @param appId - * @param instanceId - * @return Set of UUIds - */ - public List<byte[]> getUuids(Collection<String> metricNames, List<String> hostnames, String appId, String instanceId) { - - Collection<String> sanitizedMetricNames = new HashSet<>(); - - for (String metricName : metricNames) { - if (metricName.contains("%")) { - String metricRegEx; - //Special case handling for metric name with * and __%. 
- //For example, dfs.NNTopUserOpCounts.windowMs=300000.op=*.user=%.count - // or dfs.NNTopUserOpCounts.windowMs=300000.op=__%.user=%.count - if (metricName.contains("*") || metricName.contains("__%")) { - String metricNameWithEscSeq = metricName.replace("*", "\\*").replace("__%", "..%"); - metricRegEx = metricNameWithEscSeq.replace("%", ".*"); - } else { - metricRegEx = metricName.replace("%", ".*"); - } - for (TimelineMetricMetadataKey key : METADATA_CACHE.keySet()) { - String metricNameFromMetadata = key.getMetricName(); - if (metricNameFromMetadata.matches(metricRegEx)) { - sanitizedMetricNames.add(metricNameFromMetadata); - } - } - } else { - sanitizedMetricNames.add(metricName); - } - } - - Set<String> sanitizedHostNames = new HashSet<>(); - if (CollectionUtils.isNotEmpty(hostnames)) { - for (String hostname : hostnames) { - if (hostname.contains("%")) { - String hostRegEx; - hostRegEx = hostname.replace("%", ".*"); - for (String host : HOSTED_APPS_MAP.keySet()) { - if (host.matches(hostRegEx)) { - sanitizedHostNames.add(host); - } - } - } else { - sanitizedHostNames.add(hostname); - } - } - } - - List<byte[]> uuids = new ArrayList<>(); - - if (!(appId.equals("HOST") || appId.equals("FLUME_HANDLER"))) { //HACK.. Why?? 
- appId = appId.toLowerCase(); - } - if (CollectionUtils.isNotEmpty(sanitizedHostNames)) { - if (CollectionUtils.isNotEmpty(sanitizedMetricNames)) { - for (String metricName : sanitizedMetricNames) { - TimelineMetric metric = new TimelineMetric(); - metric.setMetricName(metricName); - metric.setAppId(appId); - metric.setInstanceId(instanceId); - for (String hostname : sanitizedHostNames) { - metric.setHostName(hostname); - byte[] uuid = getUuid(metric); - if (uuid != null) { - uuids.add(uuid); - } - } - } - } else { - for (String hostname : sanitizedHostNames) { - byte[] uuid = getUuidForHostname(hostname); - if (uuid != null) { - uuids.add(uuid); - } - } - } - } else { - for (String metricName : sanitizedMetricNames) { - TimelineClusterMetric metric = new TimelineClusterMetric(metricName, appId, instanceId, -1l); - byte[] uuid = getUuid(metric); - if (uuid != null) { - uuids.add(uuid); - } - } - } - - return uuids; - } - - public Map<String, TimelineMetricMetadataKey> getUuidKeyMap() { - return uuidKeyMap; - } - - public List<String> getNotLikeHostnames(List<String> hostnames) { - List<String> result = new ArrayList<>(); - Set<String> sanitizedHostNames = new HashSet<>(); - if (CollectionUtils.isNotEmpty(hostnames)) { - for (String hostname : hostnames) { - if (hostname.contains("%")) { - String hostRegEx; - hostRegEx = hostname.replace("%", ".*"); - for (String host : HOSTED_APPS_MAP.keySet()) { - if (host.matches(hostRegEx)) { - sanitizedHostNames.add(host); - } - } - } else { - sanitizedHostNames.add(hostname); - } - } - } - - for (String hostname: HOSTED_APPS_MAP.keySet()) { - if (!sanitizedHostNames.contains(hostname)) { - result.add(hostname); - } - } - return result; - } -} http://git-wip-us.apache.org/repos/asf/ambari/blob/42112e28/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/discovery/TimelineMetricMetadataSync.java 
---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/discovery/TimelineMetricMetadataSync.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/discovery/TimelineMetricMetadataSync.java deleted file mode 100644 index fa5f55a..0000000 --- a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/discovery/TimelineMetricMetadataSync.java +++ /dev/null @@ -1,227 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * <p/> - * http://www.apache.org/licenses/LICENSE-2.0 - * <p/> - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.discovery; - -import java.sql.SQLException; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.Set; - -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.metrics2.sink.timeline.TimelineMetricMetadata; - -/** - * Sync metadata info with the store - */ -public class TimelineMetricMetadataSync implements Runnable { - private static final Log LOG = LogFactory.getLog(TimelineMetricMetadataSync.class); - - private final TimelineMetricMetadataManager cacheManager; - - public TimelineMetricMetadataSync(TimelineMetricMetadataManager cacheManager) { - this.cacheManager = cacheManager; - } - - @Override - public void run() { - LOG.debug("Persisting metric metadata..."); - persistMetricMetadata(); - LOG.debug("Persisting hosted apps metadata..."); - persistHostAppsMetadata(); - LOG.debug("Persisting hosted instance metadata..."); - persistHostInstancesMetadata(); - if (cacheManager.isDistributedModeEnabled()) { - LOG.debug("Refreshing metric metadata..."); - refreshMetricMetadata(); - LOG.debug("Refreshing hosted apps metadata..."); - refreshHostAppsMetadata(); - LOG.debug("Refreshing hosted instances metadata..."); - refreshHostedInstancesMetadata(); - } - } - - /** - * Find metrics not persisted to store and persist them - */ - private void persistMetricMetadata() { - List<TimelineMetricMetadata> metadataToPersist = new ArrayList<>(); - // Find all entries to persist - for (TimelineMetricMetadata metadata : cacheManager.getMetadataCache().values()) { - if (!metadata.isPersisted()) { - metadataToPersist.add(metadata); - } - } - boolean markSuccess = false; - if (!metadataToPersist.isEmpty()) { - try { - cacheManager.persistMetadata(metadataToPersist); - markSuccess = true; - } catch (SQLException e) { - LOG.warn("Error persisting metadata.", e); - } - } - // 
Mark corresponding entries as persisted to skip on next run - if (markSuccess) { - for (TimelineMetricMetadata metadata : metadataToPersist) { - TimelineMetricMetadataKey key = new TimelineMetricMetadataKey( - metadata.getMetricName(), metadata.getAppId(), metadata.getInstanceId() - ); - - // Mark entry as being persisted - metadata.setIsPersisted(true); - // Update cache - cacheManager.getMetadataCache().put(key, metadata); - } - } - } - - /** - * Read all metric metadata and update cached values - HA mode - */ - private void refreshMetricMetadata() { - Map<TimelineMetricMetadataKey, TimelineMetricMetadata> metadataFromStore = null; - try { - metadataFromStore = cacheManager.getMetadataFromStore(); - } catch (SQLException e) { - LOG.warn("Error refreshing metadata from store.", e); - } - if (metadataFromStore != null) { - Map<TimelineMetricMetadataKey, TimelineMetricMetadata> cachedMetadata = - cacheManager.getMetadataCache(); - - for (Map.Entry<TimelineMetricMetadataKey, TimelineMetricMetadata> metadataEntry : metadataFromStore.entrySet()) { - if (!cachedMetadata.containsKey(metadataEntry.getKey())) { - cachedMetadata.put(metadataEntry.getKey(), metadataEntry.getValue()); - } - } - } - } - - /** - * Sync hosted apps data if needed - */ - private void persistHostAppsMetadata() { - if (cacheManager.syncHostedAppsMetadata()) { - Map<String, TimelineMetricHostMetadata> persistedData = null; - try { - persistedData = cacheManager.getHostedAppsFromStore(); - } catch (SQLException e) { - LOG.warn("Failed on fetching hosted apps data from store.", e); - return; // Something wrong with store - } - - Map<String, TimelineMetricHostMetadata> cachedData = cacheManager.getHostedAppsCache(); - Map<String, TimelineMetricHostMetadata> dataToSync = new HashMap<>(); - if (cachedData != null && !cachedData.isEmpty()) { - for (Map.Entry<String, TimelineMetricHostMetadata> cacheEntry : cachedData.entrySet()) { - // No persistence / stale data in store - if (persistedData == null || 
persistedData.isEmpty() || - !persistedData.containsKey(cacheEntry.getKey()) || - !persistedData.get(cacheEntry.getKey()).getHostedApps().keySet().containsAll(cacheEntry.getValue().getHostedApps().keySet())) { - dataToSync.put(cacheEntry.getKey(), cacheEntry.getValue()); - } - } - try { - cacheManager.persistHostedAppsMetadata(dataToSync); - cacheManager.markSuccessOnSyncHostedAppsMetadata(); - - } catch (SQLException e) { - LOG.warn("Error persisting hosted apps metadata.", e); - } - } - - } - } - - /** - * Sync apps instances data if needed - */ - private void persistHostInstancesMetadata() { - if (cacheManager.syncHostedInstanceMetadata()) { - Map<String, Set<String>> persistedData = null; - try { - persistedData = cacheManager.getHostedInstancesFromStore(); - } catch (SQLException e) { - LOG.warn("Failed on fetching hosted instances data from store.", e); - return; // Something wrong with store - } - - Map<String, Set<String>> cachedData = cacheManager.getHostedInstanceCache(); - Map<String, Set<String>> dataToSync = new HashMap<>(); - if (cachedData != null && !cachedData.isEmpty()) { - for (Map.Entry<String, Set<String>> cacheEntry : cachedData.entrySet()) { - // No persistence / stale data in store - if (persistedData == null || persistedData.isEmpty() || - !persistedData.containsKey(cacheEntry.getKey()) || - !persistedData.get(cacheEntry.getKey()).containsAll(cacheEntry.getValue())) { - dataToSync.put(cacheEntry.getKey(), cacheEntry.getValue()); - } - } - try { - cacheManager.persistHostedInstanceMetadata(dataToSync); - cacheManager.markSuccessOnSyncHostedInstanceMetadata(); - - } catch (SQLException e) { - LOG.warn("Error persisting hosted apps metadata.", e); - } - } - - } - } - /** - * Read all hosted apps metadata and update cached values - HA - */ - private void refreshHostAppsMetadata() { - Map<String, TimelineMetricHostMetadata> hostedAppsDataFromStore = null; - try { - hostedAppsDataFromStore = cacheManager.getHostedAppsFromStore(); - } catch 
(SQLException e) { - LOG.warn("Error refreshing metadata from store.", e); - } - if (hostedAppsDataFromStore != null) { - Map<String, TimelineMetricHostMetadata> cachedData = cacheManager.getHostedAppsCache(); - - for (Map.Entry<String, TimelineMetricHostMetadata> storeEntry : hostedAppsDataFromStore.entrySet()) { - if (!cachedData.containsKey(storeEntry.getKey())) { - cachedData.put(storeEntry.getKey(), storeEntry.getValue()); - } - } - } - } - - private void refreshHostedInstancesMetadata() { - Map<String, Set<String>> hostedInstancesFromStore = null; - try { - hostedInstancesFromStore = cacheManager.getHostedInstancesFromStore(); - } catch (SQLException e) { - LOG.warn("Error refreshing metadata from store.", e); - } - if (hostedInstancesFromStore != null) { - Map<String, Set<String>> cachedData = cacheManager.getHostedInstanceCache(); - - for (Map.Entry<String, Set<String>> storeEntry : hostedInstancesFromStore.entrySet()) { - if (!cachedData.containsKey(storeEntry.getKey())) { - cachedData.put(storeEntry.getKey(), storeEntry.getValue()); - } - } - } - } -} http://git-wip-us.apache.org/repos/asf/ambari/blob/42112e28/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/function/AbstractTimelineMetricsSeriesAggregateFunction.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/function/AbstractTimelineMetricsSeriesAggregateFunction.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/function/AbstractTimelineMetricsSeriesAggregateFunction.java deleted file mode 100644 index 5a5dde4..0000000 --- 
a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/function/AbstractTimelineMetricsSeriesAggregateFunction.java +++ /dev/null @@ -1,98 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.function; - -import java.util.Iterator; -import java.util.LinkedList; -import java.util.List; -import java.util.Map; -import java.util.Set; -import java.util.TreeMap; -import java.util.TreeSet; - -import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric; -import org.apache.hadoop.metrics2.sink.timeline.TimelineMetrics; - -import com.google.common.base.Joiner; - -public abstract class AbstractTimelineMetricsSeriesAggregateFunction - implements TimelineMetricsSeriesAggregateFunction { - - @Override - public TimelineMetric apply(TimelineMetrics timelineMetrics) { - Set<String> metricNameSet = new TreeSet<>(); - Set<String> hostNameSet = new TreeSet<>(); - Set<String> appIdSet = new TreeSet<>(); - Set<String> instanceIdSet = new TreeSet<>(); - TreeMap<Long, List<Double>> metricValues = new TreeMap<>(); - - for (TimelineMetric timelineMetric : timelineMetrics.getMetrics()) { - metricNameSet.add(timelineMetric.getMetricName()); - addToSetOnlyNotNull(hostNameSet, timelineMetric.getHostName()); - addToSetOnlyNotNull(appIdSet, timelineMetric.getAppId()); - addToSetOnlyNotNull(instanceIdSet, timelineMetric.getInstanceId()); - - for (Map.Entry<Long, Double> metricValue : timelineMetric.getMetricValues().entrySet()) { - Long timestamp = metricValue.getKey(); - Double value = metricValue.getValue(); - if (!metricValues.containsKey(timestamp)) { - metricValues.put(timestamp, new LinkedList<Double>()); - } - metricValues.get(timestamp).add(value); - } - } - - TreeMap<Long, Double> aggregatedMetricValues = new TreeMap<>(); - for (Map.Entry<Long, List<Double>> metricValue : metricValues.entrySet()) { - List<Double> values = metricValue.getValue(); - if (values.size() == 0) { - throw new IllegalArgumentException("count of values should be more than 0"); - } - aggregatedMetricValues.put(metricValue.getKey(), applyFunction(values)); - } - - TimelineMetric timelineMetric = new TimelineMetric(); 
- timelineMetric.setMetricName(getMetricName(metricNameSet.iterator())); - timelineMetric.setHostName(joinStringsWithComma(hostNameSet.iterator())); - timelineMetric.setAppId(joinStringsWithComma(appIdSet.iterator())); - timelineMetric.setInstanceId(joinStringsWithComma(instanceIdSet.iterator())); - if (aggregatedMetricValues.size() > 0) { - timelineMetric.setStartTime(aggregatedMetricValues.firstKey()); - } - timelineMetric.setMetricValues(aggregatedMetricValues); - return timelineMetric; - } - - protected String getMetricName(Iterator<String> metricNames) { - return getFunctionName() + "(" + Joiner.on(",").join(metricNames) + ")"; - } - - protected String joinStringsWithComma(Iterator<String> hostNames) { - return Joiner.on(",").join(hostNames); - } - - protected abstract Double applyFunction(List<Double> values); - protected abstract String getFunctionName(); - - private void addToSetOnlyNotNull(Set<String> set, String value) { - if (value != null) { - set.add(value); - } - } - -} http://git-wip-us.apache.org/repos/asf/ambari/blob/42112e28/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/function/SeriesAggregateFunction.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/function/SeriesAggregateFunction.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/function/SeriesAggregateFunction.java deleted file mode 100644 index ef5e441..0000000 --- a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/function/SeriesAggregateFunction.java +++ /dev/null @@ -1,42 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more 
contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.function; - -import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.Function; - -public enum SeriesAggregateFunction { - AVG, MIN, MAX, SUM; - - public static boolean isPresent(String functionName) { - try { - SeriesAggregateFunction.valueOf(functionName.toUpperCase()); - } catch (IllegalArgumentException e) { - return false; - } - return true; - } - - public static SeriesAggregateFunction getFunction(String functionName) throws Function.FunctionFormatException { - try { - return SeriesAggregateFunction.valueOf(functionName.toUpperCase()); - } catch (NullPointerException | IllegalArgumentException e) { - throw new Function.FunctionFormatException( - "Function should be sum, avg, min, max. 
Got " + functionName, e); - } - } -} http://git-wip-us.apache.org/repos/asf/ambari/blob/42112e28/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/function/TimelineMetricsSeriesAggregateFunction.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/function/TimelineMetricsSeriesAggregateFunction.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/function/TimelineMetricsSeriesAggregateFunction.java deleted file mode 100644 index bdb5fe5..0000000 --- a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/function/TimelineMetricsSeriesAggregateFunction.java +++ /dev/null @@ -1,25 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.function; - -import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric; -import org.apache.hadoop.metrics2.sink.timeline.TimelineMetrics; - -public interface TimelineMetricsSeriesAggregateFunction { - TimelineMetric apply(TimelineMetrics timelineMetrics); -} http://git-wip-us.apache.org/repos/asf/ambari/blob/42112e28/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/function/TimelineMetricsSeriesAggregateFunctionFactory.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/function/TimelineMetricsSeriesAggregateFunctionFactory.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/function/TimelineMetricsSeriesAggregateFunctionFactory.java deleted file mode 100644 index 63a0fdc..0000000 --- a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/function/TimelineMetricsSeriesAggregateFunctionFactory.java +++ /dev/null @@ -1,41 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.function; - -import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.Function; - -public class TimelineMetricsSeriesAggregateFunctionFactory { - private TimelineMetricsSeriesAggregateFunctionFactory() { - } - - public static TimelineMetricsSeriesAggregateFunction newInstance(SeriesAggregateFunction func) { - switch (func) { - case AVG: - return new TimelineMetricsSeriesAvgAggregateFunction(); - case MIN: - return new TimelineMetricsSeriesMinAggregateFunction(); - case MAX: - return new TimelineMetricsSeriesMaxAggregateFunction(); - case SUM: - return new TimelineMetricsSeriesSumAggregateFunction(); - default: - throw new Function.FunctionFormatException("Function should be sum, avg, min, max. 
Got " + - func.name()); - } - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ambari/blob/42112e28/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/function/TimelineMetricsSeriesAvgAggregateFunction.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/function/TimelineMetricsSeriesAvgAggregateFunction.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/function/TimelineMetricsSeriesAvgAggregateFunction.java deleted file mode 100644 index f7c66ed..0000000 --- a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/function/TimelineMetricsSeriesAvgAggregateFunction.java +++ /dev/null @@ -1,39 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.function;

import java.util.List;

/**
 * Series aggregate that reduces the values collected at one timestamp to
 * their arithmetic mean.
 */
public class TimelineMetricsSeriesAvgAggregateFunction extends AbstractTimelineMetricsSeriesAggregateFunction {
  private static final String FUNCTION_NAME = "AVG";

  /**
   * Returns the arithmetic mean of {@code values}.
   *
   * <p>NOTE(review): an empty list yields {@code 0.0 / 0 == NaN}; presumably
   * the parent never invokes this with an empty list — confirm against
   * AbstractTimelineMetricsSeriesAggregateFunction.
   */
  @Override
  protected Double applyFunction(List<Double> values) {
    double total = 0.0d;
    for (int i = 0; i < values.size(); i++) {
      total += values.get(i);
    }
    return total / values.size();
  }

  @Override
  protected String getFunctionName() {
    return FUNCTION_NAME;
  }
}
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.function;

import java.util.List;

/**
 * Series aggregate that reduces the values collected at one timestamp to
 * their maximum.
 */
public class TimelineMetricsSeriesMaxAggregateFunction extends AbstractTimelineMetricsSeriesAggregateFunction {
  private static final String FUNCTION_NAME = "MAX";

  /**
   * Returns the largest element of {@code values}, or
   * {@code Double.NEGATIVE_INFINITY} for an empty list.
   */
  @Override
  protected Double applyFunction(List<Double> values) {
    // BUGFIX: the seed was Double.MIN_VALUE, which is the smallest POSITIVE
    // double (~4.9e-324), not the most negative value. With an all-negative
    // series the old code returned Double.MIN_VALUE instead of the true
    // maximum. NEGATIVE_INFINITY is the correct identity for max.
    double max = Double.NEGATIVE_INFINITY;
    for (Double value : values) {
      if (value > max) {
        max = value;
      }
    }

    return max;
  }

  @Override
  protected String getFunctionName() {
    return FUNCTION_NAME;
  }
}
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.function;

import java.util.List;

/**
 * Series aggregate that reduces the values collected at one timestamp to
 * their minimum.
 */
public class TimelineMetricsSeriesMinAggregateFunction extends AbstractTimelineMetricsSeriesAggregateFunction {
  private static final String FUNCTION_NAME = "MIN";

  /**
   * Returns the smallest element of {@code values}, or
   * {@code Double.POSITIVE_INFINITY} for an empty list.
   */
  @Override
  protected Double applyFunction(List<Double> values) {
    // FIX: seed with POSITIVE_INFINITY (the identity for min) rather than
    // Double.MAX_VALUE. With the old seed an input consisting solely of
    // +Infinity values returned Double.MAX_VALUE instead of the true minimum.
    double min = Double.POSITIVE_INFINITY;
    for (Double value : values) {
      if (value < min) {
        min = value;
      }
    }

    return min;
  }

  @Override
  protected String getFunctionName() {
    return FUNCTION_NAME;
  }
}
b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/function/TimelineMetricsSeriesSumAggregateFunction.java deleted file mode 100644 index 2a15c95..0000000 --- a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/function/TimelineMetricsSeriesSumAggregateFunction.java +++ /dev/null @@ -1,39 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.function;

import java.util.List;

/**
 * Series aggregate that reduces the values collected at one timestamp to
 * their sum.
 */
public class TimelineMetricsSeriesSumAggregateFunction extends AbstractTimelineMetricsSeriesAggregateFunction {
  private static final String FUNCTION_NAME = "SUM";

  /** Returns the sum of {@code values}; 0.0 for an empty list. */
  @Override
  protected Double applyFunction(List<Double> values) {
    double total = 0.0d;
    for (int i = 0; i < values.size(); i++) {
      total += values.get(i);
    }
    return total;
  }

  @Override
  protected String getFunctionName() {
    return FUNCTION_NAME;
  }
}
package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query;

import java.util.List;

import org.apache.hadoop.metrics2.sink.timeline.Precision;

/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
/**
 * Describes the filter/selection inputs of a timeline-metrics query and
 * produces the pieces of the SQL statement built from them (condition clause,
 * ORDER BY, LIMIT). Mutators let the query layer refine the condition
 * (statement text, precision, fetch size) before execution.
 *
 * <p>NOTE(review): the exact clause text produced by implementations (e.g.
 * {@code DefaultCondition}, {@code TopNCondition}) is not visible here —
 * consult those classes for the concrete SQL shape.
 */
public interface Condition {
  // True when the condition selects nothing / has no filter inputs.
  boolean isEmpty();

  // Metric identity filters: UUID-based and name/host-based selection.
  List<byte[]> getUuids();
  List<String> getMetricNames();
  boolean isPointInTime();
  boolean isGrouped();
  void setStatement(String statement);
  List<String> getHostnames();
  // Requested data precision; settable so the service can downgrade it.
  Precision getPrecision();
  void setPrecision(Precision precision);
  String getAppId();
  String getInstanceId();
  // SQL fragments assembled from the filters above.
  StringBuilder getConditionClause();
  String getOrderByClause(boolean asc);
  String getStatement();
  // Query time window and result sizing.
  Long getStartTime();
  Long getEndTime();
  Integer getLimit();
  Integer getFetchSize();
  void setFetchSize(Integer fetchSize);
  void addOrderByColumn(String column);
  void setNoLimit();
  boolean doUpdate();
  // Flags that invert the corresponding IN-filters to NOT-conditions.
  void setMetricNamesNotCondition(boolean metricNamesNotCondition);
  void setHostnamesNotCondition(boolean hostNamesNotCondition);
  void setUuidNotCondition(boolean uuidNotCondition);
}
a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/query/ConditionBuilder.java +++ /dev/null @@ -1,143 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query;

import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

import org.apache.hadoop.metrics2.sink.timeline.Precision;
import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.Function;

/**
 * Fluent builder for {@link Condition} instances. Collects the query filters
 * and, on {@link #build()}, produces a {@code TopNCondition} when a top-N
 * request was configured, otherwise a {@code DefaultCondition}.
 */
public class ConditionBuilder {

  // Mandatory metric-name filter, supplied at construction time.
  private List<String> metricNames;
  private List<String> hostnames;
  private String appId;
  private String instanceId;
  private Long startTime;
  private Long endTime;
  private Precision precision;
  private Integer limit;
  private boolean grouped;
  private boolean noLimit = false;
  private Integer fetchSize;
  private String statement;
  // Insertion-ordered so ORDER BY columns appear in the order they were added.
  private Set<String> orderByColumns = new LinkedHashSet<String>();
  // Top-N settings; topN == null means an ordinary (non top-N) condition.
  private Integer topN;
  private boolean isBottomN;
  private Function topNFunction;
  private List<byte[]> uuids;

  public ConditionBuilder(List<String> metricNames) {
    this.metricNames = metricNames;
  }

  /** Restricts results to the given hosts. */
  public ConditionBuilder hostnames(List<String> hostnames) {
    this.hostnames = hostnames;
    return this;
  }

  /** Restricts results to the given application id. */
  public ConditionBuilder appId(String appId) {
    this.appId = appId;
    return this;
  }

  /** Restricts results to the given instance id. */
  public ConditionBuilder instanceId(String instanceId) {
    this.instanceId = instanceId;
    return this;
  }

  /** Sets the inclusive lower bound of the query time window. */
  public ConditionBuilder startTime(Long startTime) {
    this.startTime = startTime;
    return this;
  }

  /** Sets the upper bound of the query time window. */
  public ConditionBuilder endTime(Long endTime) {
    this.endTime = endTime;
    return this;
  }

  /** Sets the requested data precision. */
  public ConditionBuilder precision(Precision precision) {
    this.precision = precision;
    return this;
  }

  /** Sets the result row limit. */
  public ConditionBuilder limit(Integer limit) {
    this.limit = limit;
    return this;
  }

  /** Marks the condition as grouped. */
  public ConditionBuilder grouped(boolean grouped) {
    this.grouped = grouped;
    return this;
  }

  /** Disables the row limit when {@code true}. */
  public ConditionBuilder noLimit(boolean noLimit) {
    this.noLimit = noLimit;
    return this;
  }

  /** Sets the JDBC fetch size. */
  public ConditionBuilder fetchSize(Integer fetchSize) {
    this.fetchSize = fetchSize;
    return this;
  }

  /** Supplies a pre-built SQL statement. */
  public ConditionBuilder statement(String statement) {
    this.statement = statement;
    return this;
  }

  /** Replaces the ORDER BY column set. */
  public ConditionBuilder orderByColumns(Set<String> orderByColumns) {
    this.orderByColumns = orderByColumns;
    return this;
  }

  /** Requests only the top N series; {@code null} (default) disables top-N. */
  public ConditionBuilder topN(Integer topN) {
    this.topN = topN;
    return this;
  }

  /** Selects bottom-N instead of top-N ordering. */
  public ConditionBuilder isBottomN(boolean isBottomN) {
    this.isBottomN = isBottomN;
    return this;
  }

  /** Sets the aggregate function used to rank series for top-N. */
  public ConditionBuilder topNFunction(Function topNFunction) {
    this.topNFunction = topNFunction;
    return this;
  }

  /** Restricts results to the given metric UUIDs. */
  public ConditionBuilder uuid(List<byte[]> uuids) {
    this.uuids = uuids;
    return this;
  }

  /**
   * Builds the condition: a {@code TopNCondition} when top-N was requested,
   * a {@code DefaultCondition} otherwise.
   */
  public Condition build() {
    if (topN != null) {
      return new TopNCondition(uuids, metricNames, hostnames, appId, instanceId,
        startTime, endTime, precision, limit, grouped, topN, topNFunction, isBottomN);
    }
    return new DefaultCondition(
      uuids, metricNames,
      hostnames, appId, instanceId, startTime, endTime,
      precision, limit, grouped);
  }

}
