http://git-wip-us.apache.org/repos/asf/ambari/blob/041e4e9a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/discovery/TimelineMetricMetadataManager.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/discovery/TimelineMetricMetadataManager.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/discovery/TimelineMetricMetadataManager.java index 8a71756..e00c045 100644 --- a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/discovery/TimelineMetricMetadataManager.java +++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/discovery/TimelineMetricMetadataManager.java @@ -19,6 +19,7 @@ package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline import org.apache.commons.collections.CollectionUtils; import org.apache.commons.collections.MapUtils; +import org.apache.commons.lang.ArrayUtils; import org.apache.commons.lang.StringUtils; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -29,6 +30,9 @@ import org.apache.hadoop.metrics2.sink.timeline.TimelineMetricMetadata; import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixHBaseAccessor; import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration; import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.TimelineClusterMetric; +import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.uuid.HashBasedUuidGenStrategy; +import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.uuid.MetricUuidGenStrategy; +import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.uuid.RandomUuidGenStrategy; import java.net.MalformedURLException; import java.net.URISyntaxException; @@ -48,6 +52,7 @@ import java.util.concurrent.atomic.AtomicBoolean; import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.DISABLE_METRIC_METADATA_MGMT; import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.METRICS_METADATA_SYNC_INIT_DELAY; import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.METRICS_METADATA_SYNC_SCHEDULE_DELAY; +import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.TIMELINE_METRICS_UUID_GEN_STRATEGY; import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.TIMELINE_METRIC_METADATA_FILTERS; public class TimelineMetricMetadataManager { @@ -55,12 +60,17 @@ public class TimelineMetricMetadataManager { private boolean isDisabled = false; // Cache all metadata on retrieval private final Map<TimelineMetricMetadataKey, TimelineMetricMetadata> METADATA_CACHE = new ConcurrentHashMap<>(); + private final Map<String, TimelineMetricMetadataKey> uuidKeyMap = new ConcurrentHashMap<>(); // Map to lookup apps on a host - private final Map<String, Set<String>> HOSTED_APPS_MAP = new ConcurrentHashMap<>(); + private final Map<String, TimelineMetricHostMetadata> HOSTED_APPS_MAP = new ConcurrentHashMap<>(); + private final Map<String, String> uuidHostMap = new ConcurrentHashMap<>(); private final Map<String, Set<String>> INSTANCE_HOST_MAP = new ConcurrentHashMap<>(); // Sync only when needed AtomicBoolean SYNC_HOSTED_APPS_METADATA = new AtomicBoolean(false); AtomicBoolean SYNC_HOSTED_INSTANCES_METADATA = new AtomicBoolean(false); + private MetricUuidGenStrategy uuidGenStrategy = new HashBasedUuidGenStrategy(); + private static final int timelineMetricUuidLength = 16; + private static final int hostnameUuidLength = 4; // Single thread to sync back new writes to the store private final ScheduledExecutorService executorService = Executors.newSingleThreadScheduledExecutor(); @@ -81,6 +91,8 @@ public class TimelineMetricMetadataManager { if (!StringUtils.isEmpty(patternStrings)) { metricNameFilters.addAll(Arrays.asList(patternStrings.split(","))); } + + uuidGenStrategy = getUuidStrategy(metricsConf); } public TimelineMetricMetadataManager(PhoenixHBaseAccessor hBaseAccessor) throws MalformedURLException, URISyntaxException { @@ -108,11 +120,14 @@ public class TimelineMetricMetadataManager { // Store in the cache METADATA_CACHE.putAll(metadata); - Map<String, Set<String>> hostedAppData = getHostedAppsFromStore(); + Map<String, TimelineMetricHostMetadata> hostedAppData = getHostedAppsFromStore(); LOG.info("Retrieved " + hostedAppData.size() + " host objects from store."); HOSTED_APPS_MAP.putAll(hostedAppData); + loadUuidMapsOnInit(); + + hBaseAccessor.setMetadataInstance(this); } catch (SQLException e) { LOG.warn("Exception loading metric metadata", e); } @@ -127,7 +142,7 @@ public class TimelineMetricMetadataManager { return METADATA_CACHE.get(key); } - public Map<String, Set<String>> getHostedAppsCache() { + public Map<String, TimelineMetricHostMetadata> getHostedAppsCache() { return HOSTED_APPS_MAP; } @@ -172,7 +187,7 @@ public class TimelineMetricMetadataManager { } TimelineMetricMetadataKey key = new TimelineMetricMetadataKey( - metadata.getMetricName(), metadata.getAppId()); + metadata.getMetricName(), metadata.getAppId(), metadata.getInstanceId()); TimelineMetricMetadata metadataFromCache = METADATA_CACHE.get(key); @@ -197,10 +212,15 @@ public class TimelineMetricMetadataManager { * @param appId Application Id */ public void putIfModifiedHostedAppsMetadata(String hostname, String appId) { - Set<String> apps = HOSTED_APPS_MAP.get(hostname); + TimelineMetricHostMetadata timelineMetricHostMetadata = HOSTED_APPS_MAP.get(hostname); + Set<String> apps = (timelineMetricHostMetadata != null) ? timelineMetricHostMetadata.getHostedApps() : null; if (apps == null) { apps = new HashSet<>(); - HOSTED_APPS_MAP.put(hostname, apps); + if (timelineMetricHostMetadata == null) { + HOSTED_APPS_MAP.put(hostname, new TimelineMetricHostMetadata(apps)); + } else { + HOSTED_APPS_MAP.get(hostname).setHostedApps(apps); + } } if (!apps.contains(appId)) { @@ -230,7 +250,7 @@ public class TimelineMetricMetadataManager { hBaseAccessor.saveMetricMetadata(metadata); } - public void persistHostedAppsMetadata(Map<String, Set<String>> hostedApps) throws SQLException { + public void persistHostedAppsMetadata(Map<String, TimelineMetricHostMetadata> hostedApps) throws SQLException { hBaseAccessor.saveHostAppsMetadata(hostedApps); } @@ -242,6 +262,7 @@ public class TimelineMetricMetadataManager { return new TimelineMetricMetadata( timelineMetric.getMetricName(), timelineMetric.getAppId(), + timelineMetric.getInstanceId(), timelineMetric.getUnits(), timelineMetric.getType(), timelineMetric.getStartTime(), @@ -255,7 +276,7 @@ public class TimelineMetricMetadataManager { } boolean isDistributedModeEnabled() { - return metricsConf.get("timeline.metrics.service.operation.mode", "").equals("distributed"); + return metricsConf.get("timeline.metrics.service.operation.mode").equals("distributed"); } /** @@ -270,7 +291,7 @@ public class TimelineMetricMetadataManager { * Fetch hosted apps from store * @throws SQLException */ - Map<String, Set<String>> getHostedAppsFromStore() throws SQLException { + Map<String, TimelineMetricHostMetadata> getHostedAppsFromStore() throws SQLException { return hBaseAccessor.getHostedAppsMetadata(); } @@ -282,4 +303,255 @@ public class TimelineMetricMetadataManager { return MapUtils.isEmpty(metric.getMetadata()) || !(String.valueOf(true).equals(metric.getMetadata().get("skipAggregation"))); } + + ////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// + // UUID Management + ////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// + + + /** + * Load the UUID mappings from the UUID table on startup. + */ + private void loadUuidMapsOnInit() { + + for (TimelineMetricMetadataKey key : METADATA_CACHE.keySet()) { + TimelineMetricMetadata timelineMetricMetadata = METADATA_CACHE.get(key); + if (timelineMetricMetadata != null && timelineMetricMetadata.getUuid() != null) { + uuidKeyMap.put(new String(timelineMetricMetadata.getUuid()), key); + } + } + + for (String host : HOSTED_APPS_MAP.keySet()) { + TimelineMetricHostMetadata timelineMetricHostMetadata = HOSTED_APPS_MAP.get(host); + if (timelineMetricHostMetadata != null && timelineMetricHostMetadata.getUuid() != null) { + uuidHostMap.put(new String(timelineMetricHostMetadata.getUuid()), host); + } + } + } + + /** + * Returns the UUID gen strategy. + * @param configuration + * @return + */ + private MetricUuidGenStrategy getUuidStrategy(Configuration configuration) { + String strategy = configuration.get(TIMELINE_METRICS_UUID_GEN_STRATEGY, ""); + if ("random".equalsIgnoreCase(strategy)) { + return new RandomUuidGenStrategy(); + } else { + return new HashBasedUuidGenStrategy(); + } + } + + /** + * Given the hostname, generates a byte array of length 'hostnameUuidLength' + * @param hostname + * @return uuid byte array of length 'hostnameUuidLength' + */ + private byte[] getUuidForHostname(String hostname) { + + TimelineMetricHostMetadata timelineMetricHostMetadata = HOSTED_APPS_MAP.get(hostname); + if (timelineMetricHostMetadata != null) { + byte[] uuid = timelineMetricHostMetadata.getUuid(); + if (uuid != null) { + return uuid; + } + } + + byte[] uuid = uuidGenStrategy.computeUuid(hostname, hostnameUuidLength); + + String uuidStr = new String(uuid); + if (uuidHostMap.containsKey(uuidStr)) { + LOG.error("Duplicate key computed for " + hostname +", Collides with " + uuidHostMap.get(uuidStr)); + return null; + } + + if (timelineMetricHostMetadata == null) { + timelineMetricHostMetadata = new TimelineMetricHostMetadata(); + HOSTED_APPS_MAP.put(hostname, timelineMetricHostMetadata); + } + timelineMetricHostMetadata.setUuid(uuid); + uuidHostMap.put(uuidStr, hostname); + + return uuid; + } + + /** + * Given a timelineClusterMetric instance, generates a UUID for Metric-App-Instance combination. + * @param timelineClusterMetric + * @return uuid byte array of length 'timelineMetricUuidLength' + */ + public byte[] getUuid(TimelineClusterMetric timelineClusterMetric) { + TimelineMetricMetadataKey key = new TimelineMetricMetadataKey(timelineClusterMetric.getMetricName(), + timelineClusterMetric.getAppId(), timelineClusterMetric.getInstanceId()); + + TimelineMetricMetadata timelineMetricMetadata = METADATA_CACHE.get(key); + if (timelineMetricMetadata != null) { + byte[] uuid = timelineMetricMetadata.getUuid(); + if (uuid != null) { + return uuid; + } + } + + byte[] uuid = uuidGenStrategy.computeUuid(timelineClusterMetric, timelineMetricUuidLength); + + String uuidStr = new String(uuid); + if (uuidKeyMap.containsKey(uuidStr) && !uuidKeyMap.get(uuidStr).equals(key)) { + TimelineMetricMetadataKey collidingKey = (TimelineMetricMetadataKey)uuidKeyMap.get(uuidStr); + LOG.error("Duplicate key " + Arrays.toString(uuid) + "(" + uuid + ") computed for " + timelineClusterMetric.toString() + ", Collides with " + collidingKey.toString()); + return null; + } + + if (timelineMetricMetadata == null) { + timelineMetricMetadata = new TimelineMetricMetadata(); + timelineMetricMetadata.setMetricName(timelineClusterMetric.getMetricName()); + timelineMetricMetadata.setAppId(timelineClusterMetric.getAppId()); + timelineMetricMetadata.setInstanceId(timelineClusterMetric.getInstanceId()); + METADATA_CACHE.put(key, timelineMetricMetadata); + } + + timelineMetricMetadata.setUuid(uuid); + timelineMetricMetadata.setIsPersisted(false); + uuidKeyMap.put(uuidStr, key); + return uuid; + } + + /** + * Given a timelineMetric instance, generates a UUID for Metric-App-Instance combination. + * @param timelineMetric + * @return uuid byte array of length 'timelineMetricUuidLength' + 'hostnameUuidLength' + */ + public byte[] getUuid(TimelineMetric timelineMetric) { + + byte[] metricUuid = getUuid(new TimelineClusterMetric(timelineMetric.getMetricName(), timelineMetric.getAppId(), + timelineMetric.getInstanceId(), -1l)); + byte[] hostUuid = getUuidForHostname(timelineMetric.getHostName()); + + return ArrayUtils.addAll(metricUuid, hostUuid); + } + + public String getMetricNameFromUuid(byte[] uuid) { + + byte[] metricUuid = uuid; + if (uuid.length == timelineMetricUuidLength + hostnameUuidLength) { + metricUuid = ArrayUtils.subarray(uuid, 0, timelineMetricUuidLength); + } + + TimelineMetricMetadataKey key = uuidKeyMap.get(new String(metricUuid)); + return key != null ? key.getMetricName() : null; + } + + public TimelineMetric getMetricFromUuid(byte[] uuid) { + if (uuid == null) { + return null; + } + + if (uuid.length == timelineMetricUuidLength) { + TimelineMetricMetadataKey key = uuidKeyMap.get(new String(uuid)); + return key != null ? new TimelineMetric(key.metricName, null, key.appId, key.instanceId) : null; + } else { + byte[] metricUuid = ArrayUtils.subarray(uuid, 0, timelineMetricUuidLength); + TimelineMetricMetadataKey key = uuidKeyMap.get(new String(metricUuid)); + if (key == null) { + LOG.error("TimelineMetricMetadataKey is null for : " + Arrays.toString(uuid)); + return null; + } + TimelineMetric timelineMetric = new TimelineMetric(); + timelineMetric.setMetricName(key.metricName); + timelineMetric.setAppId(key.appId); + timelineMetric.setInstanceId(key.instanceId); + + byte[] hostUuid = ArrayUtils.subarray(uuid, timelineMetricUuidLength, hostnameUuidLength + timelineMetricUuidLength); + timelineMetric.setHostName(uuidHostMap.get(new String(hostUuid))); + return timelineMetric; + } + } + + /** + * Returns the set of UUIDs for a given GET request. If there are wildcards (%), resolves them based on UUID map. + * @param metricNames + * @param hostnames + * @param appId + * @param instanceId + * @return Set of UUIds + */ + public List<byte[]> getUuids(Collection<String> metricNames, List<String> hostnames, String appId, String instanceId) { + + Collection<String> sanitizedMetricNames = new HashSet<>(); + + for (String metricName : metricNames) { + if (metricName.contains("%")) { + String metricRegEx; + //Special case handling for metric name with * and __%. + //For example, dfs.NNTopUserOpCounts.windowMs=300000.op=*.user=%.count + // or dfs.NNTopUserOpCounts.windowMs=300000.op=__%.user=%.count + if (metricName.contains("*") || metricName.contains("__%")) { + String metricNameWithEscSeq = metricName.replace("*", "\\*").replace("__%", "..%"); + metricRegEx = metricNameWithEscSeq.replace("%", ".*"); + } else { + metricRegEx = metricName.replace("%", ".*"); + } + for (TimelineMetricMetadataKey key : METADATA_CACHE.keySet()) { + String metricNameFromMetadata = key.getMetricName(); + if (metricNameFromMetadata.matches(metricRegEx)) { + sanitizedMetricNames.add(metricNameFromMetadata); + } + } + } else { + sanitizedMetricNames.add(metricName); + } + } + + Set<String> sanitizedHostNames = new HashSet<>(); + if (CollectionUtils.isNotEmpty(hostnames)) { + for (String hostname : hostnames) { + if (hostname.contains("%")) { + String hostRegEx; + hostRegEx = hostname.replace("%", ".*"); + for (String host : HOSTED_APPS_MAP.keySet()) { + if (host.matches(hostRegEx)) { + sanitizedHostNames.add(host); + } + } + } else { + sanitizedHostNames.add(hostname); + } + } + } + + List<byte[]> uuids = new ArrayList<>(); + + if (!(appId.equals("HOST") || appId.equals("FLUME_HANDLER"))) { //HACK.. Why?? + appId = appId.toLowerCase(); + } + if (CollectionUtils.isNotEmpty(sanitizedHostNames)) { + for (String metricName : sanitizedMetricNames) { + TimelineMetric metric = new TimelineMetric(); + metric.setMetricName(metricName); + metric.setAppId(appId); + metric.setInstanceId(instanceId); + for (String hostname : sanitizedHostNames) { + metric.setHostName(hostname); + byte[] uuid = getUuid(metric); + if (uuid != null) { + uuids.add(uuid); + } + } + } + } else { + for (String metricName : sanitizedMetricNames) { + TimelineClusterMetric metric = new TimelineClusterMetric(metricName, appId, instanceId, -1l); + byte[] uuid = getUuid(metric); + if (uuid != null) { + uuids.add(uuid); + } + } + } + + return uuids; + } + + public Map<String, TimelineMetricMetadataKey> getUuidKeyMap() { + return uuidKeyMap; + } }
http://git-wip-us.apache.org/repos/asf/ambari/blob/041e4e9a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/discovery/TimelineMetricMetadataSync.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/discovery/TimelineMetricMetadataSync.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/discovery/TimelineMetricMetadataSync.java index 6d519f6..f808cd7 100644 --- a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/discovery/TimelineMetricMetadataSync.java +++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/discovery/TimelineMetricMetadataSync.java @@ -81,7 +81,7 @@ public class TimelineMetricMetadataSync implements Runnable { if (markSuccess) { for (TimelineMetricMetadata metadata : metadataToPersist) { TimelineMetricMetadataKey key = new TimelineMetricMetadataKey( - metadata.getMetricName(), metadata.getAppId() + metadata.getMetricName(), metadata.getAppId(), metadata.getInstanceId() ); // Mark entry as being persisted @@ -119,7 +119,7 @@ public class TimelineMetricMetadataSync implements Runnable { */ private void persistHostAppsMetadata() { if (cacheManager.syncHostedAppsMetadata()) { - Map<String, Set<String>> persistedData = null; + Map<String, TimelineMetricHostMetadata> persistedData = null; try { persistedData = cacheManager.getHostedAppsFromStore(); } catch (SQLException e) { @@ -127,14 +127,14 @@ public class TimelineMetricMetadataSync implements Runnable { return; // Something wrong with store } - Map<String, Set<String>> cachedData = cacheManager.getHostedAppsCache(); - Map<String, Set<String>> dataToSync = new HashMap<>(); + Map<String, TimelineMetricHostMetadata> cachedData = cacheManager.getHostedAppsCache(); + Map<String, TimelineMetricHostMetadata> dataToSync = new HashMap<>(); if (cachedData != null && !cachedData.isEmpty()) { - for (Map.Entry<String, Set<String>> cacheEntry : cachedData.entrySet()) { + for (Map.Entry<String, TimelineMetricHostMetadata> cacheEntry : cachedData.entrySet()) { // No persistence / stale data in store if (persistedData == null || persistedData.isEmpty() || !persistedData.containsKey(cacheEntry.getKey()) || - !persistedData.get(cacheEntry.getKey()).containsAll(cacheEntry.getValue())) { + !persistedData.get(cacheEntry.getKey()).getHostedApps().containsAll(cacheEntry.getValue().getHostedApps())) { dataToSync.put(cacheEntry.getKey(), cacheEntry.getValue()); } } @@ -189,16 +189,16 @@ public class TimelineMetricMetadataSync implements Runnable { * Read all hosted apps metadata and update cached values - HA */ private void refreshHostAppsMetadata() { - Map<String, Set<String>> hostedAppsDataFromStore = null; + Map<String, TimelineMetricHostMetadata> hostedAppsDataFromStore = null; try { hostedAppsDataFromStore = cacheManager.getHostedAppsFromStore(); } catch (SQLException e) { LOG.warn("Error refreshing metadata from store.", e); } if (hostedAppsDataFromStore != null) { - Map<String, Set<String>> cachedData = cacheManager.getHostedAppsCache(); + Map<String, TimelineMetricHostMetadata> cachedData = cacheManager.getHostedAppsCache(); - for (Map.Entry<String, Set<String>> storeEntry : hostedAppsDataFromStore.entrySet()) { + for (Map.Entry<String, TimelineMetricHostMetadata> storeEntry : hostedAppsDataFromStore.entrySet()) { if (!cachedData.containsKey(storeEntry.getKey())) { cachedData.put(storeEntry.getKey(), storeEntry.getValue()); } http://git-wip-us.apache.org/repos/asf/ambari/blob/041e4e9a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/query/Condition.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/query/Condition.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/query/Condition.java index 9aa64bd..9714e1a 100644 --- a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/query/Condition.java +++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/query/Condition.java @@ -24,6 +24,7 @@ import java.util.List; public interface Condition { boolean isEmpty(); + List<byte[]> getUuids(); List<String> getMetricNames(); boolean isPointInTime(); boolean isGrouped(); http://git-wip-us.apache.org/repos/asf/ambari/blob/041e4e9a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/query/ConditionBuilder.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/query/ConditionBuilder.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/query/ConditionBuilder.java index 32c1e84..f395c3e 100644 --- a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/query/ConditionBuilder.java +++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/query/ConditionBuilder.java @@ -42,6 +42,7 @@ public class ConditionBuilder { private Integer topN; private boolean isBottomN; private Function topNFunction; + private List<byte[]> uuids; public ConditionBuilder(List<String> metricNames) { this.metricNames = metricNames; @@ -122,14 +123,19 @@ public class ConditionBuilder { return this; } + public ConditionBuilder uuid(List<byte[]> uuids) { + this.uuids = uuids; + return this; + } + public Condition build() { if (topN == null) { return new DefaultCondition( - metricNames, + uuids, metricNames, hostnames, appId, instanceId, startTime, endTime, precision, limit, grouped); } else { - return new TopNCondition(metricNames, hostnames, appId, instanceId, + return new TopNCondition(uuids, metricNames, hostnames, appId, instanceId, startTime, endTime, precision, limit, grouped, topN, topNFunction, isBottomN); } } http://git-wip-us.apache.org/repos/asf/ambari/blob/041e4e9a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/query/DefaultCondition.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/query/DefaultCondition.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/query/DefaultCondition.java index a4f7014..3c03dca 100644 --- a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/query/DefaultCondition.java +++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/query/DefaultCondition.java @@ -43,6 +43,7 @@ public class DefaultCondition implements Condition { String statement; Set<String> orderByColumns = new LinkedHashSet<String>(); boolean metricNamesNotCondition = false; + List<byte[]> uuids = new ArrayList<>(); private static final Log LOG = LogFactory.getLog(DefaultCondition.class); @@ -60,6 +61,21 @@ public class DefaultCondition implements Condition { this.grouped = grouped; } + public DefaultCondition(List<byte[]> uuids, List<String> metricNames, List<String> hostnames, String appId, + String instanceId, Long startTime, Long endTime, Precision precision, + Integer limit, boolean grouped) { + this.uuids = uuids; + this.metricNames = metricNames; + this.hostnames = hostnames; + this.appId = appId; + this.instanceId = instanceId; + this.startTime = startTime; + this.endTime = endTime; + this.precision = precision; + this.limit = limit; + this.grouped = grouped; + } + public String getStatement() { return statement; } @@ -74,13 +90,7 @@ public class DefaultCondition implements Condition { public StringBuilder getConditionClause() { StringBuilder sb = new StringBuilder(); - - boolean appendConjunction = appendMetricNameClause(sb); - - appendConjunction = appendHostnameClause(sb, appendConjunction); - - appendConjunction = append(sb, appendConjunction, getAppId(), " APP_ID = ?"); - appendConjunction = append(sb, appendConjunction, getInstanceId(), " INSTANCE_ID = ?"); + boolean appendConjunction = appendUuidClause(sb); appendConjunction = append(sb, appendConjunction, getStartTime(), " SERVER_TIME >= ?"); append(sb, appendConjunction, getEndTime(), " SERVER_TIME < ?"); @@ -216,6 +226,37 @@ public class DefaultCondition implements Condition { return null; } + protected boolean appendUuidClause(StringBuilder sb) { + boolean appendConjunction = false; + + if (CollectionUtils.isNotEmpty(uuids)) { + // Put a '(' first + sb.append("("); + + //IN clause + // UUID (NOT) IN (?,?,?,?) + if (CollectionUtils.isNotEmpty(uuids)) { + sb.append("UUID"); + if (metricNamesNotCondition) { + sb.append(" NOT"); + } + sb.append(" IN ("); + //Append ?,?,?,? + for (int i = 0; i < uuids.size(); i++) { + sb.append("?"); + if (i < uuids.size() - 1) { + sb.append(", "); + } + } + sb.append(")"); + } + appendConjunction = true; + sb.append(")"); + } + + return appendConjunction; + } + protected boolean appendMetricNameClause(StringBuilder sb) { boolean appendConjunction = false; List<String> metricsLike = new ArrayList<>(); @@ -381,4 +422,9 @@ public class DefaultCondition implements Condition { public void setMetricNamesNotCondition(boolean metricNamesNotCondition) { this.metricNamesNotCondition = metricNamesNotCondition; } + + @Override + public List<byte[]> getUuids() { + return uuids; + } } http://git-wip-us.apache.org/repos/asf/ambari/blob/041e4e9a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/query/EmptyCondition.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/query/EmptyCondition.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/query/EmptyCondition.java index 43ab88c..b667df3 100644 --- a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/query/EmptyCondition.java +++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/query/EmptyCondition.java @@ -35,6 +35,11 @@ public class EmptyCondition implements Condition { } @Override + public List<byte[]> getUuids() { + return null; + } + + @Override public List<String> getMetricNames() { return null; } http://git-wip-us.apache.org/repos/asf/ambari/blob/041e4e9a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/query/PhoenixTransactSQL.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/query/PhoenixTransactSQL.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/query/PhoenixTransactSQL.java index d39230d..51c96c6 100644 --- a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/query/PhoenixTransactSQL.java +++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/query/PhoenixTransactSQL.java @@ -40,20 +40,15 @@ public class PhoenixTransactSQL { * Create table to store individual metric records. */ public static final String CREATE_METRICS_TABLE_SQL = "CREATE TABLE IF NOT " + - "EXISTS METRIC_RECORD (METRIC_NAME VARCHAR, " + - "HOSTNAME VARCHAR, " + - "SERVER_TIME UNSIGNED_LONG NOT NULL, " + - "APP_ID VARCHAR, " + - "INSTANCE_ID VARCHAR, " + + "EXISTS METRIC_RECORD (UUID BINARY(20) NOT NULL, " + + "SERVER_TIME BIGINT NOT NULL, " + "START_TIME UNSIGNED_LONG, " + - "UNITS CHAR(20), " + "METRIC_SUM DOUBLE, " + "METRIC_COUNT UNSIGNED_INT, " + "METRIC_MAX DOUBLE, " + "METRIC_MIN DOUBLE, " + "METRICS VARCHAR CONSTRAINT pk " + - "PRIMARY KEY (METRIC_NAME, HOSTNAME, SERVER_TIME, APP_ID, " + - "INSTANCE_ID)) DATA_BLOCK_ENCODING='%s', IMMUTABLE_ROWS=true, " + + "PRIMARY KEY (UUID, SERVER_TIME)) DATA_BLOCK_ENCODING='%s', IMMUTABLE_ROWS=true, " + "TTL=%s, COMPRESSION='%s'"; public static final String CREATE_CONTAINER_METRICS_TABLE_SQL = @@ -85,55 +80,44 @@ public class PhoenixTransactSQL { public static final String CREATE_METRICS_AGGREGATE_TABLE_SQL = "CREATE TABLE IF NOT EXISTS %s " + - "(METRIC_NAME VARCHAR, " + - "HOSTNAME VARCHAR, " + - "APP_ID VARCHAR, " + - "INSTANCE_ID VARCHAR, " + + "(UUID BINARY(20) NOT NULL, " + "SERVER_TIME UNSIGNED_LONG NOT NULL, " + - "UNITS CHAR(20), " + "METRIC_SUM DOUBLE," + "METRIC_COUNT UNSIGNED_INT, " + "METRIC_MAX DOUBLE," + "METRIC_MIN DOUBLE CONSTRAINT pk " + - "PRIMARY KEY (METRIC_NAME, HOSTNAME, APP_ID, INSTANCE_ID, " + - "SERVER_TIME)) DATA_BLOCK_ENCODING='%s', IMMUTABLE_ROWS=true, TTL=%s," + + "PRIMARY KEY (UUID, SERVER_TIME)) DATA_BLOCK_ENCODING='%s', IMMUTABLE_ROWS=true, TTL=%s," + " COMPRESSION='%s'"; public static final String CREATE_METRICS_CLUSTER_AGGREGATE_TABLE_SQL = "CREATE TABLE IF NOT EXISTS %s " + - "(METRIC_NAME VARCHAR, " + - "APP_ID VARCHAR, " + - "INSTANCE_ID VARCHAR, " + + "(UUID BINARY(16) NOT NULL, " + "SERVER_TIME UNSIGNED_LONG NOT NULL, " + - "UNITS CHAR(20), " + "METRIC_SUM DOUBLE, " + "HOSTS_COUNT UNSIGNED_INT, " + "METRIC_MAX DOUBLE, " + "METRIC_MIN DOUBLE " + - "CONSTRAINT pk PRIMARY KEY (METRIC_NAME, APP_ID, INSTANCE_ID, " + - "SERVER_TIME)) DATA_BLOCK_ENCODING='%s', IMMUTABLE_ROWS=true, " + + "CONSTRAINT pk PRIMARY KEY (UUID, SERVER_TIME)) DATA_BLOCK_ENCODING='%s', IMMUTABLE_ROWS=true, " + "TTL=%s, COMPRESSION='%s'"; // HOSTS_COUNT vs METRIC_COUNT public static final String CREATE_METRICS_CLUSTER_AGGREGATE_GROUPED_TABLE_SQL = "CREATE TABLE IF NOT EXISTS %s " + - "(METRIC_NAME VARCHAR, " + - "APP_ID VARCHAR, " + - "INSTANCE_ID VARCHAR, " + + "(UUID BINARY(16) NOT NULL, " + "SERVER_TIME UNSIGNED_LONG NOT NULL, " + - "UNITS CHAR(20), " + "METRIC_SUM DOUBLE, " + "METRIC_COUNT UNSIGNED_INT, " + "METRIC_MAX DOUBLE, " + "METRIC_MIN DOUBLE " + - "CONSTRAINT pk PRIMARY KEY (METRIC_NAME, APP_ID, INSTANCE_ID, " + - "SERVER_TIME)) DATA_BLOCK_ENCODING='%s', IMMUTABLE_ROWS=true, " + + "CONSTRAINT pk PRIMARY KEY (UUID, SERVER_TIME)) DATA_BLOCK_ENCODING='%s', IMMUTABLE_ROWS=true, " + "TTL=%s, COMPRESSION='%s'"; public static final String CREATE_METRICS_METADATA_TABLE_SQL = "CREATE TABLE IF NOT EXISTS METRICS_METADATA " + "(METRIC_NAME VARCHAR, " + "APP_ID VARCHAR, " + + "INSTANCE_ID VARCHAR, " + + "UUID BINARY(16), " + "UNITS CHAR(20), " + "TYPE CHAR(20), " + "START_TIME UNSIGNED_LONG, " + @@ -144,7 +128,7 @@ public class PhoenixTransactSQL { public static final String CREATE_HOSTED_APPS_METADATA_TABLE_SQL = "CREATE TABLE IF NOT EXISTS HOSTED_APPS_METADATA " + - "(HOSTNAME VARCHAR, APP_IDS VARCHAR, " + + "(HOSTNAME VARCHAR, UUID BINARY(4), APP_IDS VARCHAR, " + "CONSTRAINT pk PRIMARY KEY (HOSTNAME))" + "DATA_BLOCK_ENCODING='%s', COMPRESSION='%s'"; @@ -166,14 +150,15 @@ public class PhoenixTransactSQL { * Insert into metric records table. */ public static final String UPSERT_METRICS_SQL = "UPSERT INTO %s " + - "(METRIC_NAME, HOSTNAME, APP_ID, INSTANCE_ID, SERVER_TIME, START_TIME, " + - "UNITS, " + + "(UUID, " + + "SERVER_TIME, " + + "START_TIME, " + "METRIC_SUM, " + "METRIC_MAX, " + "METRIC_MIN, " + "METRIC_COUNT, " + "METRICS) VALUES " + - "(?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)"; + "(?, ?, ?, ?, ?, ?, ?, ?)"; public static final String UPSERT_CONTAINER_METRICS_SQL = "UPSERT INTO %s " + "(APP_ID," @@ -201,40 +186,40 @@ public class PhoenixTransactSQL { "(?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)"; public static final String UPSERT_CLUSTER_AGGREGATE_SQL = "UPSERT INTO " + - "%s (METRIC_NAME, APP_ID, INSTANCE_ID, SERVER_TIME, " + - "UNITS, " + + "%s (UUID, " + + "SERVER_TIME, " + "METRIC_SUM, " + "HOSTS_COUNT, " + "METRIC_MAX, " + "METRIC_MIN) " + - "VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)"; + "VALUES (?, ?, ?, ?, ?, ?)"; public static final String UPSERT_CLUSTER_AGGREGATE_TIME_SQL = "UPSERT INTO" + - " %s (METRIC_NAME, APP_ID, INSTANCE_ID, SERVER_TIME, " + + " %s (UUID, SERVER_TIME, " + "UNITS, " + "METRIC_SUM, " + "METRIC_COUNT, " + "METRIC_MAX, " + "METRIC_MIN) " + - "VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)"; + "VALUES (?, ?, ?, ?, ?, ?)"; public static final String UPSERT_AGGREGATE_RECORD_SQL = "UPSERT INTO " + - "%s (METRIC_NAME, HOSTNAME, APP_ID, INSTANCE_ID, " + + "%s (UUID, " + "SERVER_TIME, " + "UNITS, " + "METRIC_SUM, " + "METRIC_MAX, " + "METRIC_MIN," + "METRIC_COUNT) " + - "VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)"; + "VALUES (?, ?, ?, ?, ?, ?)"; public static final String UPSERT_METADATA_SQL = - "UPSERT INTO METRICS_METADATA (METRIC_NAME, APP_ID, UNITS, TYPE, " + + "UPSERT INTO METRICS_METADATA (METRIC_NAME, APP_ID, INSTANCE_ID, UUID, UNITS, TYPE, " + "START_TIME, SUPPORTS_AGGREGATION, IS_WHITELISTED) " + - "VALUES (?, ?, ?, ?, ?, ?, ?)"; + "VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)"; public static final String UPSERT_HOSTED_APPS_METADATA_SQL = - "UPSERT INTO HOSTED_APPS_METADATA (HOSTNAME, APP_IDS) VALUES (?, ?)"; + "UPSERT INTO HOSTED_APPS_METADATA (HOSTNAME, UUID, APP_IDS) VALUES (?, ?, ?)"; public static final String UPSERT_INSTANCE_HOST_METADATA_SQL = "UPSERT INTO INSTANCE_HOST_METADATA (INSTANCE_ID, HOSTNAME) VALUES (?, ?)"; @@ -242,8 +227,7 @@ public class PhoenixTransactSQL { /** * Retrieve a set of rows from metrics records table. */ - public static final String GET_METRIC_SQL = "SELECT %s METRIC_NAME, " + - "HOSTNAME, APP_ID, INSTANCE_ID, SERVER_TIME, START_TIME, UNITS, " + + public static final String GET_METRIC_SQL = "SELECT %s UUID, SERVER_TIME, START_TIME, " + "METRIC_SUM, " + "METRIC_MAX, " + "METRIC_MIN, " + @@ -257,31 +241,24 @@ public class PhoenixTransactSQL { * Different queries for a number and a single hosts are used due to bug * in Apache Phoenix */ - public static final String GET_LATEST_METRIC_SQL = "SELECT %s " + - "E.METRIC_NAME AS METRIC_NAME, E.HOSTNAME AS HOSTNAME, " + - "E.APP_ID AS APP_ID, E.INSTANCE_ID AS INSTANCE_ID, " + + public static final String GET_LATEST_METRIC_SQL = "SELECT %s E.UUID AS UUID, " + "E.SERVER_TIME AS SERVER_TIME, E.START_TIME AS START_TIME, " + - "E.UNITS AS UNITS, E.METRIC_SUM AS METRIC_SUM, " + + "E.METRIC_SUM AS METRIC_SUM, " + "E.METRIC_MAX AS METRIC_MAX, E.METRIC_MIN AS METRIC_MIN, " + "E.METRIC_COUNT AS METRIC_COUNT, E.METRICS AS METRICS " + "FROM %s AS E " + "INNER JOIN " + - "(SELECT METRIC_NAME, HOSTNAME, MAX(SERVER_TIME) AS MAX_SERVER_TIME, " + - "APP_ID, INSTANCE_ID " + + "(SELECT UUID, MAX(SERVER_TIME) AS MAX_SERVER_TIME " + "FROM %s " + "WHERE " + "%s " + - "GROUP BY METRIC_NAME, HOSTNAME, APP_ID, INSTANCE_ID) " + + "GROUP BY UUID) " + "AS I " + - "ON E.METRIC_NAME=I.METRIC_NAME " + - "AND E.HOSTNAME=I.HOSTNAME " + - "AND E.SERVER_TIME=I.MAX_SERVER_TIME " + - "AND E.APP_ID=I.APP_ID " + - "AND E.INSTANCE_ID=I.INSTANCE_ID"; - - public static final String GET_METRIC_AGGREGATE_ONLY_SQL = "SELECT %s " + - "METRIC_NAME, HOSTNAME, APP_ID, INSTANCE_ID, SERVER_TIME, " + - "UNITS, " + + "ON E.UUID=I.UUID " + + "AND E.SERVER_TIME=I.MAX_SERVER_TIME"; + + public static final String GET_METRIC_AGGREGATE_ONLY_SQL = "SELECT %s UUID, " + + "SERVER_TIME, " + "METRIC_SUM, " + "METRIC_MAX, " + "METRIC_MIN, " + @@ -289,9 +266,8 @@ public class PhoenixTransactSQL { "FROM %s"; public static final String GET_CLUSTER_AGGREGATE_SQL = "SELECT %s " + - "METRIC_NAME, APP_ID, " + - "INSTANCE_ID, SERVER_TIME, " + - "UNITS, " + + "UUID, " + + "SERVER_TIME, " + "METRIC_SUM, " + "HOSTS_COUNT, " + "METRIC_MAX, " + @@ -299,24 +275,23 @@ public class PhoenixTransactSQL { "FROM %s"; public static final String GET_CLUSTER_AGGREGATE_TIME_SQL = "SELECT %s " + - "METRIC_NAME, APP_ID, " + - "INSTANCE_ID, SERVER_TIME, " + - "UNITS, " + + "UUID, " + + "SERVER_TIME, " + "METRIC_SUM, " + "METRIC_COUNT, " + "METRIC_MAX, " + "METRIC_MIN " + "FROM %s"; - public static final String TOP_N_INNER_SQL = "SELECT %s %s " + - "FROM %s WHERE %s GROUP BY %s ORDER BY %s LIMIT %s"; + public static final String TOP_N_INNER_SQL = "SELECT %s UUID " + + "FROM %s WHERE %s GROUP BY UUID ORDER BY %s LIMIT %s"; public static final String GET_METRIC_METADATA_SQL = "SELECT " + - "METRIC_NAME, APP_ID, UNITS, TYPE, START_TIME, " + + "METRIC_NAME, APP_ID, INSTANCE_ID, UUID, UNITS, TYPE, START_TIME, " + "SUPPORTS_AGGREGATION, IS_WHITELISTED FROM METRICS_METADATA"; public static final String GET_HOSTED_APPS_METADATA_SQL = "SELECT " + - "HOSTNAME, APP_IDS FROM HOSTED_APPS_METADATA"; + "HOSTNAME, UUID, APP_IDS FROM HOSTED_APPS_METADATA"; public static final String GET_INSTANCE_HOST_METADATA_SQL = "SELECT " + "INSTANCE_ID, HOSTNAME FROM INSTANCE_HOST_METADATA"; @@ -326,43 +301,40 @@ public class PhoenixTransactSQL { * N - way parallel scan where N = number of regions. */ public static final String GET_AGGREGATED_HOST_METRIC_GROUPBY_SQL = "UPSERT %s " + - "INTO %s (METRIC_NAME, HOSTNAME, APP_ID, INSTANCE_ID, SERVER_TIME, UNITS, " + - "METRIC_SUM, METRIC_COUNT, METRIC_MAX, METRIC_MIN) " + - "SELECT METRIC_NAME, HOSTNAME, APP_ID, INSTANCE_ID, %s AS SERVER_TIME, UNITS, " + + "INTO %s (UUID, SERVER_TIME, METRIC_SUM, METRIC_COUNT, METRIC_MAX, METRIC_MIN) " + + "SELECT UUID, %s AS SERVER_TIME, " + "SUM(METRIC_SUM), SUM(METRIC_COUNT), MAX(METRIC_MAX), MIN(METRIC_MIN) " + "FROM %s WHERE%s SERVER_TIME > %s AND SERVER_TIME <= %s " + - "GROUP BY METRIC_NAME, HOSTNAME, APP_ID, INSTANCE_ID, UNITS"; + "GROUP BY UUID"; /** * Downsample host metrics. */ - public static final String DOWNSAMPLE_HOST_METRIC_SQL_UPSERT_PREFIX = "UPSERT %s INTO %s (METRIC_NAME, HOSTNAME, " + - "APP_ID, INSTANCE_ID, SERVER_TIME, UNITS, METRIC_SUM, METRIC_COUNT, METRIC_MAX, METRIC_MIN) "; + public static final String DOWNSAMPLE_HOST_METRIC_SQL_UPSERT_PREFIX = "UPSERT %s INTO %s (UUID, SERVER_TIME, " + + "METRIC_SUM, METRIC_COUNT, METRIC_MAX, METRIC_MIN) "; - public static final String TOPN_DOWNSAMPLER_HOST_METRIC_SELECT_SQL = "SELECT METRIC_NAME, HOSTNAME, APP_ID, INSTANCE_ID, " + - "%s AS SERVER_TIME, UNITS, %s, 1, %s, %s FROM %s WHERE METRIC_NAME LIKE %s AND SERVER_TIME > %s AND SERVER_TIME <= %s " + - "GROUP BY METRIC_NAME, HOSTNAME, APP_ID, INSTANCE_ID, UNITS ORDER BY %s DESC LIMIT %s"; + public static final String TOPN_DOWNSAMPLER_HOST_METRIC_SELECT_SQL = "SELECT UUID, " + + "%s AS SERVER_TIME, %s, 1, %s, %s FROM %s WHERE UUID IN %s AND SERVER_TIME > %s AND SERVER_TIME <= %s " + + "GROUP BY UUID ORDER BY %s DESC LIMIT %s"; /** * Aggregate app metrics using a GROUP BY clause to take advantage of * N - way parallel scan where N = number of regions. */ public static final String GET_AGGREGATED_APP_METRIC_GROUPBY_SQL = "UPSERT %s " + - "INTO %s (METRIC_NAME, APP_ID, INSTANCE_ID, SERVER_TIME, UNITS, " + - "METRIC_SUM, METRIC_COUNT, METRIC_MAX, METRIC_MIN) SELECT METRIC_NAME, APP_ID, " + - "INSTANCE_ID, %s AS SERVER_TIME, UNITS, ROUND(AVG(METRIC_SUM),2), ROUND(AVG(%s)), " + - "MAX(METRIC_MAX), MIN(METRIC_MIN) FROM %s WHERE%s SERVER_TIME > %s AND " + - "SERVER_TIME <= %s GROUP BY METRIC_NAME, APP_ID, INSTANCE_ID, UNITS"; + "INTO %s (UUID, SERVER_TIME, METRIC_SUM, METRIC_COUNT, METRIC_MAX, METRIC_MIN) SELECT UUID, %s AS SERVER_TIME, " + + "ROUND(AVG(METRIC_SUM),2), ROUND(AVG(%s)), MAX(METRIC_MAX), MIN(METRIC_MIN) FROM %s WHERE%s SERVER_TIME > %s AND " + + "SERVER_TIME <= %s GROUP BY UUID"; /** * Downsample cluster metrics. */ - public static final String DOWNSAMPLE_CLUSTER_METRIC_SQL_UPSERT_PREFIX = "UPSERT %s INTO %s (METRIC_NAME, APP_ID, " + - "INSTANCE_ID, SERVER_TIME, UNITS, METRIC_SUM, METRIC_COUNT, METRIC_MAX, METRIC_MIN) "; + public static final String DOWNSAMPLE_CLUSTER_METRIC_SQL_UPSERT_PREFIX = "UPSERT %s INTO %s (UUID, SERVER_TIME, " + + "METRIC_SUM, METRIC_COUNT, METRIC_MAX, METRIC_MIN) "; - public static final String TOPN_DOWNSAMPLER_CLUSTER_METRIC_SELECT_SQL = "SELECT METRIC_NAME, APP_ID, INSTANCE_ID," + - " %s AS SERVER_TIME, UNITS, %s, 1, %s, %s FROM %s WHERE METRIC_NAME LIKE %s AND SERVER_TIME > %s AND SERVER_TIME <= %s " + - "GROUP BY METRIC_NAME, APP_ID, INSTANCE_ID, UNITS ORDER BY %s DESC LIMIT %s"; + public static final String TOPN_DOWNSAMPLER_CLUSTER_METRIC_SELECT_SQL = "SELECT UUID, " + + "%s AS SERVER_TIME, %s, 1, %s, %s FROM %s WHERE UUID IN %s AND SERVER_TIME > %s AND SERVER_TIME <= %s " + + "GROUP BY UUID ORDER BY %s DESC LIMIT %s"; public static final String METRICS_RECORD_TABLE_NAME = "METRIC_RECORD"; @@ -477,7 +449,7 @@ public class PhoenixTransactSQL { if (orderByClause != null) { sb.append(orderByClause); } else { - sb.append(" ORDER BY METRIC_NAME, SERVER_TIME "); + sb.append(" ORDER BY UUID, SERVER_TIME "); } } @@ -493,30 +465,13 @@ public class PhoenixTransactSQL { try { stmt = connection.prepareStatement(sb.toString()); int pos = 1; - pos = addMetricNames(condition, pos, stmt); + pos = addUuids(condition, pos, stmt); if (condition instanceof TopNCondition) { - TopNCondition topNCondition = (TopNCondition) condition; - if (topNCondition.isTopNHostCondition()) { - pos = addMetricNames(condition, pos, stmt); - } - } - - pos = addHostNames(condition, pos, stmt); - - if (condition instanceof TopNCondition) { - pos = addAppId(condition, pos, stmt); - pos = addInstanceId(condition, pos, stmt); pos = addStartTime(condition, pos, stmt); pos = addEndTime(condition, pos, stmt); - TopNCondition topNCondition = (TopNCondition) condition; - if (topNCondition.isTopNMetricCondition()) { - pos = addHostNames(condition, pos, stmt); - } } - pos = addAppId(condition, pos, stmt); - pos = addInstanceId(condition, pos, stmt); pos = addStartTime(condition, pos, stmt); addEndTime(condition, pos, stmt); @@ -530,6 +485,9 @@ public class PhoenixTransactSQL { throw e; } + if (condition instanceof TopNCondition) { + LOG.info(sb.toString()); + } return stmt; } @@ -627,36 +585,11 @@ public class PhoenixTransactSQL { int pos = 1; //For GET_LATEST_METRIC_SQL_SINGLE_HOST parameters should be set 2 times do { - if (condition.getMetricNames() != null) { - for (String metricName : condition.getMetricNames()) { - if (LOG.isDebugEnabled()) { - LOG.debug("Setting pos: " + pos + ", value = " + metricName); - } - stmt.setString(pos++, metricName); - } - } - if (condition.getHostnames() != null) { - for (String hostname : condition.getHostnames()) { - if (LOG.isDebugEnabled()) { - LOG.debug("Setting pos: " + pos + ", value: " + hostname); - } - stmt.setString(pos++, hostname); + if (condition.getUuids() != null) { + for (byte[] uuid : condition.getUuids()) { + stmt.setBytes(pos++, uuid); } } - if (condition.getAppId() != null) { - if (LOG.isDebugEnabled()) { - LOG.debug("Setting pos: " + pos + ", value: " + condition.getAppId()); - } - stmt.setString(pos++, condition.getAppId()); - } - if (condition.getInstanceId() != null) { - if (LOG.isDebugEnabled()) { - LOG.debug("Setting pos: " + pos + - ", value: " + condition.getInstanceId()); - } - stmt.setString(pos++, condition.getInstanceId()); - } - if (condition.getFetchSize() != null) { stmt.setFetchSize(condition.getFetchSize()); pos++; @@ -704,7 +637,7 @@ public class PhoenixTransactSQL { StringBuilder sb = new StringBuilder(queryStmt); sb.append(" WHERE "); sb.append(condition.getConditionClause()); - sb.append(" ORDER BY METRIC_NAME, SERVER_TIME"); + sb.append(" ORDER BY UUID, SERVER_TIME"); if (condition.getLimit() != null) { sb.append(" LIMIT ").append(condition.getLimit()); } @@ -719,20 +652,16 @@ public class PhoenixTransactSQL { stmt = connection.prepareStatement(query); int pos = 1; - pos = addMetricNames(condition, pos, stmt); + pos = addUuids(condition, pos, stmt); if (condition instanceof TopNCondition) { - pos = addAppId(condition, pos, stmt); - pos = addInstanceId(condition, pos, stmt); pos = addStartTime(condition, pos, stmt); pos = addEndTime(condition, pos, stmt); } // TODO: Upper case all strings on POST - pos = addAppId(condition, pos, stmt); - pos = addInstanceId(condition, pos, stmt); pos = addStartTime(condition, pos, stmt); - pos = addEndTime(condition, pos, stmt); + addEndTime(condition, pos, stmt); } catch (SQLException e) { if (stmt != null) { stmt.close(); @@ -740,11 +669,14 @@ public class PhoenixTransactSQL { throw e; } + if (condition instanceof TopNCondition) { + LOG.info(sb.toString()); + } return stmt; } public static PreparedStatement prepareGetLatestAggregateMetricSqlStmt( - Connection connection, Condition condition) throws SQLException { + Connection connection, SplitByMetricNamesCondition condition) throws SQLException { validateConditionIsNotEmpty(condition); @@ -763,7 +695,7 @@ public class PhoenixTransactSQL { if (orderByClause != null) { sb.append(orderByClause); } else { - sb.append(" ORDER BY METRIC_NAME DESC, SERVER_TIME DESC "); + sb.append(" ORDER BY UUID DESC, SERVER_TIME DESC "); } sb.append(" LIMIT ").append(condition.getMetricNames().size()); @@ -779,18 +711,9 @@ public class PhoenixTransactSQL { int pos = 1; if (condition.getMetricNames() != null) { for (; pos <= condition.getMetricNames().size(); pos++) { - stmt.setString(pos, condition.getMetricNames().get(pos - 1)); + stmt.setBytes(pos, condition.getCurrentUuid()); } } - if (condition.getAppId() != null) { - if (LOG.isDebugEnabled()) { - LOG.debug("Setting pos: " + pos + ", value: " + condition.getAppId()); - } - stmt.setString(pos++, condition.getAppId()); - } - if (condition.getInstanceId() != null) { - stmt.setString(pos, condition.getInstanceId()); - } } catch (SQLException e) { if (stmt != null) { @@ -844,50 +767,14 @@ public class PhoenixTransactSQL { return inputTable; } - private static int addMetricNames(Condition condition, int pos, PreparedStatement stmt) throws SQLException { - if (condition.getMetricNames() != null) { - for (int pos2 = 1 ; pos2 <= condition.getMetricNames().size(); pos2++,pos++) { - if (LOG.isDebugEnabled()) { - LOG.debug("Setting pos: " + pos + ", value = " + condition.getMetricNames().get(pos2 - 1)); - } - stmt.setString(pos, condition.getMetricNames().get(pos2 - 1)); - } - } - return pos; - } - - private static int addHostNames(Condition condition, int pos, PreparedStatement stmt) throws SQLException { - int i = pos; - if (condition.getHostnames() != null) { - for (String hostname : condition.getHostnames()) { + private static int addUuids(Condition condition, int pos, PreparedStatement stmt) throws SQLException { + if (condition.getUuids() != null) { + for (int pos2 = 1 ; pos2 <= condition.getUuids().size(); pos2++,pos++) { if (LOG.isDebugEnabled()) { - LOG.debug("Setting pos: " + pos + ", value: " + hostname); + LOG.debug("Setting pos: " + pos + ", value = " + condition.getUuids().get(pos2 - 1)); } - stmt.setString(i++, hostname); - } - } - return i; - } - - - private static int addAppId(Condition condition, int pos, PreparedStatement stmt) throws SQLException { - - if (condition.getAppId() != null) { - if (LOG.isDebugEnabled()) { - LOG.debug("Setting pos: " + pos + ", value: " + condition.getAppId()); - } - stmt.setString(pos++, condition.getAppId()); - } - return pos; - } - - private static int addInstanceId(Condition condition, int pos, PreparedStatement stmt) throws SQLException { - - if (condition.getInstanceId() != null) { - if (LOG.isDebugEnabled()) { - LOG.debug("Setting pos: " + pos + ", value: " + condition.getInstanceId()); + stmt.setBytes(pos, condition.getUuids().get(pos2 - 1)); } - stmt.setString(pos++, condition.getInstanceId()); } return pos; } http://git-wip-us.apache.org/repos/asf/ambari/blob/041e4e9a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/query/SplitByMetricNamesCondition.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/query/SplitByMetricNamesCondition.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/query/SplitByMetricNamesCondition.java index bb4dced..45ea74c 100644 --- a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/query/SplitByMetricNamesCondition.java +++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/query/SplitByMetricNamesCondition.java @@ -24,7 +24,7 @@ import java.util.List; // TODO get rid of this class public class SplitByMetricNamesCondition implements Condition { private final Condition adaptee; - private String currentMetric; + private byte[] currentUuid; private boolean metricNamesNotCondition = false; public SplitByMetricNamesCondition(Condition condition){ @@ -37,8 +37,13 @@ public class SplitByMetricNamesCondition implements Condition { } @Override + public List<byte[]> getUuids() { + return adaptee.getUuids(); + } + + @Override public List<String> getMetricNames() { - return Collections.singletonList(currentMetric); + return Collections.singletonList(new String(currentUuid)); } @Override @@ -91,31 +96,12 @@ public class SplitByMetricNamesCondition implements Condition { if (sb.length() > 1) { sb.append(" OR "); } - sb.append("METRIC_NAME = ?"); + sb.append("UUID = ?"); } appendConjunction = true; } - // TODO prevent user from using this method with multiple hostnames and SQL LIMIT clause - if (getHostnames() != null && getHostnames().size() > 1) { - StringBuilder hostnamesCondition = new StringBuilder(); - for (String hostname: getHostnames()) { - if (hostnamesCondition.length() > 0) { - hostnamesCondition.append(" ,"); - } else { - hostnamesCondition.append(" HOSTNAME IN ("); - } - hostnamesCondition.append('?'); - } - hostnamesCondition.append(')'); - appendConjunction = DefaultCondition.append(sb, appendConjunction, getHostnames(), hostnamesCondition.toString()); - } else { - appendConjunction = DefaultCondition.append(sb, appendConjunction, getHostnames(), " HOSTNAME = ?"); - } - appendConjunction = DefaultCondition.append(sb, appendConjunction, - getAppId(), " APP_ID = ?"); - appendConjunction = DefaultCondition.append(sb, appendConjunction, - getInstanceId(), " INSTANCE_ID = ?"); + appendConjunction = DefaultCondition.append(sb, appendConjunction, getStartTime(), " SERVER_TIME >= ?"); DefaultCondition.append(sb, appendConjunction, getEndTime(), @@ -178,8 +164,12 @@ public class SplitByMetricNamesCondition implements Condition { return adaptee.getMetricNames(); } - public void setCurrentMetric(String currentMetric) { - this.currentMetric = currentMetric; + public void setCurrentUuid(byte[] uuid) { + this.currentUuid = uuid; + } + + public byte[] getCurrentUuid() { + return currentUuid; } @Override http://git-wip-us.apache.org/repos/asf/ambari/blob/041e4e9a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/query/TopNCondition.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/query/TopNCondition.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/query/TopNCondition.java index 0f2a02c..93242bd 100644 --- a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/query/TopNCondition.java +++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/query/TopNCondition.java @@ -32,11 +32,11 @@ public class TopNCondition extends DefaultCondition{ private Function topNFunction; private static final Log LOG = LogFactory.getLog(TopNCondition.class); - public TopNCondition(List<String> metricNames, List<String> hostnames, String appId, + public TopNCondition(List<byte[]> uuids, List<String> metricNames, List<String> hostnames, String appId, String instanceId, Long startTime, Long endTime, Precision precision, Integer limit, boolean grouped, Integer topN, Function topNFunction, boolean isBottomN) { - super(metricNames, hostnames, appId, instanceId, startTime, endTime, precision, limit, grouped); + super(uuids, metricNames, hostnames, appId, instanceId, startTime, endTime, precision, limit, grouped); this.topN = topN; this.isBottomN = isBottomN; this.topNFunction = topNFunction; @@ -44,34 +44,20 @@ public class TopNCondition extends DefaultCondition{ @Override public StringBuilder getConditionClause() { - StringBuilder sb = new StringBuilder(); - boolean appendConjunction = false; - - if (isTopNHostCondition(metricNames, hostnames)) { - appendConjunction = appendMetricNameClause(sb); - - StringBuilder hostnamesCondition = new StringBuilder(); - hostnamesCondition.append(" HOSTNAME IN ("); - hostnamesCondition.append(getTopNInnerQuery()); - hostnamesCondition.append(")"); - appendConjunction = append(sb, appendConjunction, getHostnames(), hostnamesCondition.toString()); - - } else if (isTopNMetricCondition(metricNames, hostnames)) { - - StringBuilder metricNamesCondition = new StringBuilder(); - metricNamesCondition.append(" METRIC_NAME IN ("); - metricNamesCondition.append(getTopNInnerQuery()); - metricNamesCondition.append(")"); - appendConjunction = append(sb, appendConjunction, getMetricNames(), metricNamesCondition.toString()); - appendConjunction = appendHostnameClause(sb, appendConjunction); - } else { + + + if (!(isTopNHostCondition(metricNames, hostnames) || isTopNMetricCondition(metricNames, hostnames))) { LOG.error("Unsupported TopN Operation requested. Query can have either multiple hosts or multiple metric names " + "but not both."); return null; } - appendConjunction = append(sb, appendConjunction, getAppId(), " APP_ID = ?"); - appendConjunction = append(sb, appendConjunction, getInstanceId(), " INSTANCE_ID = ?"); + StringBuilder sb = new StringBuilder(); + sb.append(" UUID IN ("); + sb.append(getTopNInnerQuery()); + sb.append(")"); + + boolean appendConjunction = true; appendConjunction = append(sb, appendConjunction, getStartTime(), " SERVER_TIME >= ?"); append(sb, appendConjunction, getEndTime(), " SERVER_TIME < ?"); @@ -79,29 +65,10 @@ public class TopNCondition extends DefaultCondition{ } public String getTopNInnerQuery() { - String innerQuery = null; - - if (isTopNHostCondition(metricNames, hostnames)) { - String groupByClause = "METRIC_NAME, HOSTNAME, APP_ID"; - String orderByClause = getTopNOrderByClause(); - - innerQuery = String.format(PhoenixTransactSQL.TOP_N_INNER_SQL, PhoenixTransactSQL.getNaiveTimeRangeHint(getStartTime(), NATIVE_TIME_RANGE_DELTA), - "HOSTNAME", PhoenixTransactSQL.getTargetTableUsingPrecision(precision, true), super.getConditionClause().toString(), - groupByClause, orderByClause, topN); - - - } else if (isTopNMetricCondition(metricNames, hostnames)) { - - String groupByClause = "METRIC_NAME, APP_ID"; - String orderByClause = getTopNOrderByClause(); - - innerQuery = String.format(PhoenixTransactSQL.TOP_N_INNER_SQL, PhoenixTransactSQL.getNaiveTimeRangeHint(getStartTime(), NATIVE_TIME_RANGE_DELTA), - "METRIC_NAME", PhoenixTransactSQL.getTargetTableUsingPrecision(precision, (hostnames != null && hostnames.size() == 1)), - super.getConditionClause().toString(), - groupByClause, orderByClause, topN); - } - - return innerQuery; + return String.format(PhoenixTransactSQL.TOP_N_INNER_SQL, + PhoenixTransactSQL.getNaiveTimeRangeHint(getStartTime(), NATIVE_TIME_RANGE_DELTA), + PhoenixTransactSQL.getTargetTableUsingPrecision(precision, CollectionUtils.isNotEmpty(hostnames)), + super.getConditionClause().toString(), getTopNOrderByClause(), topN); } private String getTopNOrderByClause() { http://git-wip-us.apache.org/repos/asf/ambari/blob/041e4e9a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/uuid/HashBasedUuidGenStrategy.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/uuid/HashBasedUuidGenStrategy.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/uuid/HashBasedUuidGenStrategy.java new file mode 100644 index 0000000..f35c23a --- /dev/null +++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/uuid/HashBasedUuidGenStrategy.java @@ -0,0 +1,202 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.uuid; + +import org.apache.commons.lang.ArrayUtils; +import org.apache.commons.lang3.StringUtils; +import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.TimelineClusterMetric; + +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; + +public class HashBasedUuidGenStrategy implements MetricUuidGenStrategy { + + /** + * Computes the UUID for a timelineClusterMetric. + * @param timelineClusterMetric + * @param maxLength + * @return byte array of length 'maxlength' + */ + @Override + public byte[] computeUuid(TimelineClusterMetric timelineClusterMetric, int maxLength) { + + int metricNameUuidLength = 12; + String metricName = timelineClusterMetric.getMetricName(); + + //Compute the individual splits. + String[] splits = getIndidivualSplits(metricName); + + /* + Compute the ascii sum of every split in the metric name. (asciiSum += (int) splits[s].charAt(i)) + For the last split, use weighted sum instead of ascii sum. (asciiSum += ((i+1) * (int) splits[s].charAt(i))) + These weighted sums are 'appended' to get the unique ID for metric name. + */ + StringBuilder splitSums = new StringBuilder(); + if (splits.length > 0) { + for (int s = 0; s < splits.length; s++) { + int asciiSum = 0; + if ( s < splits.length -1) { + for (int i = 0; i < splits[s].length(); i++) { + asciiSum += (int) splits[s].charAt(i); // Get Ascii Sum. + } + } else { + for (int i = 0; i < splits[s].length(); i++) { + asciiSum += ((i+1) * (int) splits[s].charAt(i)); //weighted sum for last split. + } + } + splitSums.append(asciiSum); //Append the sum to the array of sums. + } + } + + //Compute a unique metric seed for the stemmed metric name + String stemmedMetric = stem(metricName); + long metricSeed = 100123456789L; + for (int i = 0; i < stemmedMetric.length(); i++) { + metricSeed += stemmedMetric.charAt(i); + } + + //Reverse the computed seed to get a metric UUID portion which is used optionally. + byte[] metricUuidPortion = StringUtils.reverse(String.valueOf(metricSeed)).getBytes(); + String splitSumString = splitSums.toString(); + int splitLength = splitSumString.length(); + + //If splitSums length > required metric UUID length, use only the required length suffix substring of the splitSums as metric UUID. + if (splitLength > metricNameUuidLength) { + metricUuidPortion = ArrayUtils.subarray(splitSumString.getBytes(), splitLength - metricNameUuidLength, splitLength); + } else { + //If splitSums is not enough for required metric UUID length, pad with the metric uuid portion. + int pad = metricNameUuidLength - splitLength; + metricUuidPortion = ArrayUtils.addAll(splitSumString.getBytes(), ArrayUtils.subarray(metricUuidPortion, 0, pad)); + } + + /* + For appId and instanceId the logic is similar. Use a seed integer to start with and compute ascii sum. + Based on required length, use a suffix of the computed uuid. + */ + String appId = timelineClusterMetric.getAppId(); + int appidSeed = 11; + for (int i = 0; i < appId.length(); i++) { + appidSeed += appId.charAt(i); + } + String appIdSeedStr = String.valueOf(appidSeed); + byte[] appUuidPortion = ArrayUtils.subarray(appIdSeedStr.getBytes(), appIdSeedStr.length() - 2, appIdSeedStr.length()); + + String instanceId = timelineClusterMetric.getInstanceId(); + ByteBuffer buffer = ByteBuffer.allocate(4); + byte[] instanceUuidPortion = new byte[2]; + if (StringUtils.isNotEmpty(instanceId)) { + int instanceIdSeed = 1489; + for (int i = 0; i < appId.length(); i++) { + instanceIdSeed += appId.charAt(i); + } + buffer.putInt(instanceIdSeed); + ArrayUtils.subarray(buffer.array(), 2, 4); + } + + // Concatenate all UUIDs together (metric uuid + appId uuid + instanceId uuid) + return ArrayUtils.addAll(ArrayUtils.addAll(metricUuidPortion, appUuidPortion), instanceUuidPortion); + } + + /** + * Splits the metric name into individual tokens. + * For example, + * kafka.server.ReplicaManager.LeaderCount -> [kafka, server, ReplicaManager, LeaderCount] + * default.General.api_drop_table_15min_rate -> [default, General, api, drop, table, 15min, rate] + * @param metricName + * @return + */ + private String[] getIndidivualSplits(String metricName) { + List<String> tokens = new ArrayList<>(); + String[] splits = new String[0]; + if (metricName.contains("\\.")) { + splits = metricName.split("\\."); + for (String split : splits) { + if (split.contains("_")) { + tokens.addAll(Arrays.asList(split.split("_"))); + } else { + tokens.add(split); + } + } + } + + if (splits.length <= 1) { + splits = metricName.split("\\_"); + return splits; + } + + if (splits.length <= 1) { + splits = metricName.split("\\="); + return splits; + } + + return tokens.toArray(new String[tokens.size()]); + } + + /** + * Stem the metric name. Remove a set of usual suspects characters. + * @param metricName + * @return + */ + private String stem(String metricName) { + String metric = metricName.toLowerCase(); + String regex = "[\\.\\_\\%\\-\\=]"; + String trimmedMetric = StringUtils.removePattern(metric, regex); + return trimmedMetric; + } + + + /** + * Computes the UUID of a string. (hostname) + * Uses the ascii sum of the String. Numbers in the String are treated as actual numerical values rather than ascii values. + * @param value + * @param maxLength + * @return byte array of length 'maxlength' + */ + @Override + public byte[] computeUuid(String value, int maxLength) { + + if (StringUtils.isEmpty(value)) { + return null; + } + int len = value.length(); + int numericValue = 0; + int seed = 1489; + for (int i = 0; i < len; i++) { + int ascii = value.charAt(i); + if (48 <= ascii && ascii <= 57) { + numericValue += numericValue * 10 + (ascii - 48); + } else { + if (numericValue > 0) { + seed += numericValue; + numericValue = 0; + } + seed+= value.charAt(i); + } + } + + String seedStr = String.valueOf(seed); + if (seedStr.length() < maxLength) { + return null; + } else { + return seedStr.substring(seedStr.length() - maxLength, seedStr.length()).getBytes(); + } + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ambari/blob/041e4e9a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/uuid/MetricUuidGenStrategy.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/uuid/MetricUuidGenStrategy.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/uuid/MetricUuidGenStrategy.java new file mode 100644 index 0000000..9aab96a --- /dev/null +++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/uuid/MetricUuidGenStrategy.java @@ -0,0 +1,49 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.uuid; + +import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric; +import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.TimelineClusterMetric; + +public interface MetricUuidGenStrategy { + + /** + * Compute UUID for a given value + * @param timelineMetric instance + * @param maxLength + * @return + */ +// byte[] computeUuid(TimelineMetric timelineMetric, int maxLength); + + /** + * Compute UUID for a given value + * @param value + * @param maxLength + * @return + */ + byte[] computeUuid(TimelineClusterMetric timelineClusterMetric, int maxLength); + + /** + * Compute UUID for a given value + * @param value + * @param maxLength + * @return + */ + byte[] computeUuid(String value, int maxLength); + +} http://git-wip-us.apache.org/repos/asf/ambari/blob/041e4e9a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/uuid/RandomUuidGenStrategy.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/uuid/RandomUuidGenStrategy.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/uuid/RandomUuidGenStrategy.java new file mode 100644 index 0000000..39d9549 --- /dev/null +++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/uuid/RandomUuidGenStrategy.java @@ -0,0 +1,53 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.uuid; + +import com.google.common.primitives.Longs; +import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric; +import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.TimelineClusterMetric; + +import java.security.SecureRandom; + +public class RandomUuidGenStrategy implements MetricUuidGenStrategy { + private static SecureRandom randomGenerator; + + public RandomUuidGenStrategy() { + randomGenerator = new SecureRandom( + Longs.toByteArray(System.currentTimeMillis())); + } + + @Override + public byte[] computeUuid(TimelineClusterMetric timelineClusterMetric, int maxLength) { + final byte[] bytes = new byte[maxLength]; + randomGenerator.nextBytes(bytes); + return bytes; + } + +// @Override +// public byte[] computeUuid(TimelineMetric timelineMetric, int maxLength) { +// return new byte[10]; +// } + + @Override + public byte[] computeUuid(String value, int maxLength) { + final byte[] bytes = new byte[maxLength]; + randomGenerator.nextBytes(bytes); + return bytes; + } +} http://git-wip-us.apache.org/repos/asf/ambari/blob/041e4e9a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/webapp/TimelineWebServices.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/webapp/TimelineWebServices.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/webapp/TimelineWebServices.java index 50cfb08..472a787 100644 --- a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/webapp/TimelineWebServices.java +++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/webapp/TimelineWebServices.java @@ -38,6 +38,7 @@ import org.apache.hadoop.metrics2.sink.timeline.TimelineMetrics; import org.apache.hadoop.yarn.api.records.timeline.TimelinePutResponse; import org.apache.hadoop.metrics2.sink.timeline.Precision; import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricStore; +import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.discovery.TimelineMetricMetadataKey; import org.apache.hadoop.yarn.server.applicationhistoryservice.timeline.EntityIdentifier; import org.apache.hadoop.yarn.server.applicationhistoryservice.timeline.GenericObjectMapper; import org.apache.hadoop.yarn.server.applicationhistoryservice.timeline.NameValuePair; @@ -461,6 +462,22 @@ public class TimelineWebServices { } } + @GET + @Path("/metrics/uuids") + @Produces({ MediaType.APPLICATION_JSON }) + public Map<String, TimelineMetricMetadataKey> getUuids( + @Context HttpServletRequest req, + @Context HttpServletResponse res + ) { + init(res); + + try { + return timelineMetricStore.getUuids(); + } catch (Exception e) { + throw new WebApplicationException(e, Response.Status.INTERNAL_SERVER_ERROR); + } + } + /** * This is a discovery endpoint that advertises known live collector * instances. Note: It will always answer with current instance as live. http://git-wip-us.apache.org/repos/asf/ambari/blob/041e4e9a/ambari-metrics/ambari-metrics-timelineservice/src/main/resources/metrics_def/AMBARI_SERVER.dat ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/resources/metrics_def/AMBARI_SERVER.dat b/ambari-metrics/ambari-metrics-timelineservice/src/main/resources/metrics_def/AMBARI_SERVER.dat new file mode 100644 index 0000000..407b0f8 --- /dev/null +++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/resources/metrics_def/AMBARI_SERVER.dat @@ -0,0 +1,40 @@ +jvm.buffers.direct.capacity +jvm.buffers.direct.count +jvm.buffers.direct.used +jvm.buffers.mapped.capacity +jvm.buffers.mapped.count +jvm.buffers.mapped.used +jvm.file.open.descriptor.ratio +jvm.gc.ConcurrentMarkSweep.count +jvm.gc.ConcurrentMarkSweep.time +jvm.gc.ParNew.count +jvm.gc.ParNew.time +jvm.memory.heap.committed +jvm.memory.heap.init +jvm.memory.heap.max +jvm.memory.heap.usage +jvm.memory.heap.used +jvm.memory.non-heap.committed +jvm.memory.non-heap.init +jvm.memory.non-heap.max +jvm.memory.non-heap.usage +jvm.memory.non-heap.used +jvm.memory.pools.CMS-Old-Gen.usage +jvm.memory.pools.Code-Cache.usage +jvm.memory.pools.Compressed-Class-Space.usage +jvm.memory.pools.Metaspace.usage +jvm.memory.pools.Par-Eden-Space.usage +jvm.memory.pools.Par-Survivor-Space.usage +jvm.memory.total.committed +jvm.memory.total.init +jvm.memory.total.max +jvm.memory.total.used +jvm.threads.blocked.count +jvm.threads.count +jvm.threads.daemon.count +jvm.threads.deadlock.count +jvm.threads.new.count +jvm.threads.runnable.count +jvm.threads.terminated.count +jvm.threads.timed_waiting.count +jvm.threads.waiting.count \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ambari/blob/041e4e9a/ambari-metrics/ambari-metrics-timelineservice/src/main/resources/metrics_def/JOBHISTORYSERVER.dat ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/resources/metrics_def/JOBHISTORYSERVER.dat b/ambari-metrics/ambari-metrics-timelineservice/src/main/resources/metrics_def/JOBHISTORYSERVER.dat new file mode 100644 index 0000000..f4eccce --- /dev/null +++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/resources/metrics_def/JOBHISTORYSERVER.dat @@ -0,0 +1,58 @@ +jvm.JvmMetrics.GcCount +jvm.JvmMetrics.GcCountCopy +jvm.JvmMetrics.GcCountMarkSweepCompact +jvm.JvmMetrics.GcTimeMillis +jvm.JvmMetrics.GcTimeMillisCopy +jvm.JvmMetrics.GcTimeMillisMarkSweepCompact +jvm.JvmMetrics.LogError +jvm.JvmMetrics.LogFatal +jvm.JvmMetrics.LogInfo +jvm.JvmMetrics.LogWarn +jvm.JvmMetrics.MemHeapCommittedM +jvm.JvmMetrics.MemHeapMaxM +jvm.JvmMetrics.MemHeapUsedM +jvm.JvmMetrics.MemMaxM +jvm.JvmMetrics.MemNonHeapCommittedM +jvm.JvmMetrics.MemNonHeapMaxM +jvm.JvmMetrics.MemNonHeapUsedM +jvm.JvmMetrics.ThreadsBlocked +jvm.JvmMetrics.ThreadsNew +jvm.JvmMetrics.ThreadsRunnable +jvm.JvmMetrics.ThreadsTerminated +jvm.JvmMetrics.ThreadsTimedWaiting +jvm.JvmMetrics.ThreadsWaiting +metricssystem.MetricsSystem.DroppedPubAll +metricssystem.MetricsSystem.NumActiveSinks +metricssystem.MetricsSystem.NumActiveSources +metricssystem.MetricsSystem.NumAllSinks +metricssystem.MetricsSystem.NumAllSources +metricssystem.MetricsSystem.PublishAvgTime +metricssystem.MetricsSystem.PublishNumOps +metricssystem.MetricsSystem.Sink_timelineAvgTime +metricssystem.MetricsSystem.Sink_timelineDropped +metricssystem.MetricsSystem.Sink_timelineNumOps +metricssystem.MetricsSystem.Sink_timelineQsize +metricssystem.MetricsSystem.SnapshotAvgTime +metricssystem.MetricsSystem.SnapshotNumOps +rpc.rpc.CallQueueLength +rpc.rpc.NumOpenConnections +rpc.rpc.ReceivedBytes +rpc.rpc.RpcAuthenticationFailures +rpc.rpc.RpcAuthenticationSuccesses +rpc.rpc.RpcAuthorizationFailures +rpc.rpc.RpcAuthorizationSuccesses +rpc.rpc.RpcClientBackoff +rpc.rpc.RpcProcessingTimeAvgTime +rpc.rpc.RpcProcessingTimeNumOps +rpc.rpc.RpcQueueTimeAvgTime +rpc.rpc.RpcQueueTimeNumOps +rpc.rpc.RpcSlowCalls +rpc.rpc.SentBytes +ugi.UgiMetrics.GetGroupsAvgTime +ugi.UgiMetrics.GetGroupsNumOps +ugi.UgiMetrics.LoginFailureAvgTime +ugi.UgiMetrics.LoginFailureNumOps +ugi.UgiMetrics.LoginSuccessAvgTime +ugi.UgiMetrics.LoginSuccessNumOps +ugi.UgiMetrics.RenewalFailures +ugi.UgiMetrics.RenewalFailuresTotal \ No newline at end of file
