http://git-wip-us.apache.org/repos/asf/ambari/blob/42112e28/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/discovery/TimelineMetricMetadataManager.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/discovery/TimelineMetricMetadataManager.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/discovery/TimelineMetricMetadataManager.java new file mode 100644 index 0000000..97eb7b1 --- /dev/null +++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/discovery/TimelineMetricMetadataManager.java @@ -0,0 +1,606 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p/> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p/> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.ambari.metrics.core.timeline.discovery; + +import java.net.MalformedURLException; +import java.net.URISyntaxException; +import java.sql.SQLException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; + +import org.apache.ambari.metrics.core.timeline.TimelineMetricConfiguration; +import org.apache.ambari.metrics.core.timeline.uuid.MetricUuidGenStrategy; +import org.apache.ambari.metrics.core.timeline.uuid.RandomUuidGenStrategy; +import org.apache.commons.collections.CollectionUtils; +import org.apache.commons.collections.MapUtils; +import org.apache.commons.lang.ArrayUtils; +import org.apache.commons.lang.StringUtils; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.metrics2.sink.timeline.MetadataException; +import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric; +import org.apache.hadoop.metrics2.sink.timeline.TimelineMetricMetadata; +import org.apache.ambari.metrics.core.timeline.PhoenixHBaseAccessor; +import org.apache.ambari.metrics.core.timeline.aggregators.TimelineClusterMetric; +import org.apache.ambari.metrics.core.timeline.uuid.HashBasedUuidGenStrategy; + +public class TimelineMetricMetadataManager { + private static final Log LOG = LogFactory.getLog(TimelineMetricMetadataManager.class); + private boolean isDisabled = false; + // Cache all metadata on retrieval + private final Map<TimelineMetricMetadataKey, TimelineMetricMetadata> METADATA_CACHE = new ConcurrentHashMap<>(); + private final Map<String, TimelineMetricMetadataKey> uuidKeyMap = new ConcurrentHashMap<>(); + // Map to lookup apps on a host + private final Map<String, TimelineMetricHostMetadata> HOSTED_APPS_MAP = new ConcurrentHashMap<>(); + private final Map<String, String> uuidHostMap = new ConcurrentHashMap<>(); + private final Map<String, Set<String>> INSTANCE_HOST_MAP = new ConcurrentHashMap<>(); + // Sync only when needed + AtomicBoolean SYNC_HOSTED_APPS_METADATA = new AtomicBoolean(false); + AtomicBoolean SYNC_HOSTED_INSTANCES_METADATA = new AtomicBoolean(false); + private MetricUuidGenStrategy uuidGenStrategy = new HashBasedUuidGenStrategy(); + public static final int TIMELINE_METRIC_UUID_LENGTH = 16; + public static final int HOSTNAME_UUID_LENGTH = 4; + + // Single thread to sync back new writes to the store + private final ScheduledExecutorService executorService = Executors.newSingleThreadScheduledExecutor(); + + private PhoenixHBaseAccessor hBaseAccessor; + private Configuration metricsConf; + + TimelineMetricMetadataSync metricMetadataSync; + // Filter metrics names matching given patterns, from metadata + final List<String> metricNameFilters = new ArrayList<>(); + + // Test friendly construction since mock instrumentation is difficult to get + // working with hadoop mini cluster + public TimelineMetricMetadataManager(Configuration metricsConf, PhoenixHBaseAccessor hBaseAccessor) { + this.metricsConf = metricsConf; + this.hBaseAccessor = hBaseAccessor; + String patternStrings = metricsConf.get(TimelineMetricConfiguration.TIMELINE_METRIC_METADATA_FILTERS); + if (!StringUtils.isEmpty(patternStrings)) { + metricNameFilters.addAll(Arrays.asList(patternStrings.split(","))); + } + + uuidGenStrategy = getUuidStrategy(metricsConf); + } + + public TimelineMetricMetadataManager(PhoenixHBaseAccessor hBaseAccessor) throws MalformedURLException, URISyntaxException { + this(TimelineMetricConfiguration.getInstance().getMetricsConf(), hBaseAccessor); + } + + /** + * Initialize Metadata from the store + */ + public void initializeMetadata() { + if (metricsConf.getBoolean(TimelineMetricConfiguration.DISABLE_METRIC_METADATA_MGMT, false)) { + isDisabled = true; + } else { + metricMetadataSync = new TimelineMetricMetadataSync(this); + // Schedule the executor to sync to store + executorService.scheduleWithFixedDelay(metricMetadataSync, + metricsConf.getInt(TimelineMetricConfiguration.METRICS_METADATA_SYNC_INIT_DELAY, 120), // 2 minutes + metricsConf.getInt(TimelineMetricConfiguration.METRICS_METADATA_SYNC_SCHEDULE_DELAY, 300), // 5 minutes + TimeUnit.SECONDS); + // Read from store and initialize map + try { + Map<TimelineMetricMetadataKey, TimelineMetricMetadata> metadata = getMetadataFromStore(); + + LOG.info("Retrieved " + metadata.size() + ", metadata objects from store."); + // Store in the cache + METADATA_CACHE.putAll(metadata); + + Map<String, TimelineMetricHostMetadata> hostedAppData = getHostedAppsFromStore(); + + LOG.info("Retrieved " + hostedAppData.size() + " host objects from store."); + HOSTED_APPS_MAP.putAll(hostedAppData); + + loadUuidMapsOnInit(); + + hBaseAccessor.setMetadataInstance(this); + } catch (SQLException e) { + LOG.warn("Exception loading metric metadata", e); + } + } + } + + public Map<TimelineMetricMetadataKey, TimelineMetricMetadata> getMetadataCache() { + return METADATA_CACHE; + } + + public TimelineMetricMetadata getMetadataCacheValue(TimelineMetricMetadataKey key) { + return METADATA_CACHE.get(key); + } + + public Map<String, TimelineMetricHostMetadata> getHostedAppsCache() { + return HOSTED_APPS_MAP; + } + + public Map<String, Set<String>> getHostedInstanceCache() { + return INSTANCE_HOST_MAP; + } + + public boolean syncHostedAppsMetadata() { + return SYNC_HOSTED_APPS_METADATA.get(); + } + + public boolean syncHostedInstanceMetadata() { + return SYNC_HOSTED_INSTANCES_METADATA.get(); + } + + public void markSuccessOnSyncHostedAppsMetadata() { + SYNC_HOSTED_APPS_METADATA.set(false); + } + + public void markSuccessOnSyncHostedInstanceMetadata() { + SYNC_HOSTED_INSTANCES_METADATA.set(false); + } + /** + * Test metric name for valid patterns and return true/false + */ + boolean skipMetadataCache(String metricName) { + for (String pattern : metricNameFilters) { + if (metricName.contains(pattern)) { + return true; + } + } + return false; + } + + /** + * Update value in metadata cache + * @param metadata @TimelineMetricMetadata + */ + public void putIfModifiedTimelineMetricMetadata(TimelineMetricMetadata metadata) { + if (skipMetadataCache(metadata.getMetricName())) { + return; + } + + TimelineMetricMetadataKey key = new TimelineMetricMetadataKey( + metadata.getMetricName(), metadata.getAppId(), metadata.getInstanceId()); + + TimelineMetricMetadata metadataFromCache = METADATA_CACHE.get(key); + + if (metadataFromCache != null) { + try { + if (metadataFromCache.needsToBeSynced(metadata)) { + metadata.setIsPersisted(false); // Set the flag to ensure sync to store on next run + METADATA_CACHE.put(key, metadata); + } + } catch (MetadataException e) { + LOG.warn("Error inserting Metadata in cache.", e); + } + + } else { + METADATA_CACHE.put(key, metadata); + } + } + + /** + * Update value in hosted apps cache + * @param hostname Host name + * @param appId Application Id + */ + public void putIfModifiedHostedAppsMetadata(String hostname, String appId) { + TimelineMetricHostMetadata timelineMetricHostMetadata = HOSTED_APPS_MAP.get(hostname); + ConcurrentHashMap<String, String> apps = (timelineMetricHostMetadata != null) ? timelineMetricHostMetadata.getHostedApps() : null; + if (apps == null) { + apps = new ConcurrentHashMap<>(); + if (timelineMetricHostMetadata == null) { + HOSTED_APPS_MAP.put(hostname, new TimelineMetricHostMetadata(apps)); + } else { + HOSTED_APPS_MAP.get(hostname).setHostedApps(apps); + } + } + + if (!apps.containsKey(appId)) { + apps.put(appId, appId); + SYNC_HOSTED_APPS_METADATA.set(true); + } + } + + public void putIfModifiedHostedInstanceMetadata(String instanceId, String hostname) { + if (StringUtils.isEmpty(instanceId)) { + return; + } + + Set<String> hosts = INSTANCE_HOST_MAP.get(instanceId); + if (hosts == null) { + hosts = new HashSet<>(); + INSTANCE_HOST_MAP.put(instanceId, hosts); + } + + if (!hosts.contains(hostname)) { + hosts.add(hostname); + SYNC_HOSTED_INSTANCES_METADATA.set(true); + } + } + + public void persistMetadata(Collection<TimelineMetricMetadata> metadata) throws SQLException { + hBaseAccessor.saveMetricMetadata(metadata); + } + + public void persistHostedAppsMetadata(Map<String, TimelineMetricHostMetadata> hostedApps) throws SQLException { + hBaseAccessor.saveHostAppsMetadata(hostedApps); + } + + public void persistHostedInstanceMetadata(Map<String, Set<String>> hostedInstancesMetadata) throws SQLException { + hBaseAccessor.saveInstanceHostsMetadata(hostedInstancesMetadata); + } + + public TimelineMetricMetadata getTimelineMetricMetadata(TimelineMetric timelineMetric, boolean isWhitelisted) { + return new TimelineMetricMetadata( + timelineMetric.getMetricName(), + timelineMetric.getAppId(), + timelineMetric.getInstanceId(), + timelineMetric.getUnits(), + timelineMetric.getType(), + timelineMetric.getStartTime(), + supportAggregates(timelineMetric), + isWhitelisted + ); + } + + public boolean isDisabled() { + return isDisabled; + } + + boolean isDistributedModeEnabled() { + return metricsConf.get("timeline.metrics.service.operation.mode").equals("distributed"); + } + + /** + * Fetch metrics metadata from store + * @throws SQLException + */ + Map<TimelineMetricMetadataKey, TimelineMetricMetadata> getMetadataFromStore() throws SQLException { + return hBaseAccessor.getTimelineMetricMetadata(); + } + + /** + * Fetch hosted apps from store + * @throws SQLException + */ + Map<String, TimelineMetricHostMetadata> getHostedAppsFromStore() throws SQLException { + return hBaseAccessor.getHostedAppsMetadata(); + } + + Map<String, Set<String>> getHostedInstancesFromStore() throws SQLException { + return hBaseAccessor.getInstanceHostsMetdata(); + } + + private boolean supportAggregates(TimelineMetric metric) { + return MapUtils.isEmpty(metric.getMetadata()) || + !(String.valueOf(true).equals(metric.getMetadata().get("skipAggregation"))); + } + + ////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// + // UUID Management + ////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// + + + /** + * Load the UUID mappings from the UUID table on startup. + */ + private void loadUuidMapsOnInit() { + + for (TimelineMetricMetadataKey key : METADATA_CACHE.keySet()) { + TimelineMetricMetadata timelineMetricMetadata = METADATA_CACHE.get(key); + if (timelineMetricMetadata != null && timelineMetricMetadata.getUuid() != null) { + uuidKeyMap.put(new String(timelineMetricMetadata.getUuid()), key); + } + } + + for (String host : HOSTED_APPS_MAP.keySet()) { + TimelineMetricHostMetadata timelineMetricHostMetadata = HOSTED_APPS_MAP.get(host); + if (timelineMetricHostMetadata != null && timelineMetricHostMetadata.getUuid() != null) { + uuidHostMap.put(new String(timelineMetricHostMetadata.getUuid()), host); + } + } + } + + /** + * Returns the UUID gen strategy. + * @param configuration + * @return + */ + private MetricUuidGenStrategy getUuidStrategy(Configuration configuration) { + String strategy = configuration.get(TimelineMetricConfiguration.TIMELINE_METRICS_UUID_GEN_STRATEGY, ""); + if ("random".equalsIgnoreCase(strategy)) { + return new RandomUuidGenStrategy(); + } else { + return new HashBasedUuidGenStrategy(); + } + } + + /** + * Given the hostname, generates a byte array of length 'HOSTNAME_UUID_LENGTH' + * @param hostname + * @return uuid byte array of length 'HOSTNAME_UUID_LENGTH' + */ + private byte[] getUuidForHostname(String hostname) { + + TimelineMetricHostMetadata timelineMetricHostMetadata = HOSTED_APPS_MAP.get(hostname); + if (timelineMetricHostMetadata != null) { + byte[] uuid = timelineMetricHostMetadata.getUuid(); + if (uuid != null) { + return uuid; + } + } + + byte[] uuid = uuidGenStrategy.computeUuid(hostname, HOSTNAME_UUID_LENGTH); + + String uuidStr = new String(uuid); + if (uuidHostMap.containsKey(uuidStr)) { + //TODO fix the collisions + LOG.error("Duplicate key computed for " + hostname +", Collides with " + uuidHostMap.get(uuidStr)); + return uuid; + } + + if (timelineMetricHostMetadata == null) { + timelineMetricHostMetadata = new TimelineMetricHostMetadata(); + HOSTED_APPS_MAP.put(hostname, timelineMetricHostMetadata); + } + timelineMetricHostMetadata.setUuid(uuid); + uuidHostMap.put(uuidStr, hostname); + + return uuid; + } + + /** + * Given a timelineClusterMetric instance, generates a UUID for Metric-App-Instance combination. + * @param timelineClusterMetric + * @return uuid byte array of length 'TIMELINE_METRIC_UUID_LENGTH' + */ + public byte[] getUuid(TimelineClusterMetric timelineClusterMetric) { + TimelineMetricMetadataKey key = new TimelineMetricMetadataKey(timelineClusterMetric.getMetricName(), + timelineClusterMetric.getAppId(), timelineClusterMetric.getInstanceId()); + + TimelineMetricMetadata timelineMetricMetadata = METADATA_CACHE.get(key); + if (timelineMetricMetadata != null) { + byte[] uuid = timelineMetricMetadata.getUuid(); + if (uuid != null) { + return uuid; + } + } + + byte[] uuid = uuidGenStrategy.computeUuid(timelineClusterMetric, TIMELINE_METRIC_UUID_LENGTH); + + String uuidStr = new String(uuid); + if (uuidKeyMap.containsKey(uuidStr) && !uuidKeyMap.get(uuidStr).equals(key)) { + TimelineMetricMetadataKey collidingKey = (TimelineMetricMetadataKey)uuidKeyMap.get(uuidStr); + //TODO fix the collisions + /** + * 2017-08-23 14:12:35,922 ERROR TimelineMetricMetadataManager: + * Duplicate key [52, 50, 51, 53, 50, 53, 53, 53, 49, 54, 57, 50, 50, 54, 0, 0]([B@278a93f9) computed for + * TimelineClusterMetric{metricName='sdisk_dm-11_write_count', appId='hbase', instanceId='', timestamp=1503497400000}, Collides with + * TimelineMetricMetadataKey{metricName='sdisk_dm-20_write_count', appId='hbase', instanceId=''} + */ + LOG.error("Duplicate key " + Arrays.toString(uuid) + "(" + uuid + ") computed for " + timelineClusterMetric.toString() + ", Collides with " + collidingKey.toString()); + return uuid; + } + + if (timelineMetricMetadata == null) { + timelineMetricMetadata = new TimelineMetricMetadata(); + timelineMetricMetadata.setMetricName(timelineClusterMetric.getMetricName()); + timelineMetricMetadata.setAppId(timelineClusterMetric.getAppId()); + timelineMetricMetadata.setInstanceId(timelineClusterMetric.getInstanceId()); + METADATA_CACHE.put(key, timelineMetricMetadata); + } + + timelineMetricMetadata.setUuid(uuid); + timelineMetricMetadata.setIsPersisted(false); + uuidKeyMap.put(uuidStr, key); + return uuid; + } + + /** + * Given a timelineMetric instance, generates a UUID for Metric-App-Instance combination. + * @param timelineMetric + * @return uuid byte array of length 'TIMELINE_METRIC_UUID_LENGTH' + 'HOSTNAME_UUID_LENGTH' + */ + public byte[] getUuid(TimelineMetric timelineMetric) { + + byte[] metricUuid = getUuid(new TimelineClusterMetric(timelineMetric.getMetricName(), timelineMetric.getAppId(), + timelineMetric.getInstanceId(), -1l)); + byte[] hostUuid = getUuidForHostname(timelineMetric.getHostName()); + + return ArrayUtils.addAll(metricUuid, hostUuid); + } + + public byte[] getUuid(String metricName, String appId, String instanceId, String hostname) { + + byte[] metricUuid = getUuid(new TimelineClusterMetric(metricName, appId, instanceId, -1l)); + if (StringUtils.isNotEmpty(hostname)) { + byte[] hostUuid = getUuidForHostname(hostname); + return ArrayUtils.addAll(metricUuid, hostUuid); + } + return metricUuid; + } + + public String getMetricNameFromUuid(byte[] uuid) { + + byte[] metricUuid = uuid; + if (uuid.length == TIMELINE_METRIC_UUID_LENGTH + HOSTNAME_UUID_LENGTH) { + metricUuid = ArrayUtils.subarray(uuid, 0, TIMELINE_METRIC_UUID_LENGTH); + } + + TimelineMetricMetadataKey key = uuidKeyMap.get(new String(metricUuid)); + return key != null ? key.getMetricName() : null; + } + + public TimelineMetric getMetricFromUuid(byte[] uuid) { + if (uuid == null) { + return null; + } + + if (uuid.length == TIMELINE_METRIC_UUID_LENGTH) { + TimelineMetricMetadataKey key = uuidKeyMap.get(new String(uuid)); + return key != null ? new TimelineMetric(key.metricName, null, key.appId, key.instanceId) : null; + } else { + byte[] metricUuid = ArrayUtils.subarray(uuid, 0, TIMELINE_METRIC_UUID_LENGTH); + TimelineMetricMetadataKey key = uuidKeyMap.get(new String(metricUuid)); + if (key == null) { + LOG.error("TimelineMetricMetadataKey is null for : " + Arrays.toString(uuid)); + return null; + } + TimelineMetric timelineMetric = new TimelineMetric(); + timelineMetric.setMetricName(key.metricName); + timelineMetric.setAppId(key.appId); + timelineMetric.setInstanceId(key.instanceId); + + byte[] hostUuid = ArrayUtils.subarray(uuid, TIMELINE_METRIC_UUID_LENGTH, HOSTNAME_UUID_LENGTH + TIMELINE_METRIC_UUID_LENGTH); + timelineMetric.setHostName(uuidHostMap.get(new String(hostUuid))); + return timelineMetric; + } + } + + /** + * Returns the set of UUIDs for a given GET request. If there are wildcards (%), resolves them based on UUID map. + * @param metricNames + * @param hostnames + * @param appId + * @param instanceId + * @return Set of UUIds + */ + public List<byte[]> getUuids(Collection<String> metricNames, List<String> hostnames, String appId, String instanceId) { + + Collection<String> sanitizedMetricNames = new HashSet<>(); + + for (String metricName : metricNames) { + if (metricName.contains("%")) { + String metricRegEx; + //Special case handling for metric name with * and __%. + //For example, dfs.NNTopUserOpCounts.windowMs=300000.op=*.user=%.count + // or dfs.NNTopUserOpCounts.windowMs=300000.op=__%.user=%.count + if (metricName.contains("*") || metricName.contains("__%")) { + String metricNameWithEscSeq = metricName.replace("*", "\\*").replace("__%", "..%"); + metricRegEx = metricNameWithEscSeq.replace("%", ".*"); + } else { + metricRegEx = metricName.replace("%", ".*"); + } + for (TimelineMetricMetadataKey key : METADATA_CACHE.keySet()) { + String metricNameFromMetadata = key.getMetricName(); + if (metricNameFromMetadata.matches(metricRegEx)) { + sanitizedMetricNames.add(metricNameFromMetadata); + } + } + } else { + sanitizedMetricNames.add(metricName); + } + } + + Set<String> sanitizedHostNames = new HashSet<>(); + if (CollectionUtils.isNotEmpty(hostnames)) { + for (String hostname : hostnames) { + if (hostname.contains("%")) { + String hostRegEx; + hostRegEx = hostname.replace("%", ".*"); + for (String host : HOSTED_APPS_MAP.keySet()) { + if (host.matches(hostRegEx)) { + sanitizedHostNames.add(host); + } + } + } else { + sanitizedHostNames.add(hostname); + } + } + } + + List<byte[]> uuids = new ArrayList<>(); + + if ( StringUtils.isNotEmpty(appId) && !(appId.equals("HOST") || appId.equals("FLUME_HANDLER"))) { //HACK.. Why?? + appId = appId.toLowerCase(); + } + if (CollectionUtils.isNotEmpty(sanitizedHostNames)) { + if (CollectionUtils.isNotEmpty(sanitizedMetricNames)) { + for (String metricName : sanitizedMetricNames) { + TimelineMetric metric = new TimelineMetric(); + metric.setMetricName(metricName); + metric.setAppId(appId); + metric.setInstanceId(instanceId); + for (String hostname : sanitizedHostNames) { + metric.setHostName(hostname); + byte[] uuid = getUuid(metric); + if (uuid != null) { + uuids.add(uuid); + } + } + } + } else { + for (String hostname : sanitizedHostNames) { + byte[] uuid = getUuidForHostname(hostname); + if (uuid != null) { + uuids.add(uuid); + } + } + } + } else { + for (String metricName : sanitizedMetricNames) { + TimelineClusterMetric metric = new TimelineClusterMetric(metricName, appId, instanceId, -1l); + byte[] uuid = getUuid(metric); + if (uuid != null) { + uuids.add(uuid); + } + } + } + + return uuids; + } + + public Map<String, TimelineMetricMetadataKey> getUuidKeyMap() { + return uuidKeyMap; + } + + public List<String> getNotLikeHostnames(List<String> hostnames) { + List<String> result = new ArrayList<>(); + Set<String> sanitizedHostNames = new HashSet<>(); + if (CollectionUtils.isNotEmpty(hostnames)) { + for (String hostname : hostnames) { + if (hostname.contains("%")) { + String hostRegEx; + hostRegEx = hostname.replace("%", ".*"); + for (String host : HOSTED_APPS_MAP.keySet()) { + if (host.matches(hostRegEx)) { + sanitizedHostNames.add(host); + } + } + } else { + sanitizedHostNames.add(hostname); + } + } + } + + for (String hostname: HOSTED_APPS_MAP.keySet()) { + if (!sanitizedHostNames.contains(hostname)) { + result.add(hostname); + } + } + return result; + } +}
http://git-wip-us.apache.org/repos/asf/ambari/blob/42112e28/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/discovery/TimelineMetricMetadataSync.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/discovery/TimelineMetricMetadataSync.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/discovery/TimelineMetricMetadataSync.java new file mode 100644 index 0000000..0ffff76 --- /dev/null +++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/discovery/TimelineMetricMetadataSync.java @@ -0,0 +1,227 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p/> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p/> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.ambari.metrics.core.timeline.discovery; + +import java.sql.SQLException; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Set; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.metrics2.sink.timeline.TimelineMetricMetadata; + +/** + * Sync metadata info with the store + */ +public class TimelineMetricMetadataSync implements Runnable { + private static final Log LOG = LogFactory.getLog(TimelineMetricMetadataSync.class); + + private final TimelineMetricMetadataManager cacheManager; + + public TimelineMetricMetadataSync(TimelineMetricMetadataManager cacheManager) { + this.cacheManager = cacheManager; + } + + @Override + public void run() { + LOG.debug("Persisting metric metadata..."); + persistMetricMetadata(); + LOG.debug("Persisting hosted apps metadata..."); + persistHostAppsMetadata(); + LOG.debug("Persisting hosted instance metadata..."); + persistHostInstancesMetadata(); + if (cacheManager.isDistributedModeEnabled()) { + LOG.debug("Refreshing metric metadata..."); + refreshMetricMetadata(); + LOG.debug("Refreshing hosted apps metadata..."); + refreshHostAppsMetadata(); + LOG.debug("Refreshing hosted instances metadata..."); + refreshHostedInstancesMetadata(); + } + } + + /** + * Find metrics not persisted to store and persist them + */ + private void persistMetricMetadata() { + List<TimelineMetricMetadata> metadataToPersist = new ArrayList<>(); + // Find all entries to persist + for (TimelineMetricMetadata metadata : cacheManager.getMetadataCache().values()) { + if (!metadata.isPersisted()) { + metadataToPersist.add(metadata); + } + } + boolean markSuccess = false; + if (!metadataToPersist.isEmpty()) { + try { + cacheManager.persistMetadata(metadataToPersist); + markSuccess = true; + } catch (SQLException e) { + LOG.warn("Error persisting metadata.", e); + } + } + // Mark corresponding entries as persisted to skip on next run + if (markSuccess) { + for (TimelineMetricMetadata metadata : metadataToPersist) { + TimelineMetricMetadataKey key = new TimelineMetricMetadataKey( + metadata.getMetricName(), metadata.getAppId(), metadata.getInstanceId() + ); + + // Mark entry as being persisted + metadata.setIsPersisted(true); + // Update cache + cacheManager.getMetadataCache().put(key, metadata); + } + } + } + + /** + * Read all metric metadata and update cached values - HA mode + */ + private void refreshMetricMetadata() { + Map<TimelineMetricMetadataKey, TimelineMetricMetadata> metadataFromStore = null; + try { + metadataFromStore = cacheManager.getMetadataFromStore(); + } catch (SQLException e) { + LOG.warn("Error refreshing metadata from store.", e); + } + if (metadataFromStore != null) { + Map<TimelineMetricMetadataKey, TimelineMetricMetadata> cachedMetadata = + cacheManager.getMetadataCache(); + + for (Map.Entry<TimelineMetricMetadataKey, TimelineMetricMetadata> metadataEntry : metadataFromStore.entrySet()) { + if (!cachedMetadata.containsKey(metadataEntry.getKey())) { + cachedMetadata.put(metadataEntry.getKey(), metadataEntry.getValue()); + } + } + } + } + + /** + * Sync hosted apps data if needed + */ + private void persistHostAppsMetadata() { + if (cacheManager.syncHostedAppsMetadata()) { + Map<String, TimelineMetricHostMetadata> persistedData = null; + try { + persistedData = cacheManager.getHostedAppsFromStore(); + } catch (SQLException e) { + LOG.warn("Failed on fetching hosted apps data from store.", e); + return; // Something wrong with store + } + + Map<String, TimelineMetricHostMetadata> cachedData = cacheManager.getHostedAppsCache(); + Map<String, TimelineMetricHostMetadata> dataToSync = new HashMap<>(); + if (cachedData != null && !cachedData.isEmpty()) { + for (Map.Entry<String, TimelineMetricHostMetadata> cacheEntry : cachedData.entrySet()) { + // No persistence / stale data in store + if (persistedData == null || persistedData.isEmpty() || + !persistedData.containsKey(cacheEntry.getKey()) || + !persistedData.get(cacheEntry.getKey()).getHostedApps().keySet().containsAll(cacheEntry.getValue().getHostedApps().keySet())) { + dataToSync.put(cacheEntry.getKey(), cacheEntry.getValue()); + } + } + try { + cacheManager.persistHostedAppsMetadata(dataToSync); + cacheManager.markSuccessOnSyncHostedAppsMetadata(); + + } catch (SQLException e) { + LOG.warn("Error persisting hosted apps metadata.", e); + } + } + + } + } + + /** + * Sync apps instances data if needed + */ + private void persistHostInstancesMetadata() { + if (cacheManager.syncHostedInstanceMetadata()) { + Map<String, Set<String>> persistedData = null; + try { + persistedData = cacheManager.getHostedInstancesFromStore(); + } catch (SQLException e) { + LOG.warn("Failed on fetching hosted instances data from store.", e); + return; // Something wrong with store + } + + Map<String, Set<String>> cachedData = cacheManager.getHostedInstanceCache(); + Map<String, Set<String>> dataToSync = new HashMap<>(); + if (cachedData != null && !cachedData.isEmpty()) { + for (Map.Entry<String, Set<String>> cacheEntry : cachedData.entrySet()) { + // No persistence / stale data in store + if (persistedData == null || persistedData.isEmpty() || + !persistedData.containsKey(cacheEntry.getKey()) || + !persistedData.get(cacheEntry.getKey()).containsAll(cacheEntry.getValue())) { + dataToSync.put(cacheEntry.getKey(), cacheEntry.getValue()); + } + } + try { + cacheManager.persistHostedInstanceMetadata(dataToSync); + cacheManager.markSuccessOnSyncHostedInstanceMetadata(); + + } catch (SQLException e) { + LOG.warn("Error persisting hosted apps metadata.", e); + } + } + + } + } + /** + * Read all hosted apps metadata and update cached values - HA + */ + private void refreshHostAppsMetadata() { + Map<String, TimelineMetricHostMetadata> hostedAppsDataFromStore = null; + try { + hostedAppsDataFromStore = cacheManager.getHostedAppsFromStore(); + } catch (SQLException e) { + LOG.warn("Error refreshing metadata from store.", e); + } + if (hostedAppsDataFromStore != null) { + Map<String, TimelineMetricHostMetadata> cachedData = cacheManager.getHostedAppsCache(); + + for (Map.Entry<String, TimelineMetricHostMetadata> storeEntry : hostedAppsDataFromStore.entrySet()) { + if (!cachedData.containsKey(storeEntry.getKey())) { + cachedData.put(storeEntry.getKey(), storeEntry.getValue()); + } + } + } + } + + private void refreshHostedInstancesMetadata() { + Map<String, Set<String>> hostedInstancesFromStore = null; + try { + hostedInstancesFromStore = cacheManager.getHostedInstancesFromStore(); + } catch (SQLException e) { + LOG.warn("Error refreshing metadata from store.", e); + } + if (hostedInstancesFromStore != null) { + Map<String, Set<String>> cachedData = cacheManager.getHostedInstanceCache(); + + for (Map.Entry<String, Set<String>> storeEntry : hostedInstancesFromStore.entrySet()) { + if (!cachedData.containsKey(storeEntry.getKey())) { + cachedData.put(storeEntry.getKey(), storeEntry.getValue()); + } + } + } + } +} http://git-wip-us.apache.org/repos/asf/ambari/blob/42112e28/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/function/AbstractTimelineMetricsSeriesAggregateFunction.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/function/AbstractTimelineMetricsSeriesAggregateFunction.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/function/AbstractTimelineMetricsSeriesAggregateFunction.java new file mode 100644 index 0000000..1070242 --- /dev/null +++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/function/AbstractTimelineMetricsSeriesAggregateFunction.java @@ -0,0 +1,98 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.ambari.metrics.core.timeline.function; + +import java.util.Iterator; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.TreeMap; +import java.util.TreeSet; + +import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric; +import org.apache.hadoop.metrics2.sink.timeline.TimelineMetrics; + +import com.google.common.base.Joiner; + +public abstract class AbstractTimelineMetricsSeriesAggregateFunction + implements TimelineMetricsSeriesAggregateFunction { + + @Override + public TimelineMetric apply(TimelineMetrics timelineMetrics) { + Set<String> metricNameSet = new TreeSet<>(); + Set<String> hostNameSet = new TreeSet<>(); + Set<String> appIdSet = new TreeSet<>(); + Set<String> instanceIdSet = new TreeSet<>(); + TreeMap<Long, List<Double>> metricValues = new TreeMap<>(); + + for (TimelineMetric timelineMetric : timelineMetrics.getMetrics()) { + metricNameSet.add(timelineMetric.getMetricName()); + addToSetOnlyNotNull(hostNameSet, timelineMetric.getHostName()); + addToSetOnlyNotNull(appIdSet, timelineMetric.getAppId()); + addToSetOnlyNotNull(instanceIdSet, timelineMetric.getInstanceId()); + + for (Map.Entry<Long, Double> metricValue : timelineMetric.getMetricValues().entrySet()) { + Long timestamp = metricValue.getKey(); + Double value = metricValue.getValue(); + if (!metricValues.containsKey(timestamp)) { + metricValues.put(timestamp, new LinkedList<Double>()); + } + metricValues.get(timestamp).add(value); + } + } + + TreeMap<Long, Double> aggregatedMetricValues = new TreeMap<>(); + for (Map.Entry<Long, List<Double>> metricValue : metricValues.entrySet()) { + List<Double> values = metricValue.getValue(); + if (values.size() == 0) { + throw new IllegalArgumentException("count of values should be more than 0"); + } + aggregatedMetricValues.put(metricValue.getKey(), applyFunction(values)); + } + + TimelineMetric timelineMetric = new TimelineMetric(); + timelineMetric.setMetricName(getMetricName(metricNameSet.iterator())); + timelineMetric.setHostName(joinStringsWithComma(hostNameSet.iterator())); + timelineMetric.setAppId(joinStringsWithComma(appIdSet.iterator())); + timelineMetric.setInstanceId(joinStringsWithComma(instanceIdSet.iterator())); + if (aggregatedMetricValues.size() > 0) { + timelineMetric.setStartTime(aggregatedMetricValues.firstKey()); + } + timelineMetric.setMetricValues(aggregatedMetricValues); + return timelineMetric; + } + + protected String getMetricName(Iterator<String> metricNames) { + return getFunctionName() + "(" + Joiner.on(",").join(metricNames) + ")"; + } + + protected String joinStringsWithComma(Iterator<String> hostNames) { + return Joiner.on(",").join(hostNames); + } + + protected abstract Double applyFunction(List<Double> values); + protected abstract String getFunctionName(); + + private void addToSetOnlyNotNull(Set<String> set, String value) { + if (value != null) { + set.add(value); + } + } + +} http://git-wip-us.apache.org/repos/asf/ambari/blob/42112e28/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/function/SeriesAggregateFunction.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/function/SeriesAggregateFunction.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/function/SeriesAggregateFunction.java new file mode 100644 index 0000000..1438194 --- /dev/null +++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/function/SeriesAggregateFunction.java @@ -0,0 +1,42 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.ambari.metrics.core.timeline.function; + +import org.apache.ambari.metrics.core.timeline.aggregators.Function; + +public enum SeriesAggregateFunction { + AVG, MIN, MAX, SUM; + + public static boolean isPresent(String functionName) { + try { + SeriesAggregateFunction.valueOf(functionName.toUpperCase()); + } catch (IllegalArgumentException e) { + return false; + } + return true; + } + + public static SeriesAggregateFunction getFunction(String functionName) throws Function.FunctionFormatException { + try { + return SeriesAggregateFunction.valueOf(functionName.toUpperCase()); + } catch (NullPointerException | IllegalArgumentException e) { + throw new Function.FunctionFormatException( + "Function should be sum, avg, min, max. Got " + functionName, e); + } + } +} http://git-wip-us.apache.org/repos/asf/ambari/blob/42112e28/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/function/TimelineMetricsSeriesAggregateFunction.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/function/TimelineMetricsSeriesAggregateFunction.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/function/TimelineMetricsSeriesAggregateFunction.java new file mode 100644 index 0000000..a5f40e1 --- /dev/null +++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/function/TimelineMetricsSeriesAggregateFunction.java @@ -0,0 +1,25 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.ambari.metrics.core.timeline.function; + +import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric; +import org.apache.hadoop.metrics2.sink.timeline.TimelineMetrics; + +public interface TimelineMetricsSeriesAggregateFunction { + TimelineMetric apply(TimelineMetrics timelineMetrics); +} http://git-wip-us.apache.org/repos/asf/ambari/blob/42112e28/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/function/TimelineMetricsSeriesAggregateFunctionFactory.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/function/TimelineMetricsSeriesAggregateFunctionFactory.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/function/TimelineMetricsSeriesAggregateFunctionFactory.java new file mode 100644 index 0000000..f07aeb9 --- /dev/null +++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/function/TimelineMetricsSeriesAggregateFunctionFactory.java @@ -0,0 +1,41 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.ambari.metrics.core.timeline.function; + +import org.apache.ambari.metrics.core.timeline.aggregators.Function; + +public class TimelineMetricsSeriesAggregateFunctionFactory { + private TimelineMetricsSeriesAggregateFunctionFactory() { + } + + public static TimelineMetricsSeriesAggregateFunction newInstance(SeriesAggregateFunction func) { + switch (func) { + case AVG: + return new TimelineMetricsSeriesAvgAggregateFunction(); + case MIN: + return new TimelineMetricsSeriesMinAggregateFunction(); + case MAX: + return new TimelineMetricsSeriesMaxAggregateFunction(); + case SUM: + return new TimelineMetricsSeriesSumAggregateFunction(); + default: + throw new Function.FunctionFormatException("Function should be sum, avg, min, max. Got " + + func.name()); + } + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ambari/blob/42112e28/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/function/TimelineMetricsSeriesAvgAggregateFunction.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/function/TimelineMetricsSeriesAvgAggregateFunction.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/function/TimelineMetricsSeriesAvgAggregateFunction.java new file mode 100644 index 0000000..b0e3069 --- /dev/null +++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/function/TimelineMetricsSeriesAvgAggregateFunction.java @@ -0,0 +1,39 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.ambari.metrics.core.timeline.function; + +import java.util.List; + +public class TimelineMetricsSeriesAvgAggregateFunction extends AbstractTimelineMetricsSeriesAggregateFunction { + private static final String FUNCTION_NAME = "AVG"; + + @Override + protected Double applyFunction(List<Double> values) { + double sum = 0.0d; + for (Double value : values) { + sum += value; + } + + return sum / values.size(); + } + + @Override + protected String getFunctionName() { + return FUNCTION_NAME; + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ambari/blob/42112e28/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/function/TimelineMetricsSeriesMaxAggregateFunction.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/function/TimelineMetricsSeriesMaxAggregateFunction.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/function/TimelineMetricsSeriesMaxAggregateFunction.java new file mode 100644 index 0000000..58c6d4c --- /dev/null +++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/function/TimelineMetricsSeriesMaxAggregateFunction.java @@ -0,0 +1,41 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.ambari.metrics.core.timeline.function; + +import java.util.List; + +public class TimelineMetricsSeriesMaxAggregateFunction extends AbstractTimelineMetricsSeriesAggregateFunction { + private static final String FUNCTION_NAME = "MAX"; + + @Override + protected Double applyFunction(List<Double> values) { + double max = Double.MIN_VALUE; + for (Double value : values) { + if (value > max) { + max = value; + } + } + + return max; + } + + @Override + protected String getFunctionName() { + return FUNCTION_NAME; + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ambari/blob/42112e28/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/function/TimelineMetricsSeriesMinAggregateFunction.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/function/TimelineMetricsSeriesMinAggregateFunction.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/function/TimelineMetricsSeriesMinAggregateFunction.java new file mode 100644 index 0000000..60e1bb5 --- /dev/null +++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/function/TimelineMetricsSeriesMinAggregateFunction.java @@ -0,0 +1,41 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.ambari.metrics.core.timeline.function; + +import java.util.List; + +public class TimelineMetricsSeriesMinAggregateFunction extends AbstractTimelineMetricsSeriesAggregateFunction { + private static final String FUNCTION_NAME = "MIN"; + + @Override + protected Double applyFunction(List<Double> values) { + double min = Double.MAX_VALUE; + for (Double value : values) { + if (value < min) { + min = value; + } + } + + return min; + } + + @Override + protected String getFunctionName() { + return FUNCTION_NAME; + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ambari/blob/42112e28/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/function/TimelineMetricsSeriesSumAggregateFunction.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/function/TimelineMetricsSeriesSumAggregateFunction.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/function/TimelineMetricsSeriesSumAggregateFunction.java new file mode 100644 index 0000000..997d7be --- /dev/null +++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/function/TimelineMetricsSeriesSumAggregateFunction.java @@ -0,0 +1,39 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.ambari.metrics.core.timeline.function; + +import java.util.List; + +public class TimelineMetricsSeriesSumAggregateFunction extends AbstractTimelineMetricsSeriesAggregateFunction { + private static final String FUNCTION_NAME = "SUM"; + + @Override + protected Double applyFunction(List<Double> values) { + double sum = 0.0d; + for (Double value : values) { + sum += value; + } + + return sum; + } + + @Override + protected String getFunctionName() { + return FUNCTION_NAME; + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ambari/blob/42112e28/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/query/Condition.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/query/Condition.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/query/Condition.java new file mode 100644 index 0000000..fa118a3 --- /dev/null +++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/query/Condition.java @@ -0,0 +1,51 @@ +package org.apache.ambari.metrics.core.timeline.query; + +import java.util.List; + +import org.apache.hadoop.metrics2.sink.timeline.Precision; + +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p/> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p/> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +public interface Condition { + boolean isEmpty(); + + List<byte[]> getUuids(); + List<String> getMetricNames(); + boolean isPointInTime(); + boolean isGrouped(); + void setStatement(String statement); + List<String> getHostnames(); + Precision getPrecision(); + void setPrecision(Precision precision); + String getAppId(); + String getInstanceId(); + StringBuilder getConditionClause(); + String getOrderByClause(boolean asc); + String getStatement(); + Long getStartTime(); + Long getEndTime(); + Integer getLimit(); + Integer getFetchSize(); + void setFetchSize(Integer fetchSize); + void addOrderByColumn(String column); + void setNoLimit(); + boolean doUpdate(); + void setMetricNamesNotCondition(boolean metricNamesNotCondition); + void setHostnamesNotCondition(boolean hostNamesNotCondition); + void setUuidNotCondition(boolean uuidNotCondition); +} http://git-wip-us.apache.org/repos/asf/ambari/blob/42112e28/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/query/ConditionBuilder.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/query/ConditionBuilder.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/query/ConditionBuilder.java new file mode 100644 index 0000000..e779d77 --- /dev/null +++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/query/ConditionBuilder.java @@ -0,0 +1,143 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.ambari.metrics.core.timeline.query; + +import java.util.LinkedHashSet; +import java.util.List; +import java.util.Set; + +import org.apache.hadoop.metrics2.sink.timeline.Precision; +import org.apache.ambari.metrics.core.timeline.aggregators.Function; + +public class ConditionBuilder { + + private List<String> metricNames; + private List<String> hostnames; + private String appId; + private String instanceId; + private Long startTime; + private Long endTime; + private Precision precision; + private Integer limit; + private boolean grouped; + private boolean noLimit = false; + private Integer fetchSize; + private String statement; + private Set<String> orderByColumns = new LinkedHashSet<String>(); + private Integer topN; + private boolean isBottomN; + private Function topNFunction; + private List<byte[]> uuids; + + public ConditionBuilder(List<String> metricNames) { + this.metricNames = metricNames; + } + + public ConditionBuilder hostnames(List<String> hostnames) { + this.hostnames = hostnames; + return this; + } + + public ConditionBuilder appId(String appId) { + this.appId = appId; + return this; + } + + public ConditionBuilder instanceId(String instanceId) { + this.instanceId = instanceId; + return this; + } + + public ConditionBuilder startTime(Long startTime) { + this.startTime = startTime; + return this; + } + + public ConditionBuilder endTime(Long endTime) { + this.endTime = endTime; + return this; + } + + public ConditionBuilder precision(Precision precision) { + this.precision = precision; + return this; + } + + public ConditionBuilder limit(Integer limit) { + this.limit = limit; + return this; + } + + public ConditionBuilder grouped(boolean grouped) { + this.grouped = grouped; + return this; + } + + public ConditionBuilder noLimit(boolean noLimit) { + this.noLimit = noLimit; + return this; + } + + public ConditionBuilder fetchSize(Integer fetchSize) { + this.fetchSize = fetchSize; + return this; + } + + public ConditionBuilder statement(String statement) { + this.statement = statement; + return this; + } + + public ConditionBuilder orderByColumns(Set<String> orderByColumns) { + this.orderByColumns = orderByColumns; + return this; + } + + public ConditionBuilder topN(Integer topN) { + this.topN = topN; + return this; + } + + public ConditionBuilder isBottomN(boolean isBottomN) { + this.isBottomN = isBottomN; + return this; + } + + public ConditionBuilder topNFunction(Function topNFunction) { + this.topNFunction = topNFunction; + return this; + } + + public ConditionBuilder uuid(List<byte[]> uuids) { + this.uuids = uuids; + return this; + } + + public Condition build() { + if (topN == null) { + return new DefaultCondition( + uuids, metricNames, + hostnames, appId, instanceId, startTime, endTime, + precision, limit, grouped); + } else { + return new TopNCondition(uuids, metricNames, hostnames, appId, instanceId, + startTime, endTime, precision, limit, grouped, topN, topNFunction, isBottomN); + } + } + +} http://git-wip-us.apache.org/repos/asf/ambari/blob/42112e28/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/query/ConnectionProvider.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/query/ConnectionProvider.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/query/ConnectionProvider.java new file mode 100644 index 0000000..830a0eb --- /dev/null +++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/query/ConnectionProvider.java @@ -0,0 +1,29 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.ambari.metrics.core.timeline.query; + + +import java.sql.Connection; +import java.sql.SQLException; + +/** + * + */ +public interface ConnectionProvider { + public Connection getConnection() throws SQLException; +} http://git-wip-us.apache.org/repos/asf/ambari/blob/42112e28/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/query/DefaultCondition.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/query/DefaultCondition.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/query/DefaultCondition.java new file mode 100644 index 0000000..888f381 --- /dev/null +++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/query/DefaultCondition.java @@ -0,0 +1,421 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.ambari.metrics.core.timeline.query; + +import java.util.ArrayList; +import java.util.LinkedHashSet; +import java.util.List; +import java.util.Set; + +import org.apache.commons.collections.CollectionUtils; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.metrics2.sink.timeline.Precision; +import org.apache.ambari.metrics.core.timeline.PhoenixHBaseAccessor; +import org.apache.ambari.metrics.core.timeline.discovery.TimelineMetricMetadataManager; + +public class DefaultCondition implements Condition { + List<String> metricNames; + List<String> hostnames; + String appId; + String instanceId; + Long startTime; + Long endTime; + Precision precision; + Integer limit; + boolean grouped; + boolean noLimit = false; + Integer fetchSize; + String statement; + Set<String> orderByColumns = new LinkedHashSet<String>(); + boolean metricNamesNotCondition = false; + boolean hostNamesNotCondition = false; + boolean uuidNotCondition = false; + List<byte[]> uuids = new ArrayList<>(); + + private static final Log LOG = LogFactory.getLog(DefaultCondition.class); + + public DefaultCondition(List<String> metricNames, List<String> hostnames, String appId, + String instanceId, Long startTime, Long endTime, Precision precision, + Integer limit, boolean grouped) { + this.metricNames = metricNames; + this.hostnames = hostnames; + this.appId = appId; + this.instanceId = instanceId; + this.startTime = startTime; + this.endTime = endTime; + this.precision = precision; + this.limit = limit; + this.grouped = grouped; + } + + public DefaultCondition(List<byte[]> uuids, List<String> metricNames, List<String> hostnames, String appId, + String instanceId, Long startTime, Long endTime, Precision precision, + Integer limit, boolean grouped) { + this.uuids = uuids; + this.metricNames = metricNames; + this.hostnames = hostnames; + this.appId = appId; + this.instanceId = instanceId; + this.startTime = startTime; + this.endTime = endTime; + this.precision = precision; + this.limit = limit; + this.grouped = grouped; + } + + public String getStatement() { + return statement; + } + + public void setStatement(String statement) { + this.statement = statement; + } + + public List<String> getMetricNames() { + return metricNames == null || metricNames.isEmpty() ? null : metricNames; + } + + public StringBuilder getConditionClause() { + StringBuilder sb = new StringBuilder(); + boolean appendConjunction = appendUuidClause(sb); + appendConjunction = append(sb, appendConjunction, getStartTime(), " SERVER_TIME >= ?"); + append(sb, appendConjunction, getEndTime(), " SERVER_TIME < ?"); + + return sb; + } + + protected static boolean append(StringBuilder sb, + boolean appendConjunction, + Object value, String str) { + if (value != null) { + if (appendConjunction) { + sb.append(" AND"); + } + + sb.append(str); + appendConjunction = true; + } + return appendConjunction; + } + + public List<String> getHostnames() { + return hostnames; + } + + public Precision getPrecision() { + return precision; + } + + public void setPrecision(Precision precision) { + this.precision = precision; + } + + public String getAppId() { + if (appId != null && !appId.isEmpty()) { + if (!(appId.equals("HOST") || appId.equals("FLUME_HANDLER"))) { + return appId.toLowerCase(); + } else { + return appId; + } + } + return null; + } + + public String getInstanceId() { + return instanceId == null || instanceId.isEmpty() ? null : instanceId; + } + + /** + * Convert to millis. + */ + public Long getStartTime() { + if (startTime == null) { + return null; + } else if (startTime < 9999999999l) { + return startTime * 1000; + } else { + return startTime; + } + } + + public Long getEndTime() { + if (endTime == null) { + return null; + } + if (endTime < 9999999999l) { + return endTime * 1000; + } else { + return endTime; + } + } + + public void setNoLimit() { + this.noLimit = true; + } + + @Override + public boolean doUpdate() { + return false; + } + + public Integer getLimit() { + if (noLimit) { + return null; + } + return limit == null ? PhoenixHBaseAccessor.RESULTSET_LIMIT : limit; + } + + public boolean isGrouped() { + return grouped; + } + + public boolean isPointInTime() { + return getStartTime() == null && getEndTime() == null; + } + + public boolean isEmpty() { + return (metricNames == null || metricNames.isEmpty()) + && (hostnames == null || hostnames.isEmpty()) + && (appId == null || appId.isEmpty()) + && (instanceId == null || instanceId.isEmpty()) + && startTime == null + && endTime == null; + } + + public Integer getFetchSize() { + return fetchSize; + } + + public void setFetchSize(Integer fetchSize) { + this.fetchSize = fetchSize; + } + + public void addOrderByColumn(String column) { + orderByColumns.add(column); + } + + public String getOrderByClause(boolean asc) { + String orderByStr = " ORDER BY "; + if (!orderByColumns.isEmpty()) { + StringBuilder sb = new StringBuilder(orderByStr); + for (String orderByColumn : orderByColumns) { + if (sb.length() != orderByStr.length()) { + sb.append(", "); + } + sb.append(orderByColumn); + if (!asc) { + sb.append(" DESC"); + } + } + sb.append(" "); + return sb.toString(); + } + return null; + } + + protected boolean appendUuidClause(StringBuilder sb) { + boolean appendConjunction = false; + + if (CollectionUtils.isNotEmpty(uuids)) { + + List<byte[]> uuidsHost = new ArrayList<>(); + List<byte[]> uuidsMetric = new ArrayList<>(); + List<byte[]> uuidsFull = new ArrayList<>(); + + if (getUuids() != null) { + for (byte[] uuid : uuids) { + if (uuid.length == TimelineMetricMetadataManager.TIMELINE_METRIC_UUID_LENGTH) { + uuidsMetric.add(uuid); + } else if (uuid.length == TimelineMetricMetadataManager.HOSTNAME_UUID_LENGTH) { + uuidsHost.add(uuid); + } else { + uuidsFull.add(uuid); + } + } + + // Put a '(' first + sb.append("("); + + //IN clause + // METRIC_NAME (NOT) IN (?,?,?,?) + if (CollectionUtils.isNotEmpty(uuidsFull)) { + sb.append("UUID"); + if (uuidNotCondition) { + sb.append(" NOT"); + } + sb.append(" IN ("); + //Append ?,?,?,? + for (int i = 0; i < uuidsFull.size(); i++) { + sb.append("?"); + if (i < uuidsFull.size() - 1) { + sb.append(", "); + } + } + sb.append(")"); + appendConjunction = true; + } + + //Put an AND if both types are present + if (CollectionUtils.isNotEmpty(uuidsFull) && + CollectionUtils.isNotEmpty(uuidsMetric)) { + sb.append(" AND "); + } + + // ( for OR + if (!metricNamesNotCondition && uuidsMetric.size() > 1 && (CollectionUtils.isNotEmpty(uuidsFull) || CollectionUtils.isNotEmpty(uuidsHost))) { + sb.append("("); + } + + //LIKE clause for clusterMetric UUIDs + // UUID (NOT) LIKE ? OR(AND) UUID LIKE ? + if (CollectionUtils.isNotEmpty(uuidsMetric)) { + + for (int i = 0; i < uuidsMetric.size(); i++) { + sb.append("UUID"); + if (metricNamesNotCondition) { + sb.append(" NOT"); + } + sb.append(" LIKE "); + sb.append("?"); + + if (i < uuidsMetric.size() - 1) { + if (metricNamesNotCondition) { + sb.append(" AND "); + } else { + sb.append(" OR "); + } + // ) for OR + } else if ((CollectionUtils.isNotEmpty(uuidsFull) || CollectionUtils.isNotEmpty(uuidsHost)) && !metricNamesNotCondition && uuidsMetric.size() > 1) { + sb.append(")"); + } + } + appendConjunction = true; + } + + //Put an AND if both types are present + if ((CollectionUtils.isNotEmpty(uuidsMetric) || (CollectionUtils.isNotEmpty(uuidsFull) && CollectionUtils.isEmpty(uuidsMetric))) + && CollectionUtils.isNotEmpty(uuidsHost)) { + sb.append(" AND "); + } + // ( for OR + if((CollectionUtils.isNotEmpty(uuidsFull) || CollectionUtils.isNotEmpty(uuidsMetric)) && !hostNamesNotCondition && uuidsHost.size() > 1){ + sb.append("("); + } + + //LIKE clause for HOST UUIDs + // UUID (NOT) LIKE ? OR(AND) UUID LIKE ? + if (CollectionUtils.isNotEmpty(uuidsHost)) { + + for (int i = 0; i < uuidsHost.size(); i++) { + sb.append("UUID"); + if (hostNamesNotCondition) { + sb.append(" NOT"); + } + sb.append(" LIKE "); + sb.append("?"); + + if (i < uuidsHost.size() - 1) { + if (hostNamesNotCondition) { + sb.append(" AND "); + } else { + sb.append(" OR "); + } + // ) for OR + } else if ((CollectionUtils.isNotEmpty(uuidsFull) || CollectionUtils.isNotEmpty(uuidsMetric)) && !hostNamesNotCondition && uuidsHost.size() > 1) { + sb.append(")"); + } + } + appendConjunction = true; + } + + // Finish with a ')' + if (appendConjunction) { + sb.append(")"); + } + + uuids = new ArrayList<>(); + if (CollectionUtils.isNotEmpty(uuidsFull)) { + uuids.addAll(uuidsFull); + } + for (byte[] uuid: uuidsMetric) { + uuids.add(new String(uuid).concat("%").getBytes()); + } + for (byte[] uuid: uuidsHost) { + uuids.add("%".concat(new String(uuid)).getBytes()); + } + } + } + + return appendConjunction; + } + + @Override + public String toString() { + return "Condition{" + + "uuids=" + uuids + + ", appId='" + appId + '\'' + + ", instanceId='" + instanceId + '\'' + + ", startTime=" + startTime + + ", endTime=" + endTime + + ", limit=" + limit + + ", grouped=" + grouped + + ", orderBy=" + orderByColumns + + ", noLimit=" + noLimit + + '}'; + } + + protected static boolean metricNamesHaveWildcard(List<String> metricNames) { + for (String name : metricNames) { + if (name.contains("%")) { + return true; + } + } + return false; + } + + protected static boolean hostNamesHaveWildcard(List<String> hostnames) { + if (hostnames == null) + return false; + for (String name : hostnames) { + if (name.contains("%")) { + return true; + } + } + return false; + } + + public void setMetricNamesNotCondition(boolean metricNamesNotCondition) { + this.metricNamesNotCondition = metricNamesNotCondition; + } + + @Override + public void setHostnamesNotCondition(boolean hostNamesNotCondition) { + this.hostNamesNotCondition = hostNamesNotCondition; + } + + @Override + public void setUuidNotCondition(boolean uuidNotCondition) { + this.uuidNotCondition = uuidNotCondition; + } + + @Override + public List<byte[]> getUuids() { + return uuids; + } +} http://git-wip-us.apache.org/repos/asf/ambari/blob/42112e28/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/query/DefaultPhoenixDataSource.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/query/DefaultPhoenixDataSource.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/query/DefaultPhoenixDataSource.java new file mode 100644 index 0000000..5c0a4b5 --- /dev/null +++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/query/DefaultPhoenixDataSource.java @@ -0,0 +1,90 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.ambari.metrics.core.timeline.query; + + +import java.io.IOException; +import java.sql.Connection; +import java.sql.DriverManager; +import java.sql.SQLException; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.client.ConnectionFactory; +import org.apache.hadoop.hbase.client.HBaseAdmin; + +public class DefaultPhoenixDataSource implements PhoenixConnectionProvider { + + static final Log LOG = LogFactory.getLog(DefaultPhoenixDataSource.class); + private static final String ZOOKEEPER_CLIENT_PORT = "hbase.zookeeper.property.clientPort"; + private static final String ZOOKEEPER_QUORUM = "hbase.zookeeper.quorum"; + private static final String ZNODE_PARENT = "zookeeper.znode.parent"; + + private static final String connectionUrl = "jdbc:phoenix:%s:%s:%s"; + private final String url; + + private Configuration hbaseConf; + + public DefaultPhoenixDataSource(Configuration hbaseConf) { + this.hbaseConf = hbaseConf; + String zookeeperClientPort = hbaseConf.getTrimmed(ZOOKEEPER_CLIENT_PORT, "2181"); + String zookeeperQuorum = hbaseConf.getTrimmed(ZOOKEEPER_QUORUM); + String znodeParent = hbaseConf.getTrimmed(ZNODE_PARENT, "/ams-hbase-unsecure"); + if (zookeeperQuorum == null || zookeeperQuorum.isEmpty()) { + throw new IllegalStateException("Unable to find Zookeeper quorum to " + + "access HBase store using Phoenix."); + } + + url = String.format(connectionUrl, + zookeeperQuorum, + zookeeperClientPort, + znodeParent); + } + + /** + * Get HBaseAdmin for table ops. + * @return @HBaseAdmin + * @throws IOException + */ + public HBaseAdmin getHBaseAdmin() throws IOException { + return (HBaseAdmin) ConnectionFactory.createConnection(hbaseConf).getAdmin(); + } + + /** + * Get JDBC connection to HBase store. Assumption is that the hbase + * configuration is present on the classpath and loaded by the caller into + * the Configuration object. + * Phoenix already caches the HConnection between the client and HBase + * cluster. + * + * @return @java.sql.Connection + */ + public Connection getConnection() throws SQLException { + + LOG.debug("Metric store connection url: " + url); + try { + return DriverManager.getConnection(url); + } catch (SQLException e) { + LOG.warn("Unable to connect to HBase store using Phoenix.", e); + + throw e; + } + } + +} http://git-wip-us.apache.org/repos/asf/ambari/blob/42112e28/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/query/EmptyCondition.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/query/EmptyCondition.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/query/EmptyCondition.java new file mode 100644 index 0000000..742b09b --- /dev/null +++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/query/EmptyCondition.java @@ -0,0 +1,169 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p/> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p/> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.ambari.metrics.core.timeline.query; + +import java.util.List; + +import org.apache.commons.lang.NotImplementedException; +import org.apache.hadoop.metrics2.sink.timeline.Precision; + +/** + * Encapsulate a Condition with pre-formatted and pre-parsed query string. + */ +public class EmptyCondition implements Condition { + String statement; + boolean doUpdate = false; + boolean metricNamesNotCondition = false; + + @Override + public boolean isEmpty() { + return false; + } + + @Override + public List<byte[]> getUuids() { + return null; + } + + @Override + public List<String> getMetricNames() { + return null; + } + + @Override + public boolean isPointInTime() { + return false; + } + + @Override + public boolean isGrouped() { + return true; + } + + @Override + public void setStatement(String statement) { + this.statement = statement; + } + + @Override + public List<String> getHostnames() { + return null; + } + + @Override + public Precision getPrecision() { + return null; + } + + @Override + public void setPrecision(Precision precision) { + + } + + @Override + public String getAppId() { + return null; + } + + @Override + public String getInstanceId() { + return null; + } + + @Override + public StringBuilder getConditionClause() { + return null; + } + + @Override + public String getOrderByClause(boolean asc) { + return null; + } + + @Override + public String getStatement() { + return statement; + } + + @Override + public Long getStartTime() { + return null; + } + + @Override + public Long getEndTime() { + return null; + } + + @Override + public Integer getLimit() { + return null; + } + + @Override + public Integer getFetchSize() { + return null; + } + + @Override + public void setFetchSize(Integer fetchSize) { + + } + + @Override + public void addOrderByColumn(String column) { + + } + + @Override + public void setNoLimit() { + + } + + public void setDoUpdate(boolean doUpdate) { + this.doUpdate = doUpdate; + } + + @Override + public boolean doUpdate() { + return doUpdate; + } + + @Override + public String toString() { + return "EmptyCondition{ " + + " statement = " + this.getStatement() + + " doUpdate = " + this.doUpdate() + + " }"; + } + + @Override + public void setMetricNamesNotCondition(boolean metricNamesNotCondition) { + this.metricNamesNotCondition = metricNamesNotCondition; + } + + @Override + public void setHostnamesNotCondition(boolean hostNamesNotCondition) { + throw new NotImplementedException("Not implemented"); + } + + @Override + public void setUuidNotCondition(boolean uuidNotCondition) { + throw new NotImplementedException("Not implemented"); + } +} http://git-wip-us.apache.org/repos/asf/ambari/blob/42112e28/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/query/PhoenixConnectionProvider.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/query/PhoenixConnectionProvider.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/query/PhoenixConnectionProvider.java new file mode 100644 index 0000000..c8f958e --- /dev/null +++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/query/PhoenixConnectionProvider.java @@ -0,0 +1,31 @@ +package org.apache.ambari.metrics.core.timeline.query; + +import java.io.IOException; + +import org.apache.hadoop.hbase.client.Admin; + +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p/> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p/> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +public interface PhoenixConnectionProvider extends ConnectionProvider { + /** + * Get HBaseAdmin for the Phoenix connection + * @return + * @throws IOException + */ + Admin getHBaseAdmin() throws IOException; +}
