http://git-wip-us.apache.org/repos/asf/ambari/blob/42112e28/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/discovery/TimelineMetricMetadataManager.java
----------------------------------------------------------------------
diff --git 
a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/discovery/TimelineMetricMetadataManager.java
 
b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/discovery/TimelineMetricMetadataManager.java
new file mode 100644
index 0000000..97eb7b1
--- /dev/null
+++ 
b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/discovery/TimelineMetricMetadataManager.java
@@ -0,0 +1,606 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ambari.metrics.core.timeline.discovery;
+
+import java.net.MalformedURLException;
+import java.net.URISyntaxException;
+import java.sql.SQLException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.ambari.metrics.core.timeline.TimelineMetricConfiguration;
+import org.apache.ambari.metrics.core.timeline.uuid.MetricUuidGenStrategy;
+import org.apache.ambari.metrics.core.timeline.uuid.RandomUuidGenStrategy;
+import org.apache.commons.collections.CollectionUtils;
+import org.apache.commons.collections.MapUtils;
+import org.apache.commons.lang.ArrayUtils;
+import org.apache.commons.lang.StringUtils;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.metrics2.sink.timeline.MetadataException;
+import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric;
+import org.apache.hadoop.metrics2.sink.timeline.TimelineMetricMetadata;
+import org.apache.ambari.metrics.core.timeline.PhoenixHBaseAccessor;
+import 
org.apache.ambari.metrics.core.timeline.aggregators.TimelineClusterMetric;
+import org.apache.ambari.metrics.core.timeline.uuid.HashBasedUuidGenStrategy;
+
+public class TimelineMetricMetadataManager {
+  private static final Log LOG = 
LogFactory.getLog(TimelineMetricMetadataManager.class);
+  private boolean isDisabled = false;
+  // Cache all metadata on retrieval
+  private final Map<TimelineMetricMetadataKey, TimelineMetricMetadata> 
METADATA_CACHE = new ConcurrentHashMap<>();
+  private final Map<String, TimelineMetricMetadataKey> uuidKeyMap = new 
ConcurrentHashMap<>();
+  // Map to lookup apps on a host
+  private final Map<String, TimelineMetricHostMetadata> HOSTED_APPS_MAP = new 
ConcurrentHashMap<>();
+  private final Map<String, String> uuidHostMap = new ConcurrentHashMap<>();
+  private final Map<String, Set<String>> INSTANCE_HOST_MAP = new 
ConcurrentHashMap<>();
+  // Sync only when needed
+  AtomicBoolean SYNC_HOSTED_APPS_METADATA = new AtomicBoolean(false);
+  AtomicBoolean SYNC_HOSTED_INSTANCES_METADATA = new AtomicBoolean(false);
+  private MetricUuidGenStrategy uuidGenStrategy = new 
HashBasedUuidGenStrategy();
+  public static final int TIMELINE_METRIC_UUID_LENGTH = 16;
+  public static final int HOSTNAME_UUID_LENGTH = 4;
+
+  // Single thread to sync back new writes to the store
+  private final ScheduledExecutorService executorService = 
Executors.newSingleThreadScheduledExecutor();
+
+  private PhoenixHBaseAccessor hBaseAccessor;
+  private Configuration metricsConf;
+
+  TimelineMetricMetadataSync metricMetadataSync;
+  // Filter metrics names matching given patterns, from metadata
+  final List<String> metricNameFilters = new ArrayList<>();
+
+  // Test friendly construction since mock instrumentation is difficult to get
+  // working with hadoop mini cluster
+  public TimelineMetricMetadataManager(Configuration metricsConf, 
PhoenixHBaseAccessor hBaseAccessor) {
+    this.metricsConf = metricsConf;
+    this.hBaseAccessor = hBaseAccessor;
+    String patternStrings = 
metricsConf.get(TimelineMetricConfiguration.TIMELINE_METRIC_METADATA_FILTERS);
+    if (!StringUtils.isEmpty(patternStrings)) {
+      metricNameFilters.addAll(Arrays.asList(patternStrings.split(",")));
+    }
+
+    uuidGenStrategy = getUuidStrategy(metricsConf);
+  }
+
+  public TimelineMetricMetadataManager(PhoenixHBaseAccessor hBaseAccessor) 
throws MalformedURLException, URISyntaxException {
+    this(TimelineMetricConfiguration.getInstance().getMetricsConf(), 
hBaseAccessor);
+  }
+
+  /**
+   * Initialize Metadata from the store
+   */
+  public void initializeMetadata() {
+    if 
(metricsConf.getBoolean(TimelineMetricConfiguration.DISABLE_METRIC_METADATA_MGMT,
 false)) {
+      isDisabled = true;
+    } else {
+      metricMetadataSync = new TimelineMetricMetadataSync(this);
+      // Schedule the executor to sync to store
+      executorService.scheduleWithFixedDelay(metricMetadataSync,
+        
metricsConf.getInt(TimelineMetricConfiguration.METRICS_METADATA_SYNC_INIT_DELAY,
 120), // 2 minutes
+        
metricsConf.getInt(TimelineMetricConfiguration.METRICS_METADATA_SYNC_SCHEDULE_DELAY,
 300), // 5 minutes
+        TimeUnit.SECONDS);
+      // Read from store and initialize map
+      try {
+        Map<TimelineMetricMetadataKey, TimelineMetricMetadata> metadata = 
getMetadataFromStore();
+
+        LOG.info("Retrieved " + metadata.size() + ", metadata objects from 
store.");
+        // Store in the cache
+        METADATA_CACHE.putAll(metadata);
+
+        Map<String, TimelineMetricHostMetadata> hostedAppData = 
getHostedAppsFromStore();
+
+        LOG.info("Retrieved " + hostedAppData.size() + " host objects from 
store.");
+        HOSTED_APPS_MAP.putAll(hostedAppData);
+
+        loadUuidMapsOnInit();
+
+        hBaseAccessor.setMetadataInstance(this);
+      } catch (SQLException e) {
+        LOG.warn("Exception loading metric metadata", e);
+      }
+    }
+  }
+
+  public Map<TimelineMetricMetadataKey, TimelineMetricMetadata> 
getMetadataCache() {
+    return METADATA_CACHE;
+  }
+
+  public TimelineMetricMetadata 
getMetadataCacheValue(TimelineMetricMetadataKey key) {
+    return METADATA_CACHE.get(key);
+  }
+
+  public Map<String, TimelineMetricHostMetadata> getHostedAppsCache() {
+    return HOSTED_APPS_MAP;
+  }
+
+  public Map<String, Set<String>> getHostedInstanceCache() {
+    return INSTANCE_HOST_MAP;
+  }
+
+  public boolean syncHostedAppsMetadata() {
+    return SYNC_HOSTED_APPS_METADATA.get();
+  }
+
+  public boolean syncHostedInstanceMetadata() {
+    return SYNC_HOSTED_INSTANCES_METADATA.get();
+  }
+
+  public void markSuccessOnSyncHostedAppsMetadata() {
+    SYNC_HOSTED_APPS_METADATA.set(false);
+  }
+
+  public void markSuccessOnSyncHostedInstanceMetadata() {
+    SYNC_HOSTED_INSTANCES_METADATA.set(false);
+  }
+  /**
+   * Test metric name for valid patterns and return true/false
+   */
+  boolean skipMetadataCache(String metricName) {
+    for (String pattern : metricNameFilters) {
+      if (metricName.contains(pattern)) {
+        return true;
+      }
+    }
+    return false;
+  }
+
+  /**
+   * Update value in metadata cache
+   * @param metadata @TimelineMetricMetadata
+   */
+  public void putIfModifiedTimelineMetricMetadata(TimelineMetricMetadata 
metadata) {
+    if (skipMetadataCache(metadata.getMetricName())) {
+      return;
+    }
+
+    TimelineMetricMetadataKey key = new TimelineMetricMetadataKey(
+      metadata.getMetricName(), metadata.getAppId(), metadata.getInstanceId());
+
+    TimelineMetricMetadata metadataFromCache = METADATA_CACHE.get(key);
+
+    if (metadataFromCache != null) {
+      try {
+        if (metadataFromCache.needsToBeSynced(metadata)) {
+          metadata.setIsPersisted(false); // Set the flag to ensure sync to 
store on next run
+          METADATA_CACHE.put(key, metadata);
+        }
+      } catch (MetadataException e) {
+        LOG.warn("Error inserting Metadata in cache.", e);
+      }
+
+    } else {
+      METADATA_CACHE.put(key, metadata);
+    }
+  }
+
+  /**
+   * Update value in hosted apps cache
+   * @param hostname Host name
+   * @param appId Application Id
+   */
+  public void putIfModifiedHostedAppsMetadata(String hostname, String appId) {
+    TimelineMetricHostMetadata timelineMetricHostMetadata = 
HOSTED_APPS_MAP.get(hostname);
+    ConcurrentHashMap<String, String> apps = (timelineMetricHostMetadata != 
null) ? timelineMetricHostMetadata.getHostedApps() : null;
+    if (apps == null) {
+      apps = new ConcurrentHashMap<>();
+      if (timelineMetricHostMetadata == null) {
+        HOSTED_APPS_MAP.put(hostname, new TimelineMetricHostMetadata(apps));
+      } else {
+        HOSTED_APPS_MAP.get(hostname).setHostedApps(apps);
+      }
+    }
+
+    if (!apps.containsKey(appId)) {
+      apps.put(appId, appId);
+      SYNC_HOSTED_APPS_METADATA.set(true);
+    }
+  }
+
+  public void putIfModifiedHostedInstanceMetadata(String instanceId, String 
hostname) {
+    if (StringUtils.isEmpty(instanceId)) {
+      return;
+    }
+
+    Set<String> hosts = INSTANCE_HOST_MAP.get(instanceId);
+    if (hosts == null) {
+      hosts = new HashSet<>();
+      INSTANCE_HOST_MAP.put(instanceId, hosts);
+    }
+
+    if (!hosts.contains(hostname)) {
+      hosts.add(hostname);
+      SYNC_HOSTED_INSTANCES_METADATA.set(true);
+    }
+  }
+
+  public void persistMetadata(Collection<TimelineMetricMetadata> metadata) 
throws SQLException {
+    hBaseAccessor.saveMetricMetadata(metadata);
+  }
+
+  public void persistHostedAppsMetadata(Map<String, 
TimelineMetricHostMetadata> hostedApps) throws SQLException {
+    hBaseAccessor.saveHostAppsMetadata(hostedApps);
+  }
+
+  public void persistHostedInstanceMetadata(Map<String, Set<String>> 
hostedInstancesMetadata) throws SQLException {
+    hBaseAccessor.saveInstanceHostsMetadata(hostedInstancesMetadata);
+  }
+
+  public TimelineMetricMetadata getTimelineMetricMetadata(TimelineMetric 
timelineMetric, boolean isWhitelisted) {
+    return new TimelineMetricMetadata(
+      timelineMetric.getMetricName(),
+      timelineMetric.getAppId(),
+      timelineMetric.getInstanceId(),
+      timelineMetric.getUnits(),
+      timelineMetric.getType(),
+      timelineMetric.getStartTime(),
+      supportAggregates(timelineMetric),
+      isWhitelisted
+    );
+  }
+
+  public boolean isDisabled() {
+    return isDisabled;
+  }
+
+  boolean isDistributedModeEnabled() {
+    return 
metricsConf.get("timeline.metrics.service.operation.mode").equals("distributed");
+  }
+
+  /**
+   * Fetch metrics metadata from store
+   * @throws SQLException
+   */
+  Map<TimelineMetricMetadataKey, TimelineMetricMetadata> 
getMetadataFromStore() throws SQLException {
+    return hBaseAccessor.getTimelineMetricMetadata();
+  }
+
+  /**
+   * Fetch hosted apps from store
+   * @throws SQLException
+   */
+  Map<String, TimelineMetricHostMetadata> getHostedAppsFromStore() throws 
SQLException {
+    return hBaseAccessor.getHostedAppsMetadata();
+  }
+
+  Map<String, Set<String>> getHostedInstancesFromStore() throws SQLException {
+    return hBaseAccessor.getInstanceHostsMetdata();
+  }
+
+  private boolean supportAggregates(TimelineMetric metric) {
+    return MapUtils.isEmpty(metric.getMetadata()) ||
+      
!(String.valueOf(true).equals(metric.getMetadata().get("skipAggregation")));
+  }
+
+  
//////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
+  // UUID Management
+  
//////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
+
+
+  /**
+   * Load the UUID mappings from the UUID table on startup.
+   */
+  private void loadUuidMapsOnInit() {
+
+    for (TimelineMetricMetadataKey key : METADATA_CACHE.keySet()) {
+      TimelineMetricMetadata timelineMetricMetadata = METADATA_CACHE.get(key);
+      if (timelineMetricMetadata != null && timelineMetricMetadata.getUuid() 
!= null) {
+        uuidKeyMap.put(new String(timelineMetricMetadata.getUuid()), key);
+      }
+    }
+
+    for (String host : HOSTED_APPS_MAP.keySet()) {
+      TimelineMetricHostMetadata timelineMetricHostMetadata = 
HOSTED_APPS_MAP.get(host);
+      if (timelineMetricHostMetadata != null && 
timelineMetricHostMetadata.getUuid() != null) {
+        uuidHostMap.put(new String(timelineMetricHostMetadata.getUuid()), 
host);
+      }
+    }
+  }
+
+  /**
+   * Returns the UUID gen strategy.
+   * @param configuration
+   * @return
+   */
+  private MetricUuidGenStrategy getUuidStrategy(Configuration configuration) {
+    String strategy = 
configuration.get(TimelineMetricConfiguration.TIMELINE_METRICS_UUID_GEN_STRATEGY,
 "");
+    if ("random".equalsIgnoreCase(strategy)) {
+      return new RandomUuidGenStrategy();
+    } else {
+      return new HashBasedUuidGenStrategy();
+    }
+  }
+
+  /**
+   * Given the hostname, generates a byte array of length 
'HOSTNAME_UUID_LENGTH'
+   * @param hostname
+   * @return uuid byte array of length 'HOSTNAME_UUID_LENGTH'
+   */
+  private byte[] getUuidForHostname(String hostname) {
+
+    TimelineMetricHostMetadata timelineMetricHostMetadata = 
HOSTED_APPS_MAP.get(hostname);
+    if (timelineMetricHostMetadata != null) {
+      byte[] uuid = timelineMetricHostMetadata.getUuid();
+      if (uuid != null) {
+        return uuid;
+      }
+    }
+
+    byte[] uuid = uuidGenStrategy.computeUuid(hostname, HOSTNAME_UUID_LENGTH);
+
+    String uuidStr = new String(uuid);
+    if (uuidHostMap.containsKey(uuidStr)) {
+      //TODO fix the collisions
+      LOG.error("Duplicate key computed for " + hostname +", Collides with  " 
+ uuidHostMap.get(uuidStr));
+      return uuid;
+    }
+
+    if (timelineMetricHostMetadata == null) {
+      timelineMetricHostMetadata = new TimelineMetricHostMetadata();
+      HOSTED_APPS_MAP.put(hostname, timelineMetricHostMetadata);
+    }
+    timelineMetricHostMetadata.setUuid(uuid);
+    uuidHostMap.put(uuidStr, hostname);
+
+    return uuid;
+  }
+
+  /**
+   * Given a timelineClusterMetric instance, generates a UUID for 
Metric-App-Instance combination.
+   * @param timelineClusterMetric
+   * @return uuid byte array of length 'TIMELINE_METRIC_UUID_LENGTH'
+   */
+  public byte[] getUuid(TimelineClusterMetric timelineClusterMetric) {
+    TimelineMetricMetadataKey key = new 
TimelineMetricMetadataKey(timelineClusterMetric.getMetricName(),
+      timelineClusterMetric.getAppId(), timelineClusterMetric.getInstanceId());
+
+    TimelineMetricMetadata timelineMetricMetadata = METADATA_CACHE.get(key);
+    if (timelineMetricMetadata != null) {
+      byte[] uuid = timelineMetricMetadata.getUuid();
+      if (uuid != null) {
+        return uuid;
+      }
+    }
+
+    byte[] uuid = uuidGenStrategy.computeUuid(timelineClusterMetric, 
TIMELINE_METRIC_UUID_LENGTH);
+
+    String uuidStr = new String(uuid);
+    if (uuidKeyMap.containsKey(uuidStr) && 
!uuidKeyMap.get(uuidStr).equals(key)) {
+      TimelineMetricMetadataKey collidingKey = 
(TimelineMetricMetadataKey)uuidKeyMap.get(uuidStr);
+      //TODO fix the collisions
+      /**
+       * 2017-08-23 14:12:35,922 ERROR TimelineMetricMetadataManager:
+       * Duplicate key [52, 50, 51, 53, 50, 53, 53, 53, 49, 54, 57, 50, 50, 
54, 0, 0]([B@278a93f9) computed for
+       * TimelineClusterMetric{metricName='sdisk_dm-11_write_count', 
appId='hbase', instanceId='', timestamp=1503497400000}, Collides with
+       * TimelineMetricMetadataKey{metricName='sdisk_dm-20_write_count', 
appId='hbase', instanceId=''}
+       */
+      LOG.error("Duplicate key " + Arrays.toString(uuid) + "(" + uuid +  ") 
computed for " + timelineClusterMetric.toString() + ", Collides with  " + 
collidingKey.toString());
+      return uuid;
+    }
+
+    if (timelineMetricMetadata == null) {
+      timelineMetricMetadata = new TimelineMetricMetadata();
+      
timelineMetricMetadata.setMetricName(timelineClusterMetric.getMetricName());
+      timelineMetricMetadata.setAppId(timelineClusterMetric.getAppId());
+      
timelineMetricMetadata.setInstanceId(timelineClusterMetric.getInstanceId());
+      METADATA_CACHE.put(key, timelineMetricMetadata);
+    }
+
+    timelineMetricMetadata.setUuid(uuid);
+    timelineMetricMetadata.setIsPersisted(false);
+    uuidKeyMap.put(uuidStr, key);
+    return uuid;
+  }
+
+  /**
+   * Given a timelineMetric instance, generates a UUID for Metric-App-Instance 
combination.
+   * @param timelineMetric
+   * @return uuid byte array of length 'TIMELINE_METRIC_UUID_LENGTH' + 
'HOSTNAME_UUID_LENGTH'
+   */
+  public byte[] getUuid(TimelineMetric timelineMetric) {
+
+    byte[] metricUuid = getUuid(new 
TimelineClusterMetric(timelineMetric.getMetricName(), timelineMetric.getAppId(),
+      timelineMetric.getInstanceId(), -1l));
+    byte[] hostUuid = getUuidForHostname(timelineMetric.getHostName());
+
+    return ArrayUtils.addAll(metricUuid, hostUuid);
+  }
+
+  public byte[] getUuid(String metricName, String appId, String instanceId, 
String hostname) {
+
+    byte[] metricUuid = getUuid(new TimelineClusterMetric(metricName, appId, 
instanceId, -1l));
+    if (StringUtils.isNotEmpty(hostname)) {
+      byte[] hostUuid = getUuidForHostname(hostname);
+      return ArrayUtils.addAll(metricUuid, hostUuid);
+    }
+    return metricUuid;
+  }
+
+  public String getMetricNameFromUuid(byte[]  uuid) {
+
+    byte[] metricUuid = uuid;
+    if (uuid.length == TIMELINE_METRIC_UUID_LENGTH + HOSTNAME_UUID_LENGTH) {
+      metricUuid = ArrayUtils.subarray(uuid, 0, TIMELINE_METRIC_UUID_LENGTH);
+    }
+
+    TimelineMetricMetadataKey key = uuidKeyMap.get(new String(metricUuid));
+    return key != null ? key.getMetricName() : null;
+  }
+
+  public TimelineMetric getMetricFromUuid(byte[] uuid) {
+    if (uuid == null) {
+      return null;
+    }
+
+    if (uuid.length == TIMELINE_METRIC_UUID_LENGTH) {
+      TimelineMetricMetadataKey key = uuidKeyMap.get(new String(uuid));
+      return key != null ? new TimelineMetric(key.metricName, null, key.appId, 
key.instanceId) : null;
+    } else {
+      byte[] metricUuid = ArrayUtils.subarray(uuid, 0, 
TIMELINE_METRIC_UUID_LENGTH);
+      TimelineMetricMetadataKey key = uuidKeyMap.get(new String(metricUuid));
+      if (key == null) {
+        LOG.error("TimelineMetricMetadataKey is null for : " + 
Arrays.toString(uuid));
+        return null;
+      }
+      TimelineMetric timelineMetric = new TimelineMetric();
+      timelineMetric.setMetricName(key.metricName);
+      timelineMetric.setAppId(key.appId);
+      timelineMetric.setInstanceId(key.instanceId);
+
+      byte[] hostUuid = ArrayUtils.subarray(uuid, TIMELINE_METRIC_UUID_LENGTH, 
HOSTNAME_UUID_LENGTH + TIMELINE_METRIC_UUID_LENGTH);
+      timelineMetric.setHostName(uuidHostMap.get(new String(hostUuid)));
+      return timelineMetric;
+    }
+  }
+
+  /**
+   * Returns the set of UUIDs for a given GET request. If there are wildcards 
(%), resolves them based on UUID map.
+   * @param metricNames
+   * @param hostnames
+   * @param appId
+   * @param instanceId
+   * @return Set of UUIds
+   */
+  public List<byte[]> getUuids(Collection<String> metricNames, List<String> 
hostnames, String appId, String instanceId) {
+
+    Collection<String> sanitizedMetricNames = new HashSet<>();
+
+    for (String metricName : metricNames) {
+      if (metricName.contains("%")) {
+        String metricRegEx;
+        //Special case handling for metric name with * and __%.
+        //For example, dfs.NNTopUserOpCounts.windowMs=300000.op=*.user=%.count
+        // or dfs.NNTopUserOpCounts.windowMs=300000.op=__%.user=%.count
+        if (metricName.contains("*") || metricName.contains("__%")) {
+          String metricNameWithEscSeq = metricName.replace("*", 
"\\*").replace("__%", "..%");
+          metricRegEx = metricNameWithEscSeq.replace("%", ".*");
+        } else {
+          metricRegEx = metricName.replace("%", ".*");
+        }
+        for (TimelineMetricMetadataKey key : METADATA_CACHE.keySet()) {
+          String metricNameFromMetadata = key.getMetricName();
+          if (metricNameFromMetadata.matches(metricRegEx)) {
+            sanitizedMetricNames.add(metricNameFromMetadata);
+          }
+        }
+      } else {
+        sanitizedMetricNames.add(metricName);
+      }
+    }
+
+    Set<String> sanitizedHostNames = new HashSet<>();
+    if (CollectionUtils.isNotEmpty(hostnames)) {
+      for (String hostname : hostnames) {
+        if (hostname.contains("%")) {
+          String hostRegEx;
+          hostRegEx = hostname.replace("%", ".*");
+          for (String host : HOSTED_APPS_MAP.keySet()) {
+            if (host.matches(hostRegEx)) {
+              sanitizedHostNames.add(host);
+            }
+          }
+        } else {
+          sanitizedHostNames.add(hostname);
+        }
+      }
+    }
+
+    List<byte[]> uuids = new ArrayList<>();
+
+    if ( StringUtils.isNotEmpty(appId) && !(appId.equals("HOST") || 
appId.equals("FLUME_HANDLER"))) { //HACK.. Why??
+      appId = appId.toLowerCase();
+    }
+    if (CollectionUtils.isNotEmpty(sanitizedHostNames)) {
+      if (CollectionUtils.isNotEmpty(sanitizedMetricNames)) {
+        for (String metricName : sanitizedMetricNames) {
+          TimelineMetric metric = new TimelineMetric();
+          metric.setMetricName(metricName);
+          metric.setAppId(appId);
+          metric.setInstanceId(instanceId);
+          for (String hostname : sanitizedHostNames) {
+            metric.setHostName(hostname);
+            byte[] uuid = getUuid(metric);
+            if (uuid != null) {
+              uuids.add(uuid);
+            }
+          }
+        }
+      } else {
+        for (String hostname : sanitizedHostNames) {
+          byte[] uuid = getUuidForHostname(hostname);
+          if (uuid != null) {
+            uuids.add(uuid);
+          }
+        }
+      }
+    } else {
+      for (String metricName : sanitizedMetricNames) {
+        TimelineClusterMetric metric = new TimelineClusterMetric(metricName, 
appId, instanceId, -1l);
+        byte[] uuid = getUuid(metric);
+        if (uuid != null) {
+          uuids.add(uuid);
+        }
+      }
+    }
+
+    return uuids;
+  }
+
+  public Map<String, TimelineMetricMetadataKey> getUuidKeyMap() {
+    return uuidKeyMap;
+  }
+
+  public List<String> getNotLikeHostnames(List<String> hostnames) {
+    List<String> result = new ArrayList<>();
+    Set<String> sanitizedHostNames = new HashSet<>();
+    if (CollectionUtils.isNotEmpty(hostnames)) {
+      for (String hostname : hostnames) {
+        if (hostname.contains("%")) {
+          String hostRegEx;
+          hostRegEx = hostname.replace("%", ".*");
+          for (String host : HOSTED_APPS_MAP.keySet()) {
+            if (host.matches(hostRegEx)) {
+              sanitizedHostNames.add(host);
+            }
+          }
+        } else {
+          sanitizedHostNames.add(hostname);
+        }
+      }
+    }
+
+    for (String hostname: HOSTED_APPS_MAP.keySet()) {
+      if (!sanitizedHostNames.contains(hostname)) {
+        result.add(hostname);
+      }
+    }
+    return result;
+  }
+}

http://git-wip-us.apache.org/repos/asf/ambari/blob/42112e28/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/discovery/TimelineMetricMetadataSync.java
----------------------------------------------------------------------
diff --git 
a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/discovery/TimelineMetricMetadataSync.java
 
b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/discovery/TimelineMetricMetadataSync.java
new file mode 100644
index 0000000..0ffff76
--- /dev/null
+++ 
b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/discovery/TimelineMetricMetadataSync.java
@@ -0,0 +1,227 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ambari.metrics.core.timeline.discovery;
+
+import java.sql.SQLException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.metrics2.sink.timeline.TimelineMetricMetadata;
+
+/**
+ * Sync metadata info with the store
+ */
+public class TimelineMetricMetadataSync implements Runnable {
+  private static final Log LOG = 
LogFactory.getLog(TimelineMetricMetadataSync.class);
+
+  private final TimelineMetricMetadataManager cacheManager;
+
+  public TimelineMetricMetadataSync(TimelineMetricMetadataManager 
cacheManager) {
+    this.cacheManager = cacheManager;
+  }
+
+  @Override
+  public void run() {
+    LOG.debug("Persisting metric metadata...");
+    persistMetricMetadata();
+    LOG.debug("Persisting hosted apps metadata...");
+    persistHostAppsMetadata();
+    LOG.debug("Persisting hosted instance metadata...");
+    persistHostInstancesMetadata();
+    if (cacheManager.isDistributedModeEnabled()) {
+      LOG.debug("Refreshing metric metadata...");
+      refreshMetricMetadata();
+      LOG.debug("Refreshing hosted apps metadata...");
+      refreshHostAppsMetadata();
+      LOG.debug("Refreshing hosted instances metadata...");
+      refreshHostedInstancesMetadata();
+    }
+  }
+
+  /**
+   * Find metrics not persisted to store and persist them
+   */
+  private void persistMetricMetadata() {
+    List<TimelineMetricMetadata> metadataToPersist = new ArrayList<>();
+    // Find all entries to persist
+    for (TimelineMetricMetadata metadata : 
cacheManager.getMetadataCache().values()) {
+      if (!metadata.isPersisted()) {
+        metadataToPersist.add(metadata);
+      }
+    }
+    boolean markSuccess = false;
+    if (!metadataToPersist.isEmpty()) {
+      try {
+        cacheManager.persistMetadata(metadataToPersist);
+        markSuccess = true;
+      } catch (SQLException e) {
+        LOG.warn("Error persisting metadata.", e);
+      }
+    }
+    // Mark corresponding entries as persisted to skip on next run
+    if (markSuccess) {
+      for (TimelineMetricMetadata metadata : metadataToPersist) {
+        TimelineMetricMetadataKey key = new TimelineMetricMetadataKey(
+          metadata.getMetricName(), metadata.getAppId(), 
metadata.getInstanceId()
+        );
+
+        // Mark entry as being persisted
+        metadata.setIsPersisted(true);
+        // Update cache
+        cacheManager.getMetadataCache().put(key, metadata);
+      }
+    }
+  }
+
+  /**
+   * Read all metric metadata and update cached values - HA mode
+   */
+  private void refreshMetricMetadata() {
+    Map<TimelineMetricMetadataKey, TimelineMetricMetadata> metadataFromStore = 
null;
+    try {
+      metadataFromStore = cacheManager.getMetadataFromStore();
+    } catch (SQLException e) {
+      LOG.warn("Error refreshing metadata from store.", e);
+    }
+    if (metadataFromStore != null) {
+      Map<TimelineMetricMetadataKey, TimelineMetricMetadata> cachedMetadata =
+        cacheManager.getMetadataCache();
+
+      for (Map.Entry<TimelineMetricMetadataKey, TimelineMetricMetadata> 
metadataEntry : metadataFromStore.entrySet()) {
+        if (!cachedMetadata.containsKey(metadataEntry.getKey())) {
+          cachedMetadata.put(metadataEntry.getKey(), metadataEntry.getValue());
+        }
+      }
+    }
+  }
+
+  /**
+   * Sync hosted apps data if needed
+   */
+  private void persistHostAppsMetadata() {
+    if (cacheManager.syncHostedAppsMetadata()) {
+      Map<String, TimelineMetricHostMetadata> persistedData = null;
+      try {
+        persistedData = cacheManager.getHostedAppsFromStore();
+      } catch (SQLException e) {
+        LOG.warn("Failed on fetching hosted apps data from store.", e);
+        return; // Something wrong with store
+      }
+
+      Map<String, TimelineMetricHostMetadata> cachedData = 
cacheManager.getHostedAppsCache();
+      Map<String, TimelineMetricHostMetadata> dataToSync = new HashMap<>();
+      if (cachedData != null && !cachedData.isEmpty()) {
+        for (Map.Entry<String, TimelineMetricHostMetadata> cacheEntry : 
cachedData.entrySet()) {
+          // No persistence / stale data in store
+          if (persistedData == null || persistedData.isEmpty() ||
+            !persistedData.containsKey(cacheEntry.getKey()) ||
+            
!persistedData.get(cacheEntry.getKey()).getHostedApps().keySet().containsAll(cacheEntry.getValue().getHostedApps().keySet()))
 {
+            dataToSync.put(cacheEntry.getKey(), cacheEntry.getValue());
+          }
+        }
+        try {
+          cacheManager.persistHostedAppsMetadata(dataToSync);
+          cacheManager.markSuccessOnSyncHostedAppsMetadata();
+
+        } catch (SQLException e) {
+          LOG.warn("Error persisting hosted apps metadata.", e);
+        }
+      }
+
+    }
+  }
+
+  /**
+   * Sync apps instances data if needed
+   */
+  private void persistHostInstancesMetadata() {
+    if (cacheManager.syncHostedInstanceMetadata()) {
+      Map<String, Set<String>> persistedData = null;
+      try {
+        persistedData = cacheManager.getHostedInstancesFromStore();
+      } catch (SQLException e) {
+        LOG.warn("Failed on fetching hosted instances data from store.", e);
+        return; // Something wrong with store
+      }
+
+      Map<String, Set<String>> cachedData = 
cacheManager.getHostedInstanceCache();
+      Map<String, Set<String>> dataToSync = new HashMap<>();
+      if (cachedData != null && !cachedData.isEmpty()) {
+        for (Map.Entry<String, Set<String>> cacheEntry : 
cachedData.entrySet()) {
+          // No persistence / stale data in store
+          if (persistedData == null || persistedData.isEmpty() ||
+            !persistedData.containsKey(cacheEntry.getKey()) ||
+            
!persistedData.get(cacheEntry.getKey()).containsAll(cacheEntry.getValue())) {
+            dataToSync.put(cacheEntry.getKey(), cacheEntry.getValue());
+          }
+        }
+        try {
+          cacheManager.persistHostedInstanceMetadata(dataToSync);
+          cacheManager.markSuccessOnSyncHostedInstanceMetadata();
+
+        } catch (SQLException e) {
+          LOG.warn("Error persisting hosted apps metadata.", e);
+        }
+      }
+
+    }
+  }
+  /**
+   * Read all hosted apps metadata and update cached values - HA
+   */
+  private void refreshHostAppsMetadata() {
+    Map<String, TimelineMetricHostMetadata> hostedAppsDataFromStore = null;
+    try {
+      hostedAppsDataFromStore = cacheManager.getHostedAppsFromStore();
+    } catch (SQLException e) {
+      LOG.warn("Error refreshing metadata from store.", e);
+    }
+    if (hostedAppsDataFromStore != null) {
+      Map<String, TimelineMetricHostMetadata> cachedData = 
cacheManager.getHostedAppsCache();
+
+      for (Map.Entry<String, TimelineMetricHostMetadata> storeEntry : 
hostedAppsDataFromStore.entrySet()) {
+        if (!cachedData.containsKey(storeEntry.getKey())) {
+          cachedData.put(storeEntry.getKey(), storeEntry.getValue());
+        }
+      }
+    }
+  }
+
+  private void refreshHostedInstancesMetadata() {
+    Map<String, Set<String>> hostedInstancesFromStore = null;
+    try {
+      hostedInstancesFromStore = cacheManager.getHostedInstancesFromStore();
+    } catch (SQLException e) {
+      LOG.warn("Error refreshing metadata from store.", e);
+    }
+    if (hostedInstancesFromStore != null) {
+      Map<String, Set<String>> cachedData = 
cacheManager.getHostedInstanceCache();
+
+      for (Map.Entry<String, Set<String>> storeEntry : 
hostedInstancesFromStore.entrySet()) {
+        if (!cachedData.containsKey(storeEntry.getKey())) {
+          cachedData.put(storeEntry.getKey(), storeEntry.getValue());
+        }
+      }
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/ambari/blob/42112e28/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/function/AbstractTimelineMetricsSeriesAggregateFunction.java
----------------------------------------------------------------------
diff --git 
a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/function/AbstractTimelineMetricsSeriesAggregateFunction.java
 
b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/function/AbstractTimelineMetricsSeriesAggregateFunction.java
new file mode 100644
index 0000000..1070242
--- /dev/null
+++ 
b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/function/AbstractTimelineMetricsSeriesAggregateFunction.java
@@ -0,0 +1,98 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ambari.metrics.core.timeline.function;
+
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.TreeMap;
+import java.util.TreeSet;
+
+import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric;
+import org.apache.hadoop.metrics2.sink.timeline.TimelineMetrics;
+
+import com.google.common.base.Joiner;
+
+public abstract class AbstractTimelineMetricsSeriesAggregateFunction
+    implements TimelineMetricsSeriesAggregateFunction {
+
+  @Override
+  public TimelineMetric apply(TimelineMetrics timelineMetrics) {
+    Set<String> metricNameSet = new TreeSet<>();
+    Set<String> hostNameSet = new TreeSet<>();
+    Set<String> appIdSet = new TreeSet<>();
+    Set<String> instanceIdSet = new TreeSet<>();
+    TreeMap<Long, List<Double>> metricValues = new TreeMap<>();
+
+    for (TimelineMetric timelineMetric : timelineMetrics.getMetrics()) {
+      metricNameSet.add(timelineMetric.getMetricName());
+      addToSetOnlyNotNull(hostNameSet, timelineMetric.getHostName());
+      addToSetOnlyNotNull(appIdSet, timelineMetric.getAppId());
+      addToSetOnlyNotNull(instanceIdSet, timelineMetric.getInstanceId());
+
+      for (Map.Entry<Long, Double> metricValue : 
timelineMetric.getMetricValues().entrySet()) {
+        Long timestamp = metricValue.getKey();
+        Double value = metricValue.getValue();
+        if (!metricValues.containsKey(timestamp)) {
+          metricValues.put(timestamp, new LinkedList<Double>());
+        }
+        metricValues.get(timestamp).add(value);
+      }
+    }
+
+    TreeMap<Long, Double> aggregatedMetricValues = new TreeMap<>();
+    for (Map.Entry<Long, List<Double>> metricValue : metricValues.entrySet()) {
+      List<Double> values = metricValue.getValue();
+      if (values.size() == 0) {
+        throw new IllegalArgumentException("count of values should be more 
than 0");
+      }
+      aggregatedMetricValues.put(metricValue.getKey(), applyFunction(values));
+    }
+
+    TimelineMetric timelineMetric = new TimelineMetric();
+    timelineMetric.setMetricName(getMetricName(metricNameSet.iterator()));
+    timelineMetric.setHostName(joinStringsWithComma(hostNameSet.iterator()));
+    timelineMetric.setAppId(joinStringsWithComma(appIdSet.iterator()));
+    
timelineMetric.setInstanceId(joinStringsWithComma(instanceIdSet.iterator()));
+    if (aggregatedMetricValues.size() > 0) {
+      timelineMetric.setStartTime(aggregatedMetricValues.firstKey());
+    }
+    timelineMetric.setMetricValues(aggregatedMetricValues);
+    return timelineMetric;
+  }
+
+  protected String getMetricName(Iterator<String> metricNames) {
+    return getFunctionName() + "(" + Joiner.on(",").join(metricNames) + ")";
+  }
+
+  protected String joinStringsWithComma(Iterator<String> hostNames) {
+    return Joiner.on(",").join(hostNames);
+  }
+
+  protected abstract Double applyFunction(List<Double> values);
+  protected abstract String getFunctionName();
+
+  private void addToSetOnlyNotNull(Set<String> set, String value) {
+    if (value != null) {
+      set.add(value);
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/ambari/blob/42112e28/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/function/SeriesAggregateFunction.java
----------------------------------------------------------------------
diff --git 
a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/function/SeriesAggregateFunction.java
 
b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/function/SeriesAggregateFunction.java
new file mode 100644
index 0000000..1438194
--- /dev/null
+++ 
b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/function/SeriesAggregateFunction.java
@@ -0,0 +1,42 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ambari.metrics.core.timeline.function;
+
+import org.apache.ambari.metrics.core.timeline.aggregators.Function;
+
+public enum SeriesAggregateFunction {
+  AVG, MIN, MAX, SUM;
+
+  public static boolean isPresent(String functionName) {
+    try {
+      SeriesAggregateFunction.valueOf(functionName.toUpperCase());
+    } catch (IllegalArgumentException e) {
+      return false;
+    }
+    return true;
+  }
+
+  public static SeriesAggregateFunction getFunction(String functionName) 
throws Function.FunctionFormatException {
+    try {
+      return SeriesAggregateFunction.valueOf(functionName.toUpperCase());
+    } catch (NullPointerException | IllegalArgumentException e) {
+      throw new Function.FunctionFormatException(
+            "Function should be sum, avg, min, max. Got " + functionName, e);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/ambari/blob/42112e28/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/function/TimelineMetricsSeriesAggregateFunction.java
----------------------------------------------------------------------
diff --git 
a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/function/TimelineMetricsSeriesAggregateFunction.java
 
b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/function/TimelineMetricsSeriesAggregateFunction.java
new file mode 100644
index 0000000..a5f40e1
--- /dev/null
+++ 
b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/function/TimelineMetricsSeriesAggregateFunction.java
@@ -0,0 +1,25 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ambari.metrics.core.timeline.function;
+
+import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric;
+import org.apache.hadoop.metrics2.sink.timeline.TimelineMetrics;
+    
+public interface TimelineMetricsSeriesAggregateFunction {
+  TimelineMetric apply(TimelineMetrics timelineMetrics);
+}

http://git-wip-us.apache.org/repos/asf/ambari/blob/42112e28/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/function/TimelineMetricsSeriesAggregateFunctionFactory.java
----------------------------------------------------------------------
diff --git 
a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/function/TimelineMetricsSeriesAggregateFunctionFactory.java
 
b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/function/TimelineMetricsSeriesAggregateFunctionFactory.java
new file mode 100644
index 0000000..f07aeb9
--- /dev/null
+++ 
b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/function/TimelineMetricsSeriesAggregateFunctionFactory.java
@@ -0,0 +1,41 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ambari.metrics.core.timeline.function;
+
+import org.apache.ambari.metrics.core.timeline.aggregators.Function;
+
+public class TimelineMetricsSeriesAggregateFunctionFactory {
+  private TimelineMetricsSeriesAggregateFunctionFactory() {
+  }
+
+  public static TimelineMetricsSeriesAggregateFunction 
newInstance(SeriesAggregateFunction func) {
+    switch (func) {
+    case AVG:
+      return new TimelineMetricsSeriesAvgAggregateFunction();
+    case MIN:
+      return new TimelineMetricsSeriesMinAggregateFunction();
+    case MAX:
+      return new TimelineMetricsSeriesMaxAggregateFunction();
+    case SUM:
+      return new TimelineMetricsSeriesSumAggregateFunction();
+    default:
+      throw new Function.FunctionFormatException("Function should be sum, avg, 
min, max. Got " +
+          func.name());
+    }
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ambari/blob/42112e28/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/function/TimelineMetricsSeriesAvgAggregateFunction.java
----------------------------------------------------------------------
diff --git 
a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/function/TimelineMetricsSeriesAvgAggregateFunction.java
 
b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/function/TimelineMetricsSeriesAvgAggregateFunction.java
new file mode 100644
index 0000000..b0e3069
--- /dev/null
+++ 
b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/function/TimelineMetricsSeriesAvgAggregateFunction.java
@@ -0,0 +1,39 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ambari.metrics.core.timeline.function;
+
+import java.util.List;
+
+public class TimelineMetricsSeriesAvgAggregateFunction extends 
AbstractTimelineMetricsSeriesAggregateFunction {
+  private static final String FUNCTION_NAME = "AVG";
+
+  @Override
+  protected Double applyFunction(List<Double> values) {
+    double sum = 0.0d;
+    for (Double value : values) {
+      sum += value;
+    }
+
+    return sum / values.size();
+  }
+
+  @Override
+  protected String getFunctionName() {
+    return FUNCTION_NAME;
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ambari/blob/42112e28/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/function/TimelineMetricsSeriesMaxAggregateFunction.java
----------------------------------------------------------------------
diff --git 
a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/function/TimelineMetricsSeriesMaxAggregateFunction.java
 
b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/function/TimelineMetricsSeriesMaxAggregateFunction.java
new file mode 100644
index 0000000..58c6d4c
--- /dev/null
+++ 
b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/function/TimelineMetricsSeriesMaxAggregateFunction.java
@@ -0,0 +1,41 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ambari.metrics.core.timeline.function;
+
+import java.util.List;
+
+public class TimelineMetricsSeriesMaxAggregateFunction extends 
AbstractTimelineMetricsSeriesAggregateFunction {
+  private static final String FUNCTION_NAME = "MAX";
+
+  @Override
+  protected Double applyFunction(List<Double> values) {
+    double max = Double.MIN_VALUE;
+    for (Double value : values) {
+      if (value > max) {
+        max = value;
+      }
+    }
+
+    return max;
+  }
+
+  @Override
+  protected String getFunctionName() {
+    return FUNCTION_NAME;
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ambari/blob/42112e28/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/function/TimelineMetricsSeriesMinAggregateFunction.java
----------------------------------------------------------------------
diff --git 
a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/function/TimelineMetricsSeriesMinAggregateFunction.java
 
b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/function/TimelineMetricsSeriesMinAggregateFunction.java
new file mode 100644
index 0000000..60e1bb5
--- /dev/null
+++ 
b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/function/TimelineMetricsSeriesMinAggregateFunction.java
@@ -0,0 +1,41 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ambari.metrics.core.timeline.function;
+
+import java.util.List;
+
+public class TimelineMetricsSeriesMinAggregateFunction extends 
AbstractTimelineMetricsSeriesAggregateFunction {
+  private static final String FUNCTION_NAME = "MIN";
+
+  @Override
+  protected Double applyFunction(List<Double> values) {
+    double min = Double.MAX_VALUE;
+    for (Double value : values) {
+      if (value < min) {
+        min = value;
+      }
+    }
+
+    return min;
+  }
+
+  @Override
+  protected String getFunctionName() {
+    return FUNCTION_NAME;
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ambari/blob/42112e28/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/function/TimelineMetricsSeriesSumAggregateFunction.java
----------------------------------------------------------------------
diff --git 
a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/function/TimelineMetricsSeriesSumAggregateFunction.java
 
b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/function/TimelineMetricsSeriesSumAggregateFunction.java
new file mode 100644
index 0000000..997d7be
--- /dev/null
+++ 
b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/function/TimelineMetricsSeriesSumAggregateFunction.java
@@ -0,0 +1,39 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ambari.metrics.core.timeline.function;
+
+import java.util.List;
+
+public class TimelineMetricsSeriesSumAggregateFunction extends 
AbstractTimelineMetricsSeriesAggregateFunction {
+  private static final String FUNCTION_NAME = "SUM";
+
+  @Override
+  protected Double applyFunction(List<Double> values) {
+    double sum = 0.0d;
+    for (Double value : values) {
+      sum += value;
+    }
+
+    return sum;
+  }
+
+  @Override
+  protected String getFunctionName() {
+    return FUNCTION_NAME;
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ambari/blob/42112e28/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/query/Condition.java
----------------------------------------------------------------------
diff --git 
a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/query/Condition.java
 
b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/query/Condition.java
new file mode 100644
index 0000000..fa118a3
--- /dev/null
+++ 
b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/query/Condition.java
@@ -0,0 +1,51 @@
+package org.apache.ambari.metrics.core.timeline.query;
+
+import java.util.List;
+
+import org.apache.hadoop.metrics2.sink.timeline.Precision;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+public interface Condition {
+  boolean isEmpty();
+
+  List<byte[]> getUuids();
+  List<String> getMetricNames();
+  boolean isPointInTime();
+  boolean isGrouped();
+  void setStatement(String statement);
+  List<String> getHostnames();
+  Precision getPrecision();
+  void setPrecision(Precision precision);
+  String getAppId();
+  String getInstanceId();
+  StringBuilder getConditionClause();
+  String getOrderByClause(boolean asc);
+  String getStatement();
+  Long getStartTime();
+  Long getEndTime();
+  Integer getLimit();
+  Integer getFetchSize();
+  void setFetchSize(Integer fetchSize);
+  void addOrderByColumn(String column);
+  void setNoLimit();
+  boolean doUpdate();
+  void setMetricNamesNotCondition(boolean metricNamesNotCondition);
+  void setHostnamesNotCondition(boolean hostNamesNotCondition);
+  void setUuidNotCondition(boolean uuidNotCondition);
+}

http://git-wip-us.apache.org/repos/asf/ambari/blob/42112e28/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/query/ConditionBuilder.java
----------------------------------------------------------------------
diff --git 
a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/query/ConditionBuilder.java
 
b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/query/ConditionBuilder.java
new file mode 100644
index 0000000..e779d77
--- /dev/null
+++ 
b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/query/ConditionBuilder.java
@@ -0,0 +1,143 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ambari.metrics.core.timeline.query;
+
+import java.util.LinkedHashSet;
+import java.util.List;
+import java.util.Set;
+
+import org.apache.hadoop.metrics2.sink.timeline.Precision;
+import org.apache.ambari.metrics.core.timeline.aggregators.Function;
+
+public class ConditionBuilder {
+
+  private List<String> metricNames;
+  private List<String> hostnames;
+  private String appId;
+  private String instanceId;
+  private Long startTime;
+  private Long endTime;
+  private Precision precision;
+  private Integer limit;
+  private boolean grouped;
+  private boolean noLimit = false;
+  private Integer fetchSize;
+  private String statement;
+  private Set<String> orderByColumns = new LinkedHashSet<String>();
+  private Integer topN;
+  private boolean isBottomN;
+  private Function topNFunction;
+  private List<byte[]> uuids;
+
+  public ConditionBuilder(List<String> metricNames) {
+    this.metricNames = metricNames;
+  }
+
+  public ConditionBuilder hostnames(List<String> hostnames) {
+    this.hostnames = hostnames;
+    return this;
+  }
+
+  public ConditionBuilder appId(String appId) {
+    this.appId = appId;
+    return this;
+  }
+
+  public ConditionBuilder instanceId(String instanceId) {
+    this.instanceId = instanceId;
+    return this;
+  }
+
+  public ConditionBuilder startTime(Long startTime) {
+    this.startTime = startTime;
+    return this;
+  }
+
+  public ConditionBuilder endTime(Long endTime) {
+    this.endTime = endTime;
+    return this;
+  }
+
+  public ConditionBuilder precision(Precision precision) {
+    this.precision = precision;
+    return this;
+  }
+
+  public ConditionBuilder limit(Integer limit) {
+    this.limit = limit;
+    return this;
+  }
+
+  public ConditionBuilder grouped(boolean grouped) {
+    this.grouped = grouped;
+    return this;
+  }
+
+  public ConditionBuilder noLimit(boolean noLimit) {
+    this.noLimit = noLimit;
+    return this;
+  }
+
+  public ConditionBuilder fetchSize(Integer fetchSize) {
+    this.fetchSize = fetchSize;
+    return this;
+  }
+
+  public ConditionBuilder statement(String statement) {
+    this.statement = statement;
+    return this;
+  }
+
+  public ConditionBuilder orderByColumns(Set<String> orderByColumns) {
+    this.orderByColumns = orderByColumns;
+    return this;
+  }
+
+  public ConditionBuilder topN(Integer topN) {
+    this.topN = topN;
+    return this;
+  }
+
+  public ConditionBuilder isBottomN(boolean isBottomN) {
+    this.isBottomN = isBottomN;
+    return this;
+  }
+
+  public ConditionBuilder topNFunction(Function topNFunction) {
+    this.topNFunction = topNFunction;
+    return this;
+  }
+
+  public ConditionBuilder uuid(List<byte[]> uuids) {
+    this.uuids = uuids;
+    return this;
+  }
+
+  public Condition build() {
+    if (topN == null) {
+      return new DefaultCondition(
+        uuids, metricNames,
+        hostnames, appId, instanceId, startTime, endTime,
+        precision, limit, grouped);
+    } else {
+      return new TopNCondition(uuids, metricNames, hostnames, appId, 
instanceId,
+        startTime, endTime, precision, limit, grouped, topN, topNFunction, 
isBottomN);
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/ambari/blob/42112e28/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/query/ConnectionProvider.java
----------------------------------------------------------------------
diff --git 
a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/query/ConnectionProvider.java
 
b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/query/ConnectionProvider.java
new file mode 100644
index 0000000..830a0eb
--- /dev/null
+++ 
b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/query/ConnectionProvider.java
@@ -0,0 +1,29 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ambari.metrics.core.timeline.query;
+
+
+import java.sql.Connection;
+import java.sql.SQLException;
+
+/**
+ *
+ */
+public interface ConnectionProvider {
+  public Connection getConnection() throws SQLException;
+}

http://git-wip-us.apache.org/repos/asf/ambari/blob/42112e28/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/query/DefaultCondition.java
----------------------------------------------------------------------
diff --git 
a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/query/DefaultCondition.java
 
b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/query/DefaultCondition.java
new file mode 100644
index 0000000..888f381
--- /dev/null
+++ 
b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/query/DefaultCondition.java
@@ -0,0 +1,421 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ambari.metrics.core.timeline.query;
+
+import java.util.ArrayList;
+import java.util.LinkedHashSet;
+import java.util.List;
+import java.util.Set;
+
+import org.apache.commons.collections.CollectionUtils;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.metrics2.sink.timeline.Precision;
+import org.apache.ambari.metrics.core.timeline.PhoenixHBaseAccessor;
+import 
org.apache.ambari.metrics.core.timeline.discovery.TimelineMetricMetadataManager;
+
+public class DefaultCondition implements Condition {
+  List<String> metricNames;
+  List<String> hostnames;
+  String appId;
+  String instanceId;
+  Long startTime;
+  Long endTime;
+  Precision precision;
+  Integer limit;
+  boolean grouped;
+  boolean noLimit = false;
+  Integer fetchSize;
+  String statement;
+  Set<String> orderByColumns = new LinkedHashSet<String>();
+  boolean metricNamesNotCondition = false;
+  boolean hostNamesNotCondition = false;
+  boolean uuidNotCondition = false;
+  List<byte[]> uuids = new ArrayList<>();
+
+  private static final Log LOG = LogFactory.getLog(DefaultCondition.class);
+
+  public DefaultCondition(List<String> metricNames, List<String> hostnames, 
String appId,
+                          String instanceId, Long startTime, Long endTime, 
Precision precision,
+                          Integer limit, boolean grouped) {
+    this.metricNames = metricNames;
+    this.hostnames = hostnames;
+    this.appId = appId;
+    this.instanceId = instanceId;
+    this.startTime = startTime;
+    this.endTime = endTime;
+    this.precision = precision;
+    this.limit = limit;
+    this.grouped = grouped;
+  }
+
+  public DefaultCondition(List<byte[]> uuids, List<String> metricNames, 
List<String> hostnames, String appId,
+                          String instanceId, Long startTime, Long endTime, 
Precision precision,
+                          Integer limit, boolean grouped) {
+    this.uuids = uuids;
+    this.metricNames = metricNames;
+    this.hostnames = hostnames;
+    this.appId = appId;
+    this.instanceId = instanceId;
+    this.startTime = startTime;
+    this.endTime = endTime;
+    this.precision = precision;
+    this.limit = limit;
+    this.grouped = grouped;
+  }
+
+  public String getStatement() {
+    return statement;
+  }
+
+  public void setStatement(String statement) {
+    this.statement = statement;
+  }
+
+  public List<String> getMetricNames() {
+    return metricNames == null || metricNames.isEmpty() ? null : metricNames;
+  }
+
+  public StringBuilder getConditionClause() {
+    StringBuilder sb = new StringBuilder();
+    boolean appendConjunction = appendUuidClause(sb);
+    appendConjunction = append(sb, appendConjunction, getStartTime(), " 
SERVER_TIME >= ?");
+    append(sb, appendConjunction, getEndTime(), " SERVER_TIME < ?");
+
+    return sb;
+  }
+
+  protected static boolean append(StringBuilder sb,
+                                  boolean appendConjunction,
+                                  Object value, String str) {
+    if (value != null) {
+      if (appendConjunction) {
+        sb.append(" AND");
+      }
+
+      sb.append(str);
+      appendConjunction = true;
+    }
+    return appendConjunction;
+  }
+
+  public List<String> getHostnames() {
+    return hostnames;
+  }
+
+  public Precision getPrecision() {
+    return precision;
+  }
+
+  public void setPrecision(Precision precision) {
+    this.precision = precision;
+  }
+
+  public String getAppId() {
+    if (appId != null && !appId.isEmpty()) {
+      if (!(appId.equals("HOST") || appId.equals("FLUME_HANDLER"))) {
+        return appId.toLowerCase();
+      } else {
+        return appId;
+      }
+    }
+    return null;
+  }
+
+  public String getInstanceId() {
+    return instanceId == null || instanceId.isEmpty() ? null : instanceId;
+  }
+
+  /**
+   * Convert to millis.
+   */
+  public Long getStartTime() {
+    if (startTime == null) {
+      return null;
+    } else if (startTime < 9999999999l) {
+      return startTime * 1000;
+    } else {
+      return startTime;
+    }
+  }
+
+  public Long getEndTime() {
+    if (endTime == null) {
+      return null;
+    }
+    if (endTime < 9999999999l) {
+      return endTime * 1000;
+    } else {
+      return endTime;
+    }
+  }
+
+  public void setNoLimit() {
+    this.noLimit = true;
+  }
+
+  @Override
+  public boolean doUpdate() {
+    return false;
+  }
+
+  public Integer getLimit() {
+    if (noLimit) {
+      return null;
+    }
+    return limit == null ? PhoenixHBaseAccessor.RESULTSET_LIMIT : limit;
+  }
+
+  public boolean isGrouped() {
+    return grouped;
+  }
+
+  public boolean isPointInTime() {
+    return getStartTime() == null && getEndTime() == null;
+  }
+
+  public boolean isEmpty() {
+    return (metricNames == null || metricNames.isEmpty())
+      && (hostnames == null || hostnames.isEmpty())
+      && (appId == null || appId.isEmpty())
+      && (instanceId == null || instanceId.isEmpty())
+      && startTime == null
+      && endTime == null;
+  }
+
+  public Integer getFetchSize() {
+    return fetchSize;
+  }
+
+  public void setFetchSize(Integer fetchSize) {
+    this.fetchSize = fetchSize;
+  }
+
+  public void addOrderByColumn(String column) {
+    orderByColumns.add(column);
+  }
+
+  public String getOrderByClause(boolean asc) {
+    String orderByStr = " ORDER BY ";
+    if (!orderByColumns.isEmpty()) {
+      StringBuilder sb = new StringBuilder(orderByStr);
+      for (String orderByColumn : orderByColumns) {
+        if (sb.length() != orderByStr.length()) {
+          sb.append(", ");
+        }
+        sb.append(orderByColumn);
+        if (!asc) {
+          sb.append(" DESC");
+        }
+      }
+      sb.append(" ");
+      return sb.toString();
+    }
+    return null;
+  }
+
+  protected boolean appendUuidClause(StringBuilder sb) {
+    boolean appendConjunction = false;
+
+    if (CollectionUtils.isNotEmpty(uuids)) {
+
+      List<byte[]> uuidsHost = new ArrayList<>();
+      List<byte[]> uuidsMetric = new ArrayList<>();
+      List<byte[]> uuidsFull = new ArrayList<>();
+
+      if (getUuids() != null) {
+        for (byte[] uuid : uuids) {
+          if (uuid.length == 
TimelineMetricMetadataManager.TIMELINE_METRIC_UUID_LENGTH) {
+            uuidsMetric.add(uuid);
+          } else if (uuid.length == 
TimelineMetricMetadataManager.HOSTNAME_UUID_LENGTH) {
+            uuidsHost.add(uuid);
+          } else {
+            uuidsFull.add(uuid);
+          }
+        }
+
+        // Put a '(' first
+        sb.append("(");
+
+        //IN clause
+        // METRIC_NAME (NOT) IN (?,?,?,?)
+        if (CollectionUtils.isNotEmpty(uuidsFull)) {
+          sb.append("UUID");
+          if (uuidNotCondition) {
+            sb.append(" NOT");
+          }
+          sb.append(" IN (");
+          //Append ?,?,?,?
+          for (int i = 0; i < uuidsFull.size(); i++) {
+            sb.append("?");
+            if (i < uuidsFull.size() - 1) {
+              sb.append(", ");
+            }
+          }
+          sb.append(")");
+          appendConjunction = true;
+        }
+
+        //Put an AND if both types are present
+        if (CollectionUtils.isNotEmpty(uuidsFull) &&
+          CollectionUtils.isNotEmpty(uuidsMetric)) {
+          sb.append(" AND ");
+        }
+
+        // ( for OR
+        if (!metricNamesNotCondition && uuidsMetric.size() > 1 && 
(CollectionUtils.isNotEmpty(uuidsFull) || 
CollectionUtils.isNotEmpty(uuidsHost))) {
+          sb.append("(");
+        }
+
+        //LIKE clause for clusterMetric UUIDs
+        // UUID (NOT) LIKE ? OR(AND) UUID LIKE ?
+        if (CollectionUtils.isNotEmpty(uuidsMetric)) {
+
+          for (int i = 0; i < uuidsMetric.size(); i++) {
+            sb.append("UUID");
+            if (metricNamesNotCondition) {
+              sb.append(" NOT");
+            }
+            sb.append(" LIKE ");
+            sb.append("?");
+
+            if (i < uuidsMetric.size() - 1) {
+              if (metricNamesNotCondition) {
+                sb.append(" AND ");
+              } else {
+                sb.append(" OR ");
+              }
+            // ) for OR
+            } else if ((CollectionUtils.isNotEmpty(uuidsFull) || 
CollectionUtils.isNotEmpty(uuidsHost)) && !metricNamesNotCondition && 
uuidsMetric.size() > 1) {
+              sb.append(")");
+            }
+          }
+          appendConjunction = true;
+        }
+
+        //Put an AND if both types are present
+        if ((CollectionUtils.isNotEmpty(uuidsMetric) || 
(CollectionUtils.isNotEmpty(uuidsFull) && CollectionUtils.isEmpty(uuidsMetric)))
+          && CollectionUtils.isNotEmpty(uuidsHost)) {
+          sb.append(" AND ");
+        }
+        // ( for OR
+        if((CollectionUtils.isNotEmpty(uuidsFull) || 
CollectionUtils.isNotEmpty(uuidsMetric)) && !hostNamesNotCondition && 
uuidsHost.size() > 1){
+          sb.append("(");
+        }
+
+        //LIKE clause for HOST UUIDs
+        // UUID (NOT) LIKE ? OR(AND) UUID LIKE ?
+        if (CollectionUtils.isNotEmpty(uuidsHost)) {
+
+          for (int i = 0; i < uuidsHost.size(); i++) {
+            sb.append("UUID");
+            if (hostNamesNotCondition) {
+              sb.append(" NOT");
+            }
+            sb.append(" LIKE ");
+            sb.append("?");
+
+            if (i < uuidsHost.size() - 1) {
+              if (hostNamesNotCondition) {
+                sb.append(" AND ");
+              } else {
+                sb.append(" OR ");
+              }
+            // ) for OR
+            } else if ((CollectionUtils.isNotEmpty(uuidsFull) || 
CollectionUtils.isNotEmpty(uuidsMetric)) && !hostNamesNotCondition && 
uuidsHost.size() > 1) {
+              sb.append(")");
+            }
+          }
+          appendConjunction = true;
+        }
+
+        // Finish with a ')'
+        if (appendConjunction) {
+          sb.append(")");
+        }
+
+        uuids = new ArrayList<>();
+        if (CollectionUtils.isNotEmpty(uuidsFull)) {
+          uuids.addAll(uuidsFull);
+        }
+        for (byte[] uuid: uuidsMetric) {
+          uuids.add(new String(uuid).concat("%").getBytes());
+        }
+        for (byte[] uuid: uuidsHost) {
+          uuids.add("%".concat(new String(uuid)).getBytes());
+        }
+      }
+    }
+
+    return appendConjunction;
+  }
+
+  @Override
+  public String toString() {
+    return "Condition{" +
+      "uuids=" + uuids +
+      ", appId='" + appId + '\'' +
+      ", instanceId='" + instanceId + '\'' +
+      ", startTime=" + startTime +
+      ", endTime=" + endTime +
+      ", limit=" + limit +
+      ", grouped=" + grouped +
+      ", orderBy=" + orderByColumns +
+      ", noLimit=" + noLimit +
+      '}';
+  }
+
+  protected static boolean metricNamesHaveWildcard(List<String> metricNames) {
+    for (String name : metricNames) {
+      if (name.contains("%")) {
+        return true;
+      }
+    }
+    return false;
+  }
+
+  protected static boolean hostNamesHaveWildcard(List<String> hostnames) {
+    if (hostnames == null)
+      return false;
+    for (String name : hostnames) {
+      if (name.contains("%")) {
+        return true;
+      }
+    }
+    return false;
+  }
+
+  public void setMetricNamesNotCondition(boolean metricNamesNotCondition) {
+    this.metricNamesNotCondition = metricNamesNotCondition;
+  }
+
+  @Override
+  public void setHostnamesNotCondition(boolean hostNamesNotCondition) {
+    this.hostNamesNotCondition = hostNamesNotCondition;
+  }
+
+  @Override
+  public void setUuidNotCondition(boolean uuidNotCondition) {
+    this.uuidNotCondition = uuidNotCondition;
+  }
+
+  @Override
+  public List<byte[]> getUuids() {
+    return uuids;
+  }
+}

http://git-wip-us.apache.org/repos/asf/ambari/blob/42112e28/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/query/DefaultPhoenixDataSource.java
----------------------------------------------------------------------
diff --git 
a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/query/DefaultPhoenixDataSource.java
 
b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/query/DefaultPhoenixDataSource.java
new file mode 100644
index 0000000..5c0a4b5
--- /dev/null
+++ 
b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/query/DefaultPhoenixDataSource.java
@@ -0,0 +1,90 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ambari.metrics.core.timeline.query;
+
+
+import java.io.IOException;
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.SQLException;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.client.ConnectionFactory;
+import org.apache.hadoop.hbase.client.HBaseAdmin;
+
+public class DefaultPhoenixDataSource implements PhoenixConnectionProvider {
+
+  static final Log LOG = LogFactory.getLog(DefaultPhoenixDataSource.class);
+  private static final String ZOOKEEPER_CLIENT_PORT = 
"hbase.zookeeper.property.clientPort";
+  private static final String ZOOKEEPER_QUORUM = "hbase.zookeeper.quorum";
+  private static final String ZNODE_PARENT = "zookeeper.znode.parent";
+
+  private static final String connectionUrl = "jdbc:phoenix:%s:%s:%s";
+  private final String url;
+
+  private Configuration hbaseConf;
+
+  public DefaultPhoenixDataSource(Configuration hbaseConf) {
+    this.hbaseConf = hbaseConf;
+    String zookeeperClientPort = hbaseConf.getTrimmed(ZOOKEEPER_CLIENT_PORT, 
"2181");
+    String zookeeperQuorum = hbaseConf.getTrimmed(ZOOKEEPER_QUORUM);
+    String znodeParent = hbaseConf.getTrimmed(ZNODE_PARENT, 
"/ams-hbase-unsecure");
+    if (zookeeperQuorum == null || zookeeperQuorum.isEmpty()) {
+      throw new IllegalStateException("Unable to find Zookeeper quorum to " +
+        "access HBase store using Phoenix.");
+    }
+
+    url = String.format(connectionUrl,
+      zookeeperQuorum,
+      zookeeperClientPort,
+      znodeParent);
+  }
+
+  /**
+   * Get HBaseAdmin for table ops.
+   * @return @HBaseAdmin
+   * @throws IOException
+   */
+  public HBaseAdmin getHBaseAdmin() throws IOException {
+    return (HBaseAdmin) 
ConnectionFactory.createConnection(hbaseConf).getAdmin();
+  }
+
+  /**
+   * Get JDBC connection to HBase store. Assumption is that the hbase
+   * configuration is present on the classpath and loaded by the caller into
+   * the Configuration object.
+   * Phoenix already caches the HConnection between the client and HBase
+   * cluster.
+   *
+   * @return @java.sql.Connection
+   */
+  public Connection getConnection() throws SQLException {
+
+    LOG.debug("Metric store connection url: " + url);
+    try {
+      return DriverManager.getConnection(url);
+    } catch (SQLException e) {
+      LOG.warn("Unable to connect to HBase store using Phoenix.", e);
+
+      throw e;
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/ambari/blob/42112e28/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/query/EmptyCondition.java
----------------------------------------------------------------------
diff --git 
a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/query/EmptyCondition.java
 
b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/query/EmptyCondition.java
new file mode 100644
index 0000000..742b09b
--- /dev/null
+++ 
b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/query/EmptyCondition.java
@@ -0,0 +1,169 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ambari.metrics.core.timeline.query;
+
+import java.util.List;
+
+import org.apache.commons.lang.NotImplementedException;
+import org.apache.hadoop.metrics2.sink.timeline.Precision;
+
+/**
+ * Encapsulate a Condition with pre-formatted and pre-parsed query string.
+ */
+public class EmptyCondition implements Condition {
+  String statement;
+  boolean doUpdate = false;
+  boolean metricNamesNotCondition = false;
+
+  @Override
+  public boolean isEmpty() {
+    return false;
+  }
+
+  @Override
+  public List<byte[]> getUuids() {
+    return null;
+  }
+
+  @Override
+  public List<String> getMetricNames() {
+    return null;
+  }
+
+  @Override
+  public boolean isPointInTime() {
+    return false;
+  }
+
+  @Override
+  public boolean isGrouped() {
+    return true;
+  }
+
+  @Override
+  public void setStatement(String statement) {
+    this.statement = statement;
+  }
+
+  @Override
+  public List<String> getHostnames() {
+    return null;
+  }
+
+  @Override
+  public Precision getPrecision() {
+    return null;
+  }
+
+  @Override
+  public void setPrecision(Precision precision) {
+
+  }
+
+  @Override
+  public String getAppId() {
+    return null;
+  }
+
+  @Override
+  public String getInstanceId() {
+    return null;
+  }
+
+  @Override
+  public StringBuilder getConditionClause() {
+    return null;
+  }
+
+  @Override
+  public String getOrderByClause(boolean asc) {
+    return null;
+  }
+
+  @Override
+  public String getStatement() {
+    return statement;
+  }
+
+  @Override
+  public Long getStartTime() {
+    return null;
+  }
+
+  @Override
+  public Long getEndTime() {
+    return null;
+  }
+
+  @Override
+  public Integer getLimit() {
+    return null;
+  }
+
+  @Override
+  public Integer getFetchSize() {
+    return null;
+  }
+
+  @Override
+  public void setFetchSize(Integer fetchSize) {
+
+  }
+
+  @Override
+  public void addOrderByColumn(String column) {
+
+  }
+
+  @Override
+  public void setNoLimit() {
+
+  }
+
+  public void setDoUpdate(boolean doUpdate) {
+    this.doUpdate = doUpdate;
+  }
+
+  @Override
+  public boolean doUpdate() {
+    return doUpdate;
+  }
+
+  @Override
+  public String toString() {
+    return "EmptyCondition{ " +
+      " statement = " + this.getStatement() +
+      " doUpdate = " + this.doUpdate() +
+      " }";
+  }
+
+  @Override
+  public void setMetricNamesNotCondition(boolean metricNamesNotCondition) {
+    this.metricNamesNotCondition = metricNamesNotCondition;
+  }
+
+  @Override
+  public void setHostnamesNotCondition(boolean hostNamesNotCondition) {
+    throw new NotImplementedException("Not implemented");
+  }
+
+  @Override
+  public void setUuidNotCondition(boolean uuidNotCondition) {
+    throw new NotImplementedException("Not implemented");
+  }
+}

http://git-wip-us.apache.org/repos/asf/ambari/blob/42112e28/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/query/PhoenixConnectionProvider.java
----------------------------------------------------------------------
diff --git 
a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/query/PhoenixConnectionProvider.java
 
b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/query/PhoenixConnectionProvider.java
new file mode 100644
index 0000000..c8f958e
--- /dev/null
+++ 
b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/query/PhoenixConnectionProvider.java
@@ -0,0 +1,31 @@
+package org.apache.ambari.metrics.core.timeline.query;
+
+import java.io.IOException;
+
+import org.apache.hadoop.hbase.client.Admin;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+public interface PhoenixConnectionProvider extends ConnectionProvider {
+  /**
+   * Get HBaseAdmin for the Phoenix connection
+   * @return
+   * @throws IOException
+   */
+  Admin getHBaseAdmin() throws IOException;
+}

Reply via email to