AMBARI-20777 : AMS changes to use instanceId for cluster based segregation of data. (avijayan)
Project: http://git-wip-us.apache.org/repos/asf/ambari/repo Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/6326589b Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/6326589b Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/6326589b Branch: refs/heads/branch-3.0-perf Commit: 6326589b5af88fc72eddbfc6c33e1898e986b938 Parents: 893f97e Author: Aravindan Vijayan <[email protected]> Authored: Tue Apr 18 13:16:09 2017 -0700 Committer: Andrew Onishuk <[email protected]> Committed: Mon Apr 24 14:59:56 2017 +0300 ---------------------------------------------------------------------- .../timeline/HadoopTimelineMetricsSink.java | 4 + .../timeline/HBaseTimelineMetricStore.java | 5 + .../metrics/timeline/PhoenixHBaseAccessor.java | 106 +++++++++++++++++++ .../metrics/timeline/TimelineMetricStore.java | 8 ++ .../TimelineMetricClusterAggregatorSecond.java | 1 + .../TimelineMetricMetadataManager.java | 38 +++++++ .../discovery/TimelineMetricMetadataSync.java | 57 ++++++++++ .../timeline/query/PhoenixTransactSQL.java | 12 +++ .../webapp/TimelineWebServices.java | 16 +++ .../timeline/TestTimelineMetricStore.java | 5 + .../timeline/discovery/TestMetadataManager.java | 8 ++ .../timeline/discovery/TestMetadataSync.java | 12 +++ 12 files changed, 272 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ambari/blob/6326589b/ambari-metrics/ambari-metrics-hadoop-sink/src/main/java/org/apache/hadoop/metrics2/sink/timeline/HadoopTimelineMetricsSink.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-hadoop-sink/src/main/java/org/apache/hadoop/metrics2/sink/timeline/HadoopTimelineMetricsSink.java b/ambari-metrics/ambari-metrics-hadoop-sink/src/main/java/org/apache/hadoop/metrics2/sink/timeline/HadoopTimelineMetricsSink.java index a112ef2..8e0de03 100644 --- a/ambari-metrics/ambari-metrics-hadoop-sink/src/main/java/org/apache/hadoop/metrics2/sink/timeline/HadoopTimelineMetricsSink.java +++ b/ambari-metrics/ambari-metrics-hadoop-sink/src/main/java/org/apache/hadoop/metrics2/sink/timeline/HadoopTimelineMetricsSink.java @@ -51,6 +51,7 @@ public class HadoopTimelineMetricsSink extends AbstractTimelineMetricsSink imple private Map<String, Set<String>> useTagsMap = new HashMap<String, Set<String>>(); private TimelineMetricsCache metricsCache; private String hostName = "UNKNOWN.example.com"; + private String instanceId = null; private String serviceName = ""; private Collection<String> collectorHosts; private String collectorUri; @@ -94,6 +95,8 @@ public class HadoopTimelineMetricsSink extends AbstractTimelineMetricsSink imple } serviceName = getServiceName(conf); + String inst = conf.getString("instanceId", ""); + instanceId = StringUtils.isEmpty(inst) ? null : inst; LOG.info("Identified hostname = " + hostName + ", serviceName = " + serviceName); // Initialize the collector write strategy @@ -318,6 +321,7 @@ public class HadoopTimelineMetricsSink extends AbstractTimelineMetricsSink imple timelineMetric.setMetricName(name); timelineMetric.setHostName(hostName); timelineMetric.setAppId(serviceName); + timelineMetric.setInstanceId(instanceId); timelineMetric.setStartTime(startTime); timelineMetric.setType(metric.type() != null ? metric.type().name() : null); timelineMetric.getMetricValues().put(startTime, value.doubleValue()); http://git-wip-us.apache.org/repos/asf/ambari/blob/6326589b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/HBaseTimelineMetricStore.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/HBaseTimelineMetricStore.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/HBaseTimelineMetricStore.java index 17c58f0..fa095a0 100644 --- a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/HBaseTimelineMetricStore.java +++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/HBaseTimelineMetricStore.java @@ -388,6 +388,11 @@ public class HBaseTimelineMetricStore extends AbstractService implements Timelin } @Override + public Map<String, Set<String>> getInstanceHostsMetadata() throws SQLException, IOException { + return metricMetadataManager.getHostedInstanceCache(); + } + + @Override public List<String> getLiveInstances() { List<String> instances = null; http://git-wip-us.apache.org/repos/asf/ambari/blob/6326589b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/PhoenixHBaseAccessor.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/PhoenixHBaseAccessor.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/PhoenixHBaseAccessor.java index 8b0d84b..65bbc4c 100644 --- a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/PhoenixHBaseAccessor.java +++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/PhoenixHBaseAccessor.java @@ -115,6 +115,7 @@ import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.ti import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.CONTAINER_METRICS_TABLE_NAME; import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.CREATE_CONTAINER_METRICS_TABLE_SQL; import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.CREATE_HOSTED_APPS_METADATA_TABLE_SQL; +import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.CREATE_INSTANCE_HOST_TABLE_SQL; import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.CREATE_METRICS_AGGREGATE_TABLE_SQL; import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.CREATE_METRICS_CLUSTER_AGGREGATE_GROUPED_TABLE_SQL; import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.CREATE_METRICS_CLUSTER_AGGREGATE_TABLE_SQL; @@ -124,6 +125,7 @@ import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.ti import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.DEFAULT_ENCODING; import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.DEFAULT_TABLE_COMPRESSION; import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.GET_HOSTED_APPS_METADATA_SQL; +import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.GET_INSTANCE_HOST_METADATA_SQL; import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.GET_METRIC_METADATA_SQL; import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.METRICS_AGGREGATE_DAILY_TABLE_NAME; import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.METRICS_AGGREGATE_HOURLY_TABLE_NAME; @@ -138,6 +140,7 @@ import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.ti import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.UPSERT_CLUSTER_AGGREGATE_SQL; import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.UPSERT_CLUSTER_AGGREGATE_TIME_SQL; import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.UPSERT_HOSTED_APPS_METADATA_SQL; +import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.UPSERT_INSTANCE_HOST_METADATA_SQL; import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.UPSERT_METADATA_SQL; import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.UPSERT_METRICS_SQL; import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.UPSERT_CONTAINER_METRICS_SQL; @@ -430,6 +433,11 @@ public class PhoenixHBaseAccessor { encoding, compression); stmt.executeUpdate(hostedAppSql); + //Host Instances table + String hostedInstancesSql = String.format(CREATE_INSTANCE_HOST_TABLE_SQL, + encoding, compression); + stmt.executeUpdate(hostedInstancesSql); + // Container Metrics stmt.executeUpdate( String.format(CREATE_CONTAINER_METRICS_TABLE_SQL, encoding, tableTTL.get(CONTAINER_METRICS_TABLE_NAME), compression)); @@ -778,6 +786,8 @@ public class PhoenixHBaseAccessor { metadataManager.putIfModifiedHostedAppsMetadata( tm.getHostName(), tm.getAppId()); + + metadataManager.putIfModifiedHostedInstanceMetadata(tm.getInstanceId(), tm.getHostName()); } if (!acceptMetric) { iterator.remove(); @@ -1552,6 +1562,55 @@ public class PhoenixHBaseAccessor { } } + public void saveInstanceHostsMetadata(Map<String, Set<String>> instanceHostsMap) throws SQLException { + Connection conn = getConnection(); + PreparedStatement stmt = null; + try { + stmt = conn.prepareStatement(UPSERT_INSTANCE_HOST_METADATA_SQL); + int rowCount = 0; + + for (Map.Entry<String, Set<String>> hostInstancesEntry : instanceHostsMap.entrySet()) { + if (LOG.isTraceEnabled()) { + LOG.trace("Host Instances Entry: " + hostInstancesEntry); + } + + String instanceId = hostInstancesEntry.getKey(); + + for(String hostname : hostInstancesEntry.getValue()) { + stmt.clearParameters(); + stmt.setString(1, instanceId); + stmt.setString(2, hostname); + try { + stmt.executeUpdate(); + rowCount++; + } catch (SQLException sql) { + LOG.error("Error saving host instances metadata.", sql); + } + } + + } + + conn.commit(); + LOG.info("Saved " + rowCount + " host instances metadata records."); + + } finally { + if (stmt != null) { + try { + stmt.close(); + } catch (SQLException e) { + // Ignore + } + } + if (conn != null) { + try { + conn.close(); + } catch (SQLException sql) { + // Ignore + } + } + } + } + /** * Save metdata on updates. * @param metricMetadata @Collection<@TimelineMetricMetadata> @@ -1658,6 +1717,53 @@ public class PhoenixHBaseAccessor { return hostedAppMap; } + public Map<String, Set<String>> getInstanceHostsMetdata() throws SQLException { + Map<String, Set<String>> instanceHostsMap = new HashMap<>(); + Connection conn = getConnection(); + PreparedStatement stmt = null; + ResultSet rs = null; + + try { + stmt = conn.prepareStatement(GET_INSTANCE_HOST_METADATA_SQL); + rs = stmt.executeQuery(); + + while (rs.next()) { + String instanceId = rs.getString("INSTANCE_ID"); + String hostname = rs.getString("HOSTNAME"); + + if (!instanceHostsMap.containsKey(instanceId)) { + instanceHostsMap.put(instanceId, new HashSet<String>()); + } + instanceHostsMap.get(instanceId).add(hostname); + } + + } finally { + if (rs != null) { + try { + rs.close(); + } catch (SQLException e) { + // Ignore + } + } + if (stmt != null) { + try { + stmt.close(); + } catch (SQLException e) { + // Ignore + } + } + if (conn != null) { + try { + conn.close(); + } catch (SQLException sql) { + // Ignore + } + } + } + + return instanceHostsMap; + } + // No filter criteria support for now. public Map<TimelineMetricMetadataKey, TimelineMetricMetadata> getTimelineMetricMetadata() throws SQLException { Map<TimelineMetricMetadataKey, TimelineMetricMetadata> metadataMap = new HashMap<>(); http://git-wip-us.apache.org/repos/asf/ambari/blob/6326589b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricStore.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricStore.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricStore.java index d049e33..121a8ae 100644 --- a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricStore.java +++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricStore.java @@ -89,6 +89,14 @@ public interface TimelineMetricStore { Map<String, Set<String>> getHostAppsMetadata() throws SQLException, IOException; /** + * Returns all instances and the set of hosts each instance is present on + * @return { instanceId : [ hosts ] } + * @throws SQLException + * @throws IOException + */ + Map<String, Set<String>> getInstanceHostsMetadata() throws SQLException, IOException; + + /** * Return a list of known live collector nodes * @return [ hostname ] */ http://git-wip-us.apache.org/repos/asf/ambari/blob/6326589b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricClusterAggregatorSecond.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricClusterAggregatorSecond.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricClusterAggregatorSecond.java index 5310906..a5a3499 100644 --- a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricClusterAggregatorSecond.java +++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricClusterAggregatorSecond.java @@ -129,6 +129,7 @@ public class TimelineMetricClusterAggregatorSecond extends AbstractTimelineAggre condition.addOrderByColumn("METRIC_NAME"); condition.addOrderByColumn("HOSTNAME"); condition.addOrderByColumn("APP_ID"); + condition.addOrderByColumn("INSTANCE_ID"); condition.addOrderByColumn("SERVER_TIME"); return condition; } http://git-wip-us.apache.org/repos/asf/ambari/blob/6326589b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/discovery/TimelineMetricMetadataManager.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/discovery/TimelineMetricMetadataManager.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/discovery/TimelineMetricMetadataManager.java index 7eb2457..f904ebe 100644 --- a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/discovery/TimelineMetricMetadataManager.java +++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/discovery/TimelineMetricMetadataManager.java @@ -54,8 +54,10 @@ public class TimelineMetricMetadataManager { private final Map<TimelineMetricMetadataKey, TimelineMetricMetadata> METADATA_CACHE = new ConcurrentHashMap<>(); // Map to lookup apps on a host private final Map<String, Set<String>> HOSTED_APPS_MAP = new ConcurrentHashMap<>(); + private final Map<String, Set<String>> INSTANCE_HOST_MAP = new ConcurrentHashMap<>(); // Sync only when needed AtomicBoolean SYNC_HOSTED_APPS_METADATA = new AtomicBoolean(false); + AtomicBoolean SYNC_HOSTED_INSTANCES_METADATA = new AtomicBoolean(false); // Single thread to sync back new writes to the store private final ScheduledExecutorService executorService = Executors.newSingleThreadScheduledExecutor(); @@ -122,14 +124,25 @@ public class TimelineMetricMetadataManager { return HOSTED_APPS_MAP; } + public Map<String, Set<String>> getHostedInstanceCache() { + return INSTANCE_HOST_MAP; + } + public boolean syncHostedAppsMetadata() { return SYNC_HOSTED_APPS_METADATA.get(); } + public boolean syncHostedInstanceMetadata() { + return SYNC_HOSTED_INSTANCES_METADATA.get(); + } + public void markSuccessOnSyncHostedAppsMetadata() { SYNC_HOSTED_APPS_METADATA.set(false); } + public void markSuccessOnSyncHostedInstanceMetadata() { + SYNC_HOSTED_INSTANCES_METADATA.set(false); + } /** * Test metric name for valid patterns and return true/false */ @@ -189,6 +202,23 @@ public class TimelineMetricMetadataManager { } } + public void putIfModifiedHostedInstanceMetadata(String instanceId, String hostname) { + if (StringUtils.isEmpty(instanceId)) { + return; + } + + Set<String> hosts = INSTANCE_HOST_MAP.get(instanceId); + if (hosts == null) { + hosts = new HashSet<>(); + INSTANCE_HOST_MAP.put(instanceId, hosts); + } + + if (!hosts.contains(hostname)) { + hosts.add(hostname); + SYNC_HOSTED_INSTANCES_METADATA.set(true); + } + } + public void persistMetadata(Collection<TimelineMetricMetadata> metadata) throws SQLException { hBaseAccessor.saveMetricMetadata(metadata); } @@ -197,6 +227,10 @@ public class TimelineMetricMetadataManager { hBaseAccessor.saveHostAppsMetadata(hostedApps); } + public void persistHostedInstanceMetadata(Map<String, Set<String>> hostedInstancesMetadata) throws SQLException { + hBaseAccessor.saveInstanceHostsMetadata(hostedInstancesMetadata); + } + public TimelineMetricMetadata getTimelineMetricMetadata(TimelineMetric timelineMetric, boolean isWhitelisted) { return new TimelineMetricMetadata( timelineMetric.getMetricName(), @@ -233,6 +267,10 @@ public class TimelineMetricMetadataManager { return hBaseAccessor.getHostedAppsMetadata(); } + Map<String, Set<String>> getHostedInstancesFromStore() throws SQLException { + return hBaseAccessor.getInstanceHostsMetdata(); + } + private boolean supportAggregates(TimelineMetric metric) { return MapUtils.isEmpty(metric.getMetadata()) || !(String.valueOf(true).equals(metric.getMetadata().get("skipAggregation"))); http://git-wip-us.apache.org/repos/asf/ambari/blob/6326589b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/discovery/TimelineMetricMetadataSync.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/discovery/TimelineMetricMetadataSync.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/discovery/TimelineMetricMetadataSync.java index 25b525a..6d519f6 100644 --- a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/discovery/TimelineMetricMetadataSync.java +++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/discovery/TimelineMetricMetadataSync.java @@ -45,11 +45,15 @@ public class TimelineMetricMetadataSync implements Runnable { persistMetricMetadata(); LOG.debug("Persisting hosted apps metadata..."); persistHostAppsMetadata(); + LOG.debug("Persisting hosted instance metadata..."); + persistHostInstancesMetadata(); if (cacheManager.isDistributedModeEnabled()) { LOG.debug("Refreshing metric metadata..."); refreshMetricMetadata(); LOG.debug("Refreshing hosted apps metadata..."); refreshHostAppsMetadata(); + LOG.debug("Refreshing hosted instances metadata..."); + refreshHostedInstancesMetadata(); } } @@ -147,6 +151,41 @@ public class TimelineMetricMetadataSync implements Runnable { } /** + * Sync apps instances data if needed + */ + private void persistHostInstancesMetadata() { + if (cacheManager.syncHostedInstanceMetadata()) { + Map<String, Set<String>> persistedData = null; + try { + persistedData = cacheManager.getHostedInstancesFromStore(); + } catch (SQLException e) { + LOG.warn("Failed on fetching hosted instances data from store.", e); + return; // Something wrong with store + } + + Map<String, Set<String>> cachedData = cacheManager.getHostedInstanceCache(); + Map<String, Set<String>> dataToSync = new HashMap<>(); + if (cachedData != null && !cachedData.isEmpty()) { + for (Map.Entry<String, Set<String>> cacheEntry : cachedData.entrySet()) { + // No persistence / stale data in store + if (persistedData == null || persistedData.isEmpty() || + !persistedData.containsKey(cacheEntry.getKey()) || + !persistedData.get(cacheEntry.getKey()).containsAll(cacheEntry.getValue())) { + dataToSync.put(cacheEntry.getKey(), cacheEntry.getValue()); + } + } + try { + cacheManager.persistHostedInstanceMetadata(dataToSync); + cacheManager.markSuccessOnSyncHostedInstanceMetadata(); + + } catch (SQLException e) { + LOG.warn("Error persisting hosted apps metadata.", e); + } + } + + } + } + /** * Read all hosted apps metadata and update cached values - HA */ private void refreshHostAppsMetadata() { @@ -166,4 +205,22 @@ public class TimelineMetricMetadataSync implements Runnable { } } } + + private void refreshHostedInstancesMetadata() { + Map<String, Set<String>> hostedInstancesFromStore = null; + try { + hostedInstancesFromStore = cacheManager.getHostedInstancesFromStore(); + } catch (SQLException e) { + LOG.warn("Error refreshing metadata from store.", e); + } + if (hostedInstancesFromStore != null) { + Map<String, Set<String>> cachedData = cacheManager.getHostedInstanceCache(); + + for (Map.Entry<String, Set<String>> storeEntry : hostedInstancesFromStore.entrySet()) { + if (!cachedData.containsKey(storeEntry.getKey())) { + cachedData.put(storeEntry.getKey(), storeEntry.getValue()); + } + } + } + } } http://git-wip-us.apache.org/repos/asf/ambari/blob/6326589b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/query/PhoenixTransactSQL.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/query/PhoenixTransactSQL.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/query/PhoenixTransactSQL.java index 0c8e5a7..d39230d 100644 --- a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/query/PhoenixTransactSQL.java +++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/query/PhoenixTransactSQL.java @@ -148,6 +148,12 @@ public class PhoenixTransactSQL { "CONSTRAINT pk PRIMARY KEY (HOSTNAME))" + "DATA_BLOCK_ENCODING='%s', COMPRESSION='%s'"; + public static final String CREATE_INSTANCE_HOST_TABLE_SQL = + "CREATE TABLE IF NOT EXISTS INSTANCE_HOST_METADATA " + + "(INSTANCE_ID VARCHAR, HOSTNAME VARCHAR, " + + "CONSTRAINT pk PRIMARY KEY (INSTANCE_ID, HOSTNAME))" + + "DATA_BLOCK_ENCODING='%s', COMPRESSION='%s'"; + public static final String ALTER_METRICS_METADATA_TABLE = "ALTER TABLE METRICS_METADATA ADD IF NOT EXISTS IS_WHITELISTED BOOLEAN"; @@ -230,6 +236,9 @@ public class PhoenixTransactSQL { public static final String UPSERT_HOSTED_APPS_METADATA_SQL = "UPSERT INTO HOSTED_APPS_METADATA (HOSTNAME, APP_IDS) VALUES (?, ?)"; + public static final String UPSERT_INSTANCE_HOST_METADATA_SQL = + "UPSERT INTO INSTANCE_HOST_METADATA (INSTANCE_ID, HOSTNAME) VALUES (?, ?)"; + /** * Retrieve a set of rows from metrics records table. */ @@ -309,6 +318,9 @@ public class PhoenixTransactSQL { public static final String GET_HOSTED_APPS_METADATA_SQL = "SELECT " + "HOSTNAME, APP_IDS FROM HOSTED_APPS_METADATA"; + public static final String GET_INSTANCE_HOST_METADATA_SQL = "SELECT " + + "INSTANCE_ID, HOSTNAME FROM INSTANCE_HOST_METADATA"; + /** * Aggregate host metrics using a GROUP BY clause to take advantage of * N - way parallel scan where N = number of regions. http://git-wip-us.apache.org/repos/asf/ambari/blob/6326589b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/webapp/TimelineWebServices.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/webapp/TimelineWebServices.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/webapp/TimelineWebServices.java index 304a8e0..6278c59 100644 --- a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/webapp/TimelineWebServices.java +++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/webapp/TimelineWebServices.java @@ -412,6 +412,22 @@ public class TimelineWebServices { } } + @GET + @Path("/metrics/instances") + @Produces({ MediaType.APPLICATION_JSON }) + public Map<String, Set<String>> getClusterHostsMetadata( + @Context HttpServletRequest req, + @Context HttpServletResponse res + ) { + init(res); + + try { + return timelineMetricStore.getInstanceHostsMetadata(); + } catch (Exception e) { + throw new WebApplicationException(e, Response.Status.INTERNAL_SERVER_ERROR); + } + } + /** * This is a discovery endpoint that advertises known live collector * instances. Note: It will always answer with current instance as live. http://git-wip-us.apache.org/repos/asf/ambari/blob/6326589b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TestTimelineMetricStore.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TestTimelineMetricStore.java b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TestTimelineMetricStore.java index b2e8cac..b40481d 100644 --- a/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TestTimelineMetricStore.java +++ b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TestTimelineMetricStore.java @@ -97,6 +97,11 @@ public class TestTimelineMetricStore implements TimelineMetricStore { } @Override + public Map<String, Set<String>> getInstanceHostsMetadata() throws SQLException, IOException { + return Collections.emptyMap(); + } + + @Override public List<String> getLiveInstances() { return Collections.emptyList(); } http://git-wip-us.apache.org/repos/asf/ambari/blob/6326589b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/discovery/TestMetadataManager.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/discovery/TestMetadataManager.java b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/discovery/TestMetadataManager.java index b243e0b..c62fd34 100644 --- a/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/discovery/TestMetadataManager.java +++ b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/discovery/TestMetadataManager.java @@ -69,6 +69,7 @@ public class TestMetadataManager extends AbstractMiniHBaseClusterTest { metric2.setStartTime(now - 1000); metric2.setAppId("dummy_app2"); metric2.setType("Integer"); + metric2.setInstanceId("instance2"); metric2.setMetricValues(new TreeMap<Long, Double>() {{ put(now - 100, 1.0); put(now - 200, 2.0); @@ -144,5 +145,12 @@ public class TestMetadataManager extends AbstractMiniHBaseClusterTest { Assert.assertEquals("dummy_app1", savedHostData.get("dummy_host1").iterator().next()); Assert.assertEquals("dummy_app2", savedHostData.get("dummy_host2").iterator().next()); Assert.assertEquals("dummy_app3", cachedHostData.get("dummy_host3").iterator().next()); + + + Map<String, Set<String>> cachedHostInstanceData = metadataManager.getHostedInstanceCache(); + Map<String, Set<String>> savedHostInstanceData = metadataManager.getHostedInstancesFromStore(); + Assert.assertEquals(cachedHostInstanceData.size(), savedHostInstanceData.size()); + Assert.assertEquals("dummy_host2", cachedHostInstanceData.get("instance2").iterator().next()); + } } http://git-wip-us.apache.org/repos/asf/ambari/blob/6326589b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/discovery/TestMetadataSync.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/discovery/TestMetadataSync.java b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/discovery/TestMetadataSync.java index 5eab903..181abca 100644 --- a/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/discovery/TestMetadataSync.java +++ b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/discovery/TestMetadataSync.java @@ -56,9 +56,15 @@ public class TestMetadataSync { put("h2", new HashSet<>(Arrays.asList("a1", "a2"))); }}; + Map<String, Set<String>> hostedInstances = new HashMap<String, Set<String>>() {{ + put("i1", new HashSet<>(Arrays.asList("h1"))); + put("i2", new HashSet<>(Arrays.asList("h1", "h2"))); + }}; + expect(configuration.get("timeline.metrics.service.operation.mode", "")).andReturn("distributed"); expect(hBaseAccessor.getTimelineMetricMetadata()).andReturn(metadata); expect(hBaseAccessor.getHostedAppsMetadata()).andReturn(hostedApps); + expect(hBaseAccessor.getInstanceHostsMetdata()).andReturn(hostedInstances); replay(configuration, hBaseAccessor); @@ -80,6 +86,12 @@ public class TestMetadataSync { Assert.assertEquals(2, hostedApps.size()); Assert.assertEquals(1, hostedApps.get("h1").size()); Assert.assertEquals(2, hostedApps.get("h2").size()); + + hostedInstances = metadataManager.getHostedInstanceCache(); + Assert.assertEquals(2, hostedInstances.size()); + Assert.assertEquals(1, hostedInstances.get("i1").size()); + Assert.assertEquals(2, hostedInstances.get("i2").size()); + } @Test
