http://git-wip-us.apache.org/repos/asf/ambari/blob/42112e28/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/query/PhoenixTransactSQL.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/query/PhoenixTransactSQL.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/query/PhoenixTransactSQL.java new file mode 100644 index 0000000..4c4e673 --- /dev/null +++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/query/PhoenixTransactSQL.java @@ -0,0 +1,803 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p/> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p/> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.ambari.metrics.core.timeline.query; + +import java.sql.Connection; +import java.sql.PreparedStatement; +import java.sql.SQLException; +import java.util.List; +import java.util.concurrent.TimeUnit; + +import org.apache.ambari.metrics.core.timeline.discovery.TimelineMetricMetadataManager; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.metrics2.sink.timeline.Precision; +import org.apache.hadoop.metrics2.sink.timeline.PrecisionLimitExceededException; +import org.apache.ambari.metrics.core.timeline.PhoenixHBaseAccessor; + +/** + * Encapsulate all metrics related SQL queries. + */ +public class PhoenixTransactSQL { + + public static final Log LOG = LogFactory.getLog(PhoenixTransactSQL.class); + + /** + * Create table to store individual metric records. + */ + public static final String CREATE_METRICS_TABLE_SQL = "CREATE TABLE IF NOT " + + "EXISTS METRIC_RECORD (UUID BINARY(20) NOT NULL, " + + "SERVER_TIME BIGINT NOT NULL, " + + "METRIC_SUM DOUBLE, " + + "METRIC_COUNT UNSIGNED_INT, " + + "METRIC_MAX DOUBLE, " + + "METRIC_MIN DOUBLE, " + + "METRICS VARCHAR CONSTRAINT pk " + + "PRIMARY KEY (UUID, SERVER_TIME ROW_TIMESTAMP)) DATA_BLOCK_ENCODING='%s', IMMUTABLE_ROWS=true, " + + "TTL=%s, COMPRESSION='%s'"; + + public static final String CREATE_CONTAINER_METRICS_TABLE_SQL = + "CREATE TABLE IF NOT EXISTS CONTAINER_METRICS " + + "(APP_ID VARCHAR, " + + " CONTAINER_ID VARCHAR," + + " START_TIME TIMESTAMP," + + " FINISH_TIME TIMESTAMP, " + + " DURATION BIGINT," + + " HOSTNAME VARCHAR," + + " EXIT_CODE INTEGER," + + " LOCALIZATION_DURATION BIGINT," + + " LAUNCH_DURATION BIGINT," + + " MEM_REQUESTED_GB DOUBLE," + + " MEM_REQUESTED_GB_MILLIS DOUBLE," + + " MEM_VIRTUAL_GB DOUBLE," + + " MEM_USED_GB_MIN DOUBLE," + + " MEM_USED_GB_MAX DOUBLE," + + " MEM_USED_GB_AVG DOUBLE," + + " MEM_USED_GB_50_PCT DOUBLE," + + " MEM_USED_GB_75_PCT DOUBLE," + + " MEM_USED_GB_90_PCT DOUBLE," + + " MEM_USED_GB_95_PCT 
DOUBLE," + + " MEM_USED_GB_99_PCT DOUBLE," + + " MEM_UNUSED_GB DOUBLE," + + " MEM_UNUSED_GB_MILLIS DOUBLE " + + " CONSTRAINT pk PRIMARY KEY(APP_ID, CONTAINER_ID)) DATA_BLOCK_ENCODING='%s'," + + " IMMUTABLE_ROWS=true, TTL=%s, COMPRESSION='%s'"; + + public static final String CREATE_METRICS_AGGREGATE_TABLE_SQL = + "CREATE TABLE IF NOT EXISTS %s " + + "(UUID BINARY(20) NOT NULL, " + + "SERVER_TIME BIGINT NOT NULL, " + + "METRIC_SUM DOUBLE," + + "METRIC_COUNT UNSIGNED_INT, " + + "METRIC_MAX DOUBLE," + + "METRIC_MIN DOUBLE CONSTRAINT pk " + + "PRIMARY KEY (UUID, SERVER_TIME ROW_TIMESTAMP)) DATA_BLOCK_ENCODING='%s', IMMUTABLE_ROWS=true, TTL=%s," + + " COMPRESSION='%s'"; + + public static final String CREATE_METRICS_CLUSTER_AGGREGATE_TABLE_SQL = + "CREATE TABLE IF NOT EXISTS %s " + + "(UUID BINARY(16) NOT NULL, " + + "SERVER_TIME BIGINT NOT NULL, " + + "METRIC_SUM DOUBLE, " + + "HOSTS_COUNT UNSIGNED_INT, " + + "METRIC_MAX DOUBLE, " + + "METRIC_MIN DOUBLE " + + "CONSTRAINT pk PRIMARY KEY (UUID, SERVER_TIME ROW_TIMESTAMP)) DATA_BLOCK_ENCODING='%s', IMMUTABLE_ROWS=true, " + + "TTL=%s, COMPRESSION='%s'"; + + // HOSTS_COUNT vs METRIC_COUNT + public static final String CREATE_METRICS_CLUSTER_AGGREGATE_GROUPED_TABLE_SQL = + "CREATE TABLE IF NOT EXISTS %s " + + "(UUID BINARY(16) NOT NULL, " + + "SERVER_TIME BIGINT NOT NULL, " + + "METRIC_SUM DOUBLE, " + + "METRIC_COUNT UNSIGNED_INT, " + + "METRIC_MAX DOUBLE, " + + "METRIC_MIN DOUBLE " + + "CONSTRAINT pk PRIMARY KEY (UUID, SERVER_TIME ROW_TIMESTAMP)) DATA_BLOCK_ENCODING='%s', IMMUTABLE_ROWS=true, " + + "TTL=%s, COMPRESSION='%s'"; + + public static final String CREATE_METRICS_METADATA_TABLE_SQL = + "CREATE TABLE IF NOT EXISTS METRICS_METADATA " + + "(METRIC_NAME VARCHAR, " + + "APP_ID VARCHAR, " + + "INSTANCE_ID VARCHAR, " + + "UUID BINARY(16), " + + "UNITS CHAR(20), " + + "TYPE CHAR(20), " + + "START_TIME UNSIGNED_LONG, " + + "SUPPORTS_AGGREGATION BOOLEAN, " + + "IS_WHITELISTED BOOLEAN " + + "CONSTRAINT pk PRIMARY KEY 
(METRIC_NAME, APP_ID)) " + + "DATA_BLOCK_ENCODING='%s', COMPRESSION='%s'"; + + public static final String CREATE_HOSTED_APPS_METADATA_TABLE_SQL = + "CREATE TABLE IF NOT EXISTS HOSTED_APPS_METADATA " + + "(HOSTNAME VARCHAR, UUID BINARY(4), APP_IDS VARCHAR, " + + "CONSTRAINT pk PRIMARY KEY (HOSTNAME))" + + "DATA_BLOCK_ENCODING='%s', COMPRESSION='%s'"; + + public static final String CREATE_INSTANCE_HOST_TABLE_SQL = + "CREATE TABLE IF NOT EXISTS INSTANCE_HOST_METADATA " + + "(INSTANCE_ID VARCHAR, HOSTNAME VARCHAR, " + + "CONSTRAINT pk PRIMARY KEY (INSTANCE_ID, HOSTNAME))" + + "DATA_BLOCK_ENCODING='%s', COMPRESSION='%s'"; + + public static final String ALTER_METRICS_METADATA_TABLE = + "ALTER TABLE METRICS_METADATA ADD IF NOT EXISTS IS_WHITELISTED BOOLEAN"; + + /** + * ALTER table to set new options + */ + public static final String ALTER_SQL = "ALTER TABLE %s SET TTL=%s"; + + /** + * Insert into metric records table. + */ + public static final String UPSERT_METRICS_SQL = "UPSERT INTO %s " + + "(UUID, " + + "SERVER_TIME, " + + "METRIC_SUM, " + + "METRIC_MAX, " + + "METRIC_MIN, " + + "METRIC_COUNT, " + + "METRICS) VALUES " + + "(?, ?, ?, ?, ?, ?, ?)"; + + public static final String UPSERT_CONTAINER_METRICS_SQL = "UPSERT INTO %s " + + "(APP_ID," + + " CONTAINER_ID," + + " START_TIME," + + " FINISH_TIME," + + " DURATION," + + " HOSTNAME," + + " EXIT_CODE," + + " LOCALIZATION_DURATION," + + " LAUNCH_DURATION," + + " MEM_REQUESTED_GB," + + " MEM_REQUESTED_GB_MILLIS," + + " MEM_VIRTUAL_GB," + + " MEM_USED_GB_MIN," + + " MEM_USED_GB_MAX," + + " MEM_USED_GB_AVG," + + " MEM_USED_GB_50_PCT," + + " MEM_USED_GB_75_PCT," + + " MEM_USED_GB_90_PCT," + + " MEM_USED_GB_95_PCT," + + " MEM_USED_GB_99_PCT," + + " MEM_UNUSED_GB," + + " MEM_UNUSED_GB_MILLIS) VALUES " + + "(?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)"; + + public static final String UPSERT_CLUSTER_AGGREGATE_SQL = "UPSERT INTO " + + "%s (UUID, " + + "SERVER_TIME, " + + "METRIC_SUM, " + + "HOSTS_COUNT, " + 
+ "METRIC_MAX, " + + "METRIC_MIN) " + + "VALUES (?, ?, ?, ?, ?, ?)"; + + public static final String UPSERT_CLUSTER_AGGREGATE_TIME_SQL = "UPSERT INTO" + + " %s (UUID, SERVER_TIME, " + + "METRIC_SUM, " + + "METRIC_COUNT, " + + "METRIC_MAX, " + + "METRIC_MIN) " + + "VALUES (?, ?, ?, ?, ?, ?)"; + + public static final String UPSERT_AGGREGATE_RECORD_SQL = "UPSERT INTO " + + "%s (UUID, " + + "SERVER_TIME, " + + "METRIC_SUM, " + + "METRIC_MAX, " + + "METRIC_MIN," + + "METRIC_COUNT) " + + "VALUES (?, ?, ?, ?, ?, ?)"; + + public static final String UPSERT_METADATA_SQL = + "UPSERT INTO METRICS_METADATA (METRIC_NAME, APP_ID, INSTANCE_ID, UUID, UNITS, TYPE, " + + "START_TIME, SUPPORTS_AGGREGATION, IS_WHITELISTED) " + + "VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)"; + + public static final String UPSERT_HOSTED_APPS_METADATA_SQL = + "UPSERT INTO HOSTED_APPS_METADATA (HOSTNAME, UUID, APP_IDS) VALUES (?, ?, ?)"; + + public static final String UPSERT_INSTANCE_HOST_METADATA_SQL = + "UPSERT INTO INSTANCE_HOST_METADATA (INSTANCE_ID, HOSTNAME) VALUES (?, ?)"; + + /** + * Retrieve a set of rows from metrics records table. 
+ */ + public static final String GET_METRIC_SQL = "SELECT UUID, SERVER_TIME, " + + "METRIC_SUM, " + + "METRIC_MAX, " + + "METRIC_MIN, " + + "METRIC_COUNT, " + + "METRICS " + + "FROM %s"; + + /** + * Get latest metrics for a number of hosts + * + * Different queries for a number and a single hosts are used due to bug + * in Apache Phoenix + */ + public static final String GET_LATEST_METRIC_SQL = "SELECT %s E.UUID AS UUID, " + + "E.SERVER_TIME AS SERVER_TIME, " + + "E.METRIC_SUM AS METRIC_SUM, " + + "E.METRIC_MAX AS METRIC_MAX, E.METRIC_MIN AS METRIC_MIN, " + + "E.METRIC_COUNT AS METRIC_COUNT, E.METRICS AS METRICS " + + "FROM %s AS E " + + "INNER JOIN " + + "(SELECT UUID, MAX(SERVER_TIME) AS MAX_SERVER_TIME " + + "FROM %s " + + "WHERE " + + "%s " + + "GROUP BY UUID) " + + "AS I " + + "ON E.UUID=I.UUID " + + "AND E.SERVER_TIME=I.MAX_SERVER_TIME"; + + public static final String GET_METRIC_AGGREGATE_ONLY_SQL = "SELECT UUID, " + + "SERVER_TIME, " + + "METRIC_SUM, " + + "METRIC_MAX, " + + "METRIC_MIN, " + + "METRIC_COUNT " + + "FROM %s"; + + public static final String GET_CLUSTER_AGGREGATE_SQL = "SELECT " + + "UUID, " + + "SERVER_TIME, " + + "METRIC_SUM, " + + "HOSTS_COUNT, " + + "METRIC_MAX, " + + "METRIC_MIN " + + "FROM %s"; + + public static final String GET_CLUSTER_AGGREGATE_TIME_SQL = "SELECT " + + "UUID, " + + "SERVER_TIME, " + + "METRIC_SUM, " + + "METRIC_COUNT, " + + "METRIC_MAX, " + + "METRIC_MIN " + + "FROM %s"; + + public static final String TOP_N_INNER_SQL = "SELECT UUID " + + "FROM %s WHERE %s GROUP BY UUID ORDER BY %s LIMIT %s"; + + public static final String GET_METRIC_METADATA_SQL = "SELECT " + + "METRIC_NAME, APP_ID, INSTANCE_ID, UUID, UNITS, TYPE, START_TIME, " + + "SUPPORTS_AGGREGATION, IS_WHITELISTED FROM METRICS_METADATA"; + + public static final String GET_HOSTED_APPS_METADATA_SQL = "SELECT " + + "HOSTNAME, UUID, APP_IDS FROM HOSTED_APPS_METADATA"; + + public static final String GET_INSTANCE_HOST_METADATA_SQL = "SELECT " + + "INSTANCE_ID, HOSTNAME 
FROM INSTANCE_HOST_METADATA"; + + /** + * Aggregate host metrics using a GROUP BY clause to take advantage of + * N - way parallel scan where N = number of regions. + */ + public static final String GET_AGGREGATED_HOST_METRIC_GROUPBY_SQL = "UPSERT " + + "INTO %s (UUID, SERVER_TIME, METRIC_SUM, METRIC_COUNT, METRIC_MAX, METRIC_MIN) " + + "SELECT UUID, %s AS SERVER_TIME, " + + "SUM(METRIC_SUM), SUM(METRIC_COUNT), MAX(METRIC_MAX), MIN(METRIC_MIN) " + + "FROM %s WHERE%s SERVER_TIME > %s AND SERVER_TIME <= %s " + + "GROUP BY UUID"; + + /** + * Downsample host metrics. + */ + public static final String DOWNSAMPLE_HOST_METRIC_SQL_UPSERT_PREFIX = "UPSERT INTO %s (UUID, SERVER_TIME, " + + "METRIC_SUM, METRIC_COUNT, METRIC_MAX, METRIC_MIN) "; + + public static final String TOPN_DOWNSAMPLER_HOST_METRIC_SELECT_SQL = "SELECT UUID, " + + "%s AS SERVER_TIME, %s, 1, %s, %s FROM %s WHERE UUID IN %s AND SERVER_TIME > %s AND SERVER_TIME <= %s " + + "GROUP BY UUID ORDER BY %s DESC LIMIT %s"; + + /** + * Aggregate app metrics using a GROUP BY clause to take advantage of + * N - way parallel scan where N = number of regions. + */ + public static final String GET_AGGREGATED_APP_METRIC_GROUPBY_SQL = "UPSERT " + + "INTO %s (UUID, SERVER_TIME, METRIC_SUM, METRIC_COUNT, METRIC_MAX, METRIC_MIN) SELECT UUID, %s AS SERVER_TIME, " + + "ROUND(AVG(METRIC_SUM),2), ROUND(AVG(%s)), MAX(METRIC_MAX), MIN(METRIC_MIN) FROM %s WHERE%s SERVER_TIME > %s AND " + + "SERVER_TIME <= %s GROUP BY UUID"; + + /** + * Downsample cluster metrics. 
+ */ + public static final String DOWNSAMPLE_CLUSTER_METRIC_SQL_UPSERT_PREFIX = "UPSERT INTO %s (UUID, SERVER_TIME, " + + "METRIC_SUM, METRIC_COUNT, METRIC_MAX, METRIC_MIN) "; + + public static final String TOPN_DOWNSAMPLER_CLUSTER_METRIC_SELECT_SQL = "SELECT UUID, " + + "%s AS SERVER_TIME, %s, 1, %s, %s FROM %s WHERE UUID IN %s AND SERVER_TIME > %s AND SERVER_TIME <= %s " + + "GROUP BY UUID ORDER BY %s DESC LIMIT %s"; + + public static final String METRICS_RECORD_TABLE_NAME = "METRIC_RECORD"; + + public static final String CONTAINER_METRICS_TABLE_NAME = "CONTAINER_METRICS"; + + public static final String METRICS_AGGREGATE_MINUTE_TABLE_NAME = + "METRIC_RECORD_MINUTE"; + public static final String METRICS_AGGREGATE_HOURLY_TABLE_NAME = + "METRIC_RECORD_HOURLY"; + public static final String METRICS_AGGREGATE_DAILY_TABLE_NAME = + "METRIC_RECORD_DAILY"; + public static final String METRICS_CLUSTER_AGGREGATE_TABLE_NAME = + "METRIC_AGGREGATE"; + public static final String METRICS_CLUSTER_AGGREGATE_MINUTE_TABLE_NAME = + "METRIC_AGGREGATE_MINUTE"; + public static final String METRICS_CLUSTER_AGGREGATE_HOURLY_TABLE_NAME = + "METRIC_AGGREGATE_HOURLY"; + public static final String METRICS_CLUSTER_AGGREGATE_DAILY_TABLE_NAME = + "METRIC_AGGREGATE_DAILY"; + + public static final String PHOENIX_TABLES_REGEX_PATTERN_STRING = "METRIC_.*"; + + public static final String[] PHOENIX_TABLES = { + METRICS_RECORD_TABLE_NAME, + METRICS_AGGREGATE_MINUTE_TABLE_NAME, + METRICS_AGGREGATE_HOURLY_TABLE_NAME, + METRICS_AGGREGATE_DAILY_TABLE_NAME, + METRICS_CLUSTER_AGGREGATE_TABLE_NAME, + METRICS_CLUSTER_AGGREGATE_MINUTE_TABLE_NAME, + METRICS_CLUSTER_AGGREGATE_HOURLY_TABLE_NAME, + METRICS_CLUSTER_AGGREGATE_DAILY_TABLE_NAME + }; + + public static final String DEFAULT_TABLE_COMPRESSION = "SNAPPY"; + public static final String DEFAULT_ENCODING = "FAST_DIFF"; + public static final long HOUR = 3600000; // 1 hour + public static final long DAY = 86400000; // 1 day + private static boolean 
sortMergeJoinEnabled = false; + + /** + * Filter to optimize HBase scan by using file timestamps. This prevents + * a full table scan of metric records. + * + * @return Phoenix Hint String + */ + public static String getNaiveTimeRangeHint(Long startTime, Long delta) { + return String.format("/*+ NATIVE_TIME_RANGE(%s) */", (startTime - delta)); + } + + /** + * Falling back to sort merge join algorithm if default queries fail. + * + * @return Phoenix Hint String + */ + public static String getLatestMetricsHints() { + if (sortMergeJoinEnabled) { + return "/*+ USE_SORT_MERGE_JOIN NO_CACHE */"; + } + return ""; + } + + public static void setSortMergeJoinEnabled(boolean sortMergeJoinEnabled) { + PhoenixTransactSQL.sortMergeJoinEnabled = sortMergeJoinEnabled; + } + + public static PreparedStatement prepareGetMetricsSqlStmt(Connection connection, + Condition condition) throws SQLException { + + validateConditionIsNotEmpty(condition); + validateRowCountLimit(condition); + + String stmtStr; + if (condition.getStatement() != null) { + stmtStr = condition.getStatement(); + } else { + String metricsTable; + String query; + if (condition.getPrecision() == null) { + long endTime = condition.getEndTime() == null ? System.currentTimeMillis() : condition.getEndTime(); + long startTime = condition.getStartTime() == null ? 
0 : condition.getStartTime(); + Precision precision = Precision.getPrecision(startTime, endTime); + condition.setPrecision(precision); + } + switch (condition.getPrecision()) { + case DAYS: + metricsTable = METRICS_AGGREGATE_DAILY_TABLE_NAME; + query = GET_METRIC_AGGREGATE_ONLY_SQL; + break; + case HOURS: + metricsTable = METRICS_AGGREGATE_HOURLY_TABLE_NAME; + query = GET_METRIC_AGGREGATE_ONLY_SQL; + break; + case MINUTES: + metricsTable = METRICS_AGGREGATE_MINUTE_TABLE_NAME; + query = GET_METRIC_AGGREGATE_ONLY_SQL; + break; + default: + metricsTable = METRICS_RECORD_TABLE_NAME; + query = GET_METRIC_SQL; + } + + stmtStr = String.format(query, metricsTable); + } + + StringBuilder sb = new StringBuilder(stmtStr); + + if (!(condition instanceof EmptyCondition)) { + sb.append(" WHERE "); + sb.append(condition.getConditionClause()); + String orderByClause = condition.getOrderByClause(true); + if (orderByClause != null) { + sb.append(orderByClause); + } else { + sb.append(" ORDER BY UUID, SERVER_TIME "); + } + } + + if (condition.getLimit() != null) { + sb.append(" LIMIT ").append(condition.getLimit()); + } + + if (LOG.isDebugEnabled()) { + LOG.debug("SQL: " + sb.toString() + ", condition: " + condition); + } + + PreparedStatement stmt = null; + try { + stmt = connection.prepareStatement(sb.toString()); + int pos = 1; + pos = addUuids(condition, pos, stmt); + + if (condition instanceof TopNCondition) { + pos = addStartTime(condition, pos, stmt); + pos = addEndTime(condition, pos, stmt); + } + + pos = addStartTime(condition, pos, stmt); + addEndTime(condition, pos, stmt); + + if (condition.getFetchSize() != null) { + stmt.setFetchSize(condition.getFetchSize()); + } + } catch (SQLException e) { + if (stmt != null) { + stmt.close(); + } + throw e; + } + + if (condition instanceof TopNCondition) { + LOG.info(sb.toString()); + } + return stmt; + } + + private static void validateConditionIsNotEmpty(Condition condition) { + if (condition.isEmpty()) { + throw new 
IllegalArgumentException("Condition is empty."); + } + } + + private static void validateRowCountLimit(Condition condition) { + if (condition.getMetricNames() == null + || condition.getMetricNames().isEmpty()) { + //aggregator can use empty metrics query + return; + } + + long range = condition.getEndTime() - condition.getStartTime(); + long rowsPerMetric; + + //Get Precision (passed in or computed) and estimate values returned based on that. + Precision precision = condition.getPrecision(); + if (precision == null) { + precision = Precision.getPrecision(condition.getStartTime(), condition.getEndTime()); + } + + switch (precision) { + case DAYS: + rowsPerMetric = TimeUnit.MILLISECONDS.toDays(range); + break; + case HOURS: + rowsPerMetric = TimeUnit.MILLISECONDS.toHours(range); + break; + case MINUTES: + rowsPerMetric = TimeUnit.MILLISECONDS.toMinutes(range)/5; //5 minute data in METRIC_AGGREGATE_MINUTE table. + break; + default: + rowsPerMetric = TimeUnit.MILLISECONDS.toSeconds(range)/10; //10 second data in METRIC_AGGREGATE table + } + + List<String> hostNames = condition.getHostnames(); + int numHosts = (hostNames == null || hostNames.isEmpty()) ? 1 : condition.getHostnames().size(); + + long totalRowsRequested = rowsPerMetric * condition.getMetricNames().size() * numHosts; + + if (totalRowsRequested > PhoenixHBaseAccessor.RESULTSET_LIMIT) { + throw new PrecisionLimitExceededException("Requested " + condition.getMetricNames().size() + " metrics for " + + numHosts + " hosts in " + precision + " precision for the time range of " + range/1000 + + " seconds. Estimated resultset size of " + totalRowsRequested + " is greater than the limit of " + + PhoenixHBaseAccessor.RESULTSET_LIMIT + ". Request lower precision or fewer number of metrics or hosts." 
+ + " Alternatively, increase the limit value through ams-site:timeline.metrics.service.default.result.limit config"); + } + } + + public static PreparedStatement prepareGetLatestMetricSqlStmt( + Connection connection, Condition condition) throws SQLException { + + validateConditionIsNotEmpty(condition); + + if (condition.getMetricNames() == null + || condition.getMetricNames().isEmpty()) { + throw new IllegalArgumentException("Point in time query without " + + "metric names not supported "); + } + + String stmtStr; + if (condition.getStatement() != null) { + stmtStr = condition.getStatement(); + } else { + stmtStr = String.format(GET_LATEST_METRIC_SQL, + getLatestMetricsHints(), + METRICS_RECORD_TABLE_NAME, + METRICS_RECORD_TABLE_NAME, + condition.getConditionClause()); + } + + if (LOG.isDebugEnabled()) { + LOG.debug("SQL: " + stmtStr + ", condition: " + condition); + } + PreparedStatement stmt = null; + try { + stmt = connection.prepareStatement(stmtStr); + setQueryParameters(stmt, condition); + } catch (SQLException e) { + if (stmt != null) { + stmt.close(); + } + throw e; + } + + return stmt; + } + + private static PreparedStatement setQueryParameters(PreparedStatement stmt, + Condition condition) throws SQLException { + int pos = 1; + //For GET_LATEST_METRIC_SQL_SINGLE_HOST parameters should be set 2 times + do { + if (condition.getUuids() != null) { + for (byte[] uuid : condition.getUuids()) { + stmt.setBytes(pos++, uuid); + } + } + if (condition.getFetchSize() != null) { + stmt.setFetchSize(condition.getFetchSize()); + pos++; + } + } while (pos < stmt.getParameterMetaData().getParameterCount()); + + return stmt; + } + + public static PreparedStatement prepareGetAggregateSqlStmt( + Connection connection, Condition condition) throws SQLException { + + validateConditionIsNotEmpty(condition); + validateRowCountLimit(condition); + + String metricsAggregateTable; + String queryStmt; + if (condition.getPrecision() == null) { + long endTime = condition.getEndTime() 
== null ? System.currentTimeMillis() : condition.getEndTime(); + long startTime = condition.getStartTime() == null ? 0 : condition.getStartTime(); + condition.setPrecision(Precision.getPrecision(startTime, endTime)); + } + switch (condition.getPrecision()) { + case DAYS: + metricsAggregateTable = METRICS_CLUSTER_AGGREGATE_DAILY_TABLE_NAME; + queryStmt = GET_CLUSTER_AGGREGATE_TIME_SQL; + break; + case HOURS: + metricsAggregateTable = METRICS_CLUSTER_AGGREGATE_HOURLY_TABLE_NAME; + queryStmt = GET_CLUSTER_AGGREGATE_TIME_SQL; + break; + case MINUTES: + metricsAggregateTable = METRICS_CLUSTER_AGGREGATE_MINUTE_TABLE_NAME; + queryStmt = GET_CLUSTER_AGGREGATE_TIME_SQL; + break; + default: + metricsAggregateTable = METRICS_CLUSTER_AGGREGATE_TABLE_NAME; + queryStmt = GET_CLUSTER_AGGREGATE_SQL; + } + + queryStmt = String.format(queryStmt, metricsAggregateTable); + + StringBuilder sb = new StringBuilder(queryStmt); + sb.append(" WHERE "); + sb.append(condition.getConditionClause()); + sb.append(" ORDER BY UUID, SERVER_TIME"); + if (condition.getLimit() != null) { + sb.append(" LIMIT ").append(condition.getLimit()); + } + + String query = sb.toString(); + + if (LOG.isDebugEnabled()) { + LOG.debug("SQL => " + query + ", condition => " + condition); + } + PreparedStatement stmt = null; + try { + stmt = connection.prepareStatement(query); + int pos = 1; + + pos = addUuids(condition, pos, stmt); + + if (condition instanceof TopNCondition) { + pos = addStartTime(condition, pos, stmt); + pos = addEndTime(condition, pos, stmt); + } + + // TODO: Upper case all strings on POST + pos = addStartTime(condition, pos, stmt); + addEndTime(condition, pos, stmt); + } catch (SQLException e) { + if (stmt != null) { + stmt.close(); + } + throw e; + } + + if (condition instanceof TopNCondition) { + LOG.info(sb.toString()); + } + return stmt; + } + + public static PreparedStatement prepareGetLatestAggregateMetricSqlStmt( + Connection connection, SplitByMetricNamesCondition condition) throws 
SQLException { + + validateConditionIsNotEmpty(condition); + + String stmtStr; + if (condition.getStatement() != null) { + stmtStr = condition.getStatement(); + } else { + stmtStr = String.format(GET_CLUSTER_AGGREGATE_SQL, + METRICS_CLUSTER_AGGREGATE_TABLE_NAME); + } + + StringBuilder sb = new StringBuilder(stmtStr); + sb.append(" WHERE "); + sb.append(condition.getConditionClause()); + String orderByClause = condition.getOrderByClause(false); + if (orderByClause != null) { + sb.append(orderByClause); + } else { + sb.append(" ORDER BY UUID DESC, SERVER_TIME DESC "); + } + + sb.append(" LIMIT ").append(condition.getMetricNames().size()); + + String query = sb.toString(); + if (LOG.isDebugEnabled()) { + LOG.debug("SQL: " + query + ", condition: " + condition); + } + + PreparedStatement stmt = null; + try { + stmt = connection.prepareStatement(query); + int pos = 1; + if (condition.getMetricNames() != null) { + for (; pos <= condition.getMetricNames().size(); pos++) { + stmt.setBytes(pos, condition.getCurrentUuid()); + } + } + } catch (SQLException e) { + if (stmt != null) { + + } + throw e; + } + + return stmt; + } + + public static String getTargetTableUsingPrecision(Precision precision, boolean withHosts) { + + String inputTable = null; + if (precision != null) { + if (withHosts) { + switch (precision) { + case DAYS: + inputTable = PhoenixTransactSQL.METRICS_AGGREGATE_DAILY_TABLE_NAME; + break; + case HOURS: + inputTable = PhoenixTransactSQL.METRICS_AGGREGATE_HOURLY_TABLE_NAME; + break; + case MINUTES: + inputTable = PhoenixTransactSQL.METRICS_AGGREGATE_MINUTE_TABLE_NAME; + break; + default: + inputTable = PhoenixTransactSQL.METRICS_RECORD_TABLE_NAME; + } + } else { + switch (precision) { + case DAYS: + inputTable = PhoenixTransactSQL.METRICS_CLUSTER_AGGREGATE_DAILY_TABLE_NAME; + break; + case HOURS: + inputTable = PhoenixTransactSQL.METRICS_CLUSTER_AGGREGATE_HOURLY_TABLE_NAME; + break; + case MINUTES: + inputTable = 
PhoenixTransactSQL.METRICS_CLUSTER_AGGREGATE_MINUTE_TABLE_NAME; + break; + default: + inputTable = PhoenixTransactSQL.METRICS_CLUSTER_AGGREGATE_TABLE_NAME; + } + } + } else { + if (withHosts) { + inputTable = PhoenixTransactSQL.METRICS_RECORD_TABLE_NAME; + } else { + inputTable = PhoenixTransactSQL.METRICS_CLUSTER_AGGREGATE_TABLE_NAME; + } + } + return inputTable; + } + + private static int addUuids(Condition condition, int pos, PreparedStatement stmt) throws SQLException { + if (condition.getUuids() != null) { + for (int pos2 = 1 ; pos2 <= condition.getUuids().size(); pos2++,pos++) { + byte[] uuid = condition.getUuids().get(pos2 - 1); + if (LOG.isDebugEnabled()) { + LOG.debug("Setting pos: " + pos + ", value = " + new String(uuid)); + } + + if (uuid.length != TimelineMetricMetadataManager.HOSTNAME_UUID_LENGTH + TimelineMetricMetadataManager.TIMELINE_METRIC_UUID_LENGTH) { + stmt.setString(pos, new String(uuid)); + } else { + stmt.setBytes(pos, uuid); + } + } + } + return pos; + } + + private static int addStartTime(Condition condition, int pos, PreparedStatement stmt) throws SQLException { + if (condition.getStartTime() != null) { + if (LOG.isDebugEnabled()) { + LOG.debug("Setting pos: " + pos + ", value: " + condition.getStartTime()); + } + stmt.setLong(pos++, condition.getStartTime()); + } + return pos; + } + + private static int addEndTime(Condition condition, int pos, PreparedStatement stmt) throws SQLException { + + if (condition.getEndTime() != null) { + if (LOG.isDebugEnabled()) { + LOG.debug("Setting pos: " + pos + ", value: " + condition.getEndTime()); + } + stmt.setLong(pos++, condition.getEndTime()); + } + return pos; + } + +}
http://git-wip-us.apache.org/repos/asf/ambari/blob/42112e28/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/query/SplitByMetricNamesCondition.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/query/SplitByMetricNamesCondition.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/query/SplitByMetricNamesCondition.java new file mode 100644 index 0000000..2f1697f --- /dev/null +++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/query/SplitByMetricNamesCondition.java @@ -0,0 +1,189 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.ambari.metrics.core.timeline.query; + +import java.util.Collections; +import java.util.List; + +import org.apache.hadoop.metrics2.sink.timeline.Precision; +// TODO get rid of this class +public class SplitByMetricNamesCondition implements Condition { + private final Condition adaptee; + private byte[] currentUuid; + private boolean metricNamesNotCondition = false; + + public SplitByMetricNamesCondition(Condition condition){ + this.adaptee = condition; + } + + @Override + public boolean isEmpty() { + return adaptee.isEmpty(); + } + + @Override + public List<byte[]> getUuids() { + return adaptee.getUuids(); + } + + @Override + public List<String> getMetricNames() { + return Collections.singletonList(new String(currentUuid)); + } + + @Override + public boolean isPointInTime() { + return adaptee.isPointInTime(); + } + + @Override + public boolean isGrouped() { + return adaptee.isGrouped(); + } + + @Override + public void setStatement(String statement) { + adaptee.setStatement(statement); + } + + @Override + public List<String> getHostnames() { + return adaptee.getHostnames(); + } + + @Override + public Precision getPrecision() { + return adaptee.getPrecision(); + } + + @Override + public void setPrecision(Precision precision) { + adaptee.setPrecision(precision); + } + + @Override + public String getAppId() { + return adaptee.getAppId(); + } + + @Override + public String getInstanceId() { + return adaptee.getInstanceId(); + } + + @Override + public StringBuilder getConditionClause() { + StringBuilder sb = new StringBuilder(); + boolean appendConjunction = false; + + if (getMetricNames() != null) { + for (String name : getMetricNames()) { + if (sb.length() > 1) { + sb.append(" OR "); + } + sb.append("UUID = ?"); + } + + appendConjunction = true; + } + + appendConjunction = DefaultCondition.append(sb, appendConjunction, + getStartTime(), " SERVER_TIME >= ?"); + DefaultCondition.append(sb, appendConjunction, getEndTime(), + " SERVER_TIME < ?"); + + 
return sb; + } + + @Override + public String getOrderByClause(boolean asc) { + return adaptee.getOrderByClause(asc); + } + + @Override + public String getStatement() { + return adaptee.getStatement(); + } + + @Override + public Long getStartTime() { + return adaptee.getStartTime(); + } + + @Override + public Long getEndTime() { + return adaptee.getEndTime(); + } + + @Override + public Integer getLimit() { + return adaptee.getLimit(); + } + + @Override + public Integer getFetchSize() { + return adaptee.getFetchSize(); + } + + @Override + public void setFetchSize(Integer fetchSize) { + adaptee.setFetchSize(fetchSize); + } + + @Override + public void addOrderByColumn(String column) { + adaptee.addOrderByColumn(column); + } + + @Override + public void setNoLimit() { + adaptee.setNoLimit(); + } + + @Override + public boolean doUpdate() { + return false; + } + + public List<String> getOriginalMetricNames() { + return adaptee.getMetricNames(); + } + + public void setCurrentUuid(byte[] uuid) { + this.currentUuid = uuid; + } + + public byte[] getCurrentUuid() { + return currentUuid; + } + + @Override + public void setMetricNamesNotCondition(boolean metricNamesNotCondition) { + this.metricNamesNotCondition = metricNamesNotCondition; + } + + @Override + public void setHostnamesNotCondition(boolean hostNamesNotCondition) { + + } + + @Override + public void setUuidNotCondition(boolean uuidNotCondition) { + + } +} http://git-wip-us.apache.org/repos/asf/ambari/blob/42112e28/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/query/TopNCondition.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/query/TopNCondition.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/query/TopNCondition.java new file mode 100644 index 0000000..e951cd5 --- 
/dev/null +++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/query/TopNCondition.java @@ -0,0 +1,162 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */
package org.apache.ambari.metrics.core.timeline.query;

import java.util.List;

import org.apache.ambari.metrics.core.timeline.aggregators.Function;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.metrics2.sink.timeline.Precision;

/**
 * Condition that restricts a query to the top (or bottom) N series, selected
 * by an inner query ordered on an aggregate column.
 * <p>
 * Two shapes are supported: one metric across several hosts, or several
 * metrics on at most one host. Any other combination is rejected by
 * {@link #getConditionClause()}.
 */
public class TopNCondition extends DefaultCondition{

  private Integer topN;
  private boolean isBottomN;
  private Function topNFunction;
  private static final Log LOG = LogFactory.getLog(TopNCondition.class);

  /**
   * @param topN         number of series to keep
   * @param topNFunction function whose read function picks the ordering column
   * @param isBottomN    {@code true} to order ascending (bottom N) instead of
   *                     descending (top N)
   */
  public TopNCondition(List<byte[]> uuids, List<String> metricNames, List<String> hostnames, String appId,
                       String instanceId, Long startTime, Long endTime, Precision precision,
                       Integer limit, boolean grouped, Integer topN, Function topNFunction,
                       boolean isBottomN) {
    super(uuids, metricNames, hostnames, appId, instanceId, startTime, endTime, precision, limit, grouped);
    this.topN = topN;
    this.isBottomN = isBottomN;
    this.topNFunction = topNFunction;
  }

  /**
   * Builds {@code UUID IN (<inner top-N query>)} followed by the optional
   * start/end time predicates.
   *
   * @return the clause, or {@code null} (after logging an error) when the
   * request is neither a top-N-hosts nor a top-N-metrics condition
   */
  @Override
  public StringBuilder getConditionClause() {

    boolean supported = isTopNHostCondition(metricNames, hostnames)
      || isTopNMetricCondition(metricNames, hostnames);
    if (!supported) {
      LOG.error("Unsupported TopN Operation requested. Query can have either multiple hosts or multiple metric names " +
        "but not both.");
      return null;
    }

    StringBuilder sb = new StringBuilder();
    sb.append(" UUID IN (");
    sb.append(getTopNInnerQuery());
    sb.append(")");

    // The IN (...) predicate is always present, so the first time predicate needs a conjunction.
    boolean appendConjunction = true;
    appendConjunction = append(sb, appendConjunction, getStartTime(), " SERVER_TIME >= ?");
    append(sb, appendConjunction, getEndTime(), " SERVER_TIME < ?");

    return sb;
  }

  /**
   * Formats {@code PhoenixTransactSQL.TOP_N_INNER_SQL} with the target table,
   * the parent class's condition clause, the order-by expression and N.
   */
  public String getTopNInnerQuery() {
    return String.format(PhoenixTransactSQL.TOP_N_INNER_SQL,
      PhoenixTransactSQL.getTargetTableUsingPrecision(precision, CollectionUtils.isNotEmpty(hostnames)),
      super.getConditionClause().toString(), getTopNOrderByClause(), topN);
  }

  /** Ordering expression: the aggregate column plus ASC for bottom-N, DESC for top-N. */
  private String getTopNOrderByClause() {
    return getColumnSelect(this.topNFunction) + (isBottomN ? " ASC" : " DESC");
  }

  /**
   * Maps a function to the aggregate column expression used for ordering.
   *
   * @param topNFunction may be {@code null}
   * @return the AVG or SUM expression for those read functions, otherwise
   * {@code MAX(METRIC_MAX)} (also used when the function is {@code null})
   */
  public static String getColumnSelect(Function topNFunction) {
    if (topNFunction == null) {
      return "MAX(METRIC_MAX)";
    }
    switch (topNFunction.getReadFunction()) {
      case AVG:
        return "ROUND(AVG(METRIC_SUM),2)";
      case SUM:
        return "SUM(METRIC_SUM)";
      default:
        return "MAX(METRIC_MAX)";
    }
  }

  public boolean isTopNHostCondition() {
    return isTopNHostCondition(metricNames, hostnames);
  }

  public boolean isTopNMetricCondition() {
    return isTopNMetricCondition(metricNames, hostnames);
  }

  /**
   * Check if this is a case of Top N hosts condition
   * @param metricNames A list of Strings, may be {@code null}.
   * @param hostnames A list of Strings, may be {@code null}.
   * @return True if it is a Case of Top N Hosts (1 Metric and H hosts).
   */
  public static boolean isTopNHostCondition(List<String> metricNames, List<String> hostnames) {
    // Case 1 : 1 Metric, H hosts
    // Select Top N or Bottom N host series based on 1 metric (max/avg/sum)
    // Hostnames cannot be empty
    // Only 1 metric allowed, without wildcards
    // A null metric name list is treated as "not exactly one metric" rather than failing with an NPE.
    return CollectionUtils.isNotEmpty(hostnames)
      && metricNames != null
      && metricNames.size() == 1
      && !metricNamesHaveWildcard(metricNames);
  }

  /**
   * Check if this is a case of Top N metrics condition
   * @param metricNames A list of Strings, may be {@code null}.
   * @param hostnames A list of Strings, may be {@code null}.
   * @return True if it is a Case of Top N Metrics (M Metric and 1 or 0 host).
   */
  public static boolean isTopNMetricCondition(List<String> metricNames, List<String> hostnames) {
    // Case 2 : M Metric names or Regex, 1 or No host
    // Select Top N or Bottom N metric series based on metric values(max/avg/sum)
    // MetricNames cannot be empty
    // No host (aggregate) or 1 host allowed, without wildcards
    return (CollectionUtils.isNotEmpty(metricNames) && (hostnames == null || hostnames.size() <= 1) &&
      !hostNamesHaveWildcard(hostnames));
  }

  public Integer getTopN() {
    return topN;
  }

  public void setTopN(Integer topN) {
    this.topN = topN;
  }

  public boolean isBottomN() {
    return isBottomN;
  }

  public void setIsBottomN(boolean isBottomN) {
    this.isBottomN = isBottomN;
  }

  public Function getTopNFunction() {
    return topNFunction;
  }

  public void setTopNFunction(Function topNFunction) {
    this.topNFunction = topNFunction;
  }
}
http://git-wip-us.apache.org/repos/asf/ambari/blob/42112e28/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/sink/DefaultFSSinkProvider.java ---------------------------------------------------------------------- diff --git
a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/sink/DefaultFSSinkProvider.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/sink/DefaultFSSinkProvider.java new file mode 100644 index 0000000..52abc1e --- /dev/null +++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/sink/DefaultFSSinkProvider.java @@ -0,0 +1,151 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
package org.apache.ambari.metrics.core.timeline.sink;

import java.io.File;
import java.io.IOException;
import java.util.Collection;
import java.util.Date;

import org.apache.ambari.metrics.core.timeline.TimelineMetricConfiguration;
import org.apache.ambari.metrics.core.timeline.source.InternalSourceProvider;
import org.apache.commons.io.FileUtils;
import org.apache.commons.lang.StringUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric;
import org.apache.hadoop.metrics2.sink.timeline.TimelineMetrics;

/**
 * Sink provider that writes metrics as comma-separated rows to a single file
 * in the configured sink directory. The file is recreated, with a header
 * line, when it is missing or has grown beyond the configured size.
 */
public class DefaultFSSinkProvider implements ExternalSinkProvider {
  private static final Log LOG = LogFactory.getLog(DefaultFSSinkProvider.class);
  TimelineMetricConfiguration conf = TimelineMetricConfiguration.getInstance();
  private final DefaultExternalMetricsSink sink = new DefaultExternalMetricsSink();
  private long FIXED_FILE_SIZE;
  private final String SINK_FILE_NAME = "external-metrics-sink.dat";
  private final String SEPARATOR = ", ";
  private final String LINE_SEP = System.lineSeparator();
  private final String HEADERS = "METRIC, APP_ID, INSTANCE_ID, HOSTNAME, START_TIME, DATA";

  /**
   * Reads the maximum sink file size from configuration, falling back to
   * 100 MB when the configuration cannot be read.
   */
  public DefaultFSSinkProvider() {
    try {
      FIXED_FILE_SIZE = conf.getMetricsConf().getLong("timeline.metrics.service.external.fs.sink.filesize", FileUtils.ONE_MB * 100);
    } catch (Exception ignored) {
      // Deliberate best effort: an unreadable configuration just means the default size is used.
      FIXED_FILE_SIZE = FileUtils.ONE_MB * 100;
    }
  }

  /** Returns the same file sink instance regardless of the source. */
  @Override
  public ExternalMetricsSink getExternalMetricsSink(InternalSourceProvider.SOURCE_NAME sourceName) {
    return sink;
  }

  class DefaultExternalMetricsSink implements ExternalMetricsSink {

    @Override
    public int getSinkTimeOutSeconds() {
      return 10;
    }

    /** Cache commit interval from configuration, or 3 when it cannot be read. */
    @Override
    public int getFlushSeconds() {
      try {
        return conf.getMetricsConf().getInt(TimelineMetricConfiguration.TIMELINE_METRICS_CACHE_COMMIT_INTERVAL, 3);
      } catch (Exception e) {
        LOG.warn("Cannot read cache commit interval.", e);
      }
      return 3;
    }

    /**
     * Creates the sink file and writes the header line when the file does
     * not exist yet.
     *
     * @return {@code true} only if a new file was created
     */
    private boolean createFile(File f) {
      if (f.exists()) {
        return false;
      }
      try {
        boolean created = f.createNewFile();
        // Terminate the header so the first data row starts on its own line.
        FileUtils.writeStringToFile(f, HEADERS + LINE_SEP);
        return created;
      } catch (IOException e) {
        LOG.error("Cannot create " + SINK_FILE_NAME + " at " + f.getPath(), e);
        return false;
      }
    }

    /** The file must be (re)created when it is missing or larger than the size limit. */
    private boolean shouldReCreate(File f) {
      return !f.exists() || FileUtils.sizeOf(f) > FIXED_FILE_SIZE;
    }

    /** Formats one metric as a single separator-delimited row ending in a line separator. */
    private void appendRow(StringBuilder sb, TimelineMetric metric) {
      sb.append(metric.getMetricName());
      sb.append(SEPARATOR);
      sb.append(metric.getAppId());
      sb.append(SEPARATOR);
      // Empty instance id / hostname leave the column blank but keep the separator.
      if (!StringUtils.isEmpty(metric.getInstanceId())) {
        sb.append(metric.getInstanceId());
      }
      sb.append(SEPARATOR);
      if (!StringUtils.isEmpty(metric.getHostName())) {
        sb.append(metric.getHostName());
      }
      sb.append(SEPARATOR);
      sb.append(new Date(metric.getStartTime()));
      sb.append(SEPARATOR);
      sb.append(metric.getMetricValues().toString());
      sb.append(LINE_SEP);
    }

    /**
     * Appends one row per metric to the sink file. Does nothing (beyond
     * logging) when the sink directory does not exist or an oversized file
     * cannot be deleted.
     *
     * @param metrics metrics to write; {@code null} writes nothing
     */
    @Override
    public void sinkMetricData(Collection<TimelineMetrics> metrics) {
      String dirPath = TimelineMetricConfiguration.getInstance().getDefaultMetricsSinkDir();
      File dir = new File(dirPath);
      if (!dir.exists()) {
        LOG.error("Cannot sink data to file system, incorrect dir path " + dirPath);
        return;
      }

      File f = FileUtils.getFile(dirPath, SINK_FILE_NAME);
      if (shouldReCreate(f)) {
        // Only an existing (oversized) file needs deleting; a missing file must still be created.
        if (f.exists() && !f.delete()) {
          LOG.warn("Unable to delete external sink file.");
          return;
        }
        createFile(f);
      }

      if (metrics == null) {
        return;
      }

      StringBuilder rows = new StringBuilder();
      for (TimelineMetrics timelineMetrics : metrics) {
        for (TimelineMetric metric : timelineMetrics.getMetrics()) {
          appendRow(rows, metric);
        }
      }
      if (rows.length() == 0) {
        return;
      }

      try {
        // Append, otherwise every write would replace the header and all earlier rows.
        FileUtils.writeStringToFile(f, rows.toString(), true);
      } catch (IOException e) {
        LOG.warn("Unable to sink data to file " + f.getPath(), e);
      }
    }
  }
}
http://git-wip-us.apache.org/repos/asf/ambari/blob/42112e28/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/sink/ExternalMetricsSink.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/sink/ExternalMetricsSink.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/sink/ExternalMetricsSink.java new file mode 100644 index 0000000..b2bb908 --- /dev/null +++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/sink/ExternalMetricsSink.java @@ -0,0 +1,48 @@
package org.apache.ambari.metrics.core.timeline.sink;

import java.util.Collection;

import org.apache.hadoop.metrics2.sink.timeline.TimelineMetrics;

/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 * <p>
 * http://www.apache.org/licenses/LICENSE-2.0
 * <p>
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
public interface ExternalMetricsSink {

  /**
   * Number of seconds to wait on the sink before metrics are dropped.
   * Note: choose a value that does not let this wait become a bottleneck
   * for the sink thread.
   */
  int getSinkTimeOutSeconds();

  /**
   * Interval, in seconds, between flushes of data to the external system.
   * The default is expected to lie between 60 - 120 seconds, in line with
   * the default sink interval of AMS.
   */
  int getFlushSeconds();

  /**
   * Receives the raw data stream to process / store on the external system.
   * Data is kept in an in-memory cache and flushed every flush interval or
   * once the cache size limit is exceeded; if the write fails the data is
   * dropped.
   *
   * @param metrics collection of {@link TimelineMetrics} to write
   */
  void sinkMetricData(Collection<TimelineMetrics> metrics);
}
http://git-wip-us.apache.org/repos/asf/ambari/blob/42112e28/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/sink/ExternalSinkProvider.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/sink/ExternalSinkProvider.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/sink/ExternalSinkProvider.java new file mode 100644 index 0000000..83cdc33 --- /dev/null +++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/sink/ExternalSinkProvider.java @@ -0,0 +1,35 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License.
You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */

package org.apache.ambari.metrics.core.timeline.sink;

import org.apache.ambari.metrics.core.timeline.source.InternalSourceProvider;


/**
 * Configurable provider of the sink classes paired with the metrics sources.
 * An implementation is free to hand out one shared sink or a different sink
 * per source.
 */
public interface ExternalSinkProvider {

  /**
   * Returns an instance of the metrics sink for the given source.
   *
   * @param sourceName the source the sink is requested for
   * @return {@link ExternalMetricsSink}
   */
  ExternalMetricsSink getExternalMetricsSink(InternalSourceProvider.SOURCE_NAME sourceName);
}
http://git-wip-us.apache.org/repos/asf/ambari/blob/42112e28/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/sink/HttpSinkProvider.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/sink/HttpSinkProvider.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/sink/HttpSinkProvider.java new file mode 100644 index 0000000..cbb0834 --- /dev/null +++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/sink/HttpSinkProvider.java @@ -0,0 +1,231 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership.
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */
package org.apache.ambari.metrics.core.timeline.sink;

import static org.apache.ambari.metrics.core.timeline.TimelineMetricConfiguration.TIMELINE_METRICS_CACHE_COMMIT_INTERVAL;

import java.io.BufferedReader;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.OutputStream;
import java.net.HttpURLConnection;
import java.net.URL;
import java.security.KeyStore;
import java.util.Collection;
import java.util.concurrent.TimeUnit;

import javax.net.ssl.HttpsURLConnection;
import javax.net.ssl.SSLContext;
import javax.net.ssl.SSLSocketFactory;
import javax.net.ssl.TrustManagerFactory;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.metrics2.sink.timeline.TimelineMetrics;
import org.apache.ambari.metrics.core.timeline.TimelineMetricConfiguration;
import org.apache.ambari.metrics.core.timeline.source.InternalSourceProvider;
import org.apache.http.client.utils.URIBuilder;
import org.codehaus.jackson.map.AnnotationIntrospector;
import org.codehaus.jackson.map.ObjectMapper;
import org.codehaus.jackson.map.annotate.JsonSerialize;
import org.codehaus.jackson.xc.JaxbAnnotationIntrospector;

/**
 * Sink provider that POSTs metrics as JSON to an HTTP(S) endpoint built from
 * the configured protocol, host and port. For https a truststore is loaded
 * once and used to create the SSL socket factory for all connections.
 */
public class HttpSinkProvider implements ExternalSinkProvider {
  private static final Log LOG = LogFactory.getLog(HttpSinkProvider.class);
  TimelineMetricConfiguration conf = TimelineMetricConfiguration.getInstance();

  private String connectUrl;
  private SSLSocketFactory sslSocketFactory;
  protected static ObjectMapper mapper;

  static {
    mapper = new ObjectMapper();
    AnnotationIntrospector introspector = new JaxbAnnotationIntrospector();
    mapper.setAnnotationIntrospector(introspector);
    // SerializationConfig is immutable: withSerializationInclusion() returns a copy, so the
    // inclusion rule has to be set on the mapper itself to take effect.
    mapper.setSerializationInclusion(JsonSerialize.Inclusion.NON_NULL);
  }

  /**
   * Reads protocol, host and port from configuration (defaults: http,
   * localhost, 6189), loads the truststore when the protocol contains
   * "https", and builds the target URL.
   *
   * @throws ExceptionInInitializerError if the metrics configuration cannot be read
   * @throws IllegalStateException if https is requested without truststore path or password
   */
  public HttpSinkProvider() {
    Configuration config;
    try {
      config = conf.getMetricsConf();
    } catch (Exception e) {
      // The String constructor of ExceptionInInitializerError cannot carry a cause, so log it here.
      LOG.error("Unable to read configuration for sink.", e);
      throw new ExceptionInInitializerError("Unable to read configuration for sink.");
    }
    String protocol = config.get("timeline.metrics.service.external.http.sink.protocol", "http");
    String host = config.get("timeline.metrics.service.external.http.sink.host", "localhost");
    String port = config.get("timeline.metrics.service.external.http.sink.port", "6189");

    if (protocol.contains("https")) {
      loadTruststore(
        config.getTrimmed("timeline.metrics.service.external.http.sink.truststore.path"),
        config.getTrimmed("timeline.metrics.service.external.http.sink.truststore.type"),
        config.getTrimmed("timeline.metrics.service.external.http.sink.truststore.password")
      );
    }

    URIBuilder uriBuilder = new URIBuilder();
    uriBuilder.setScheme(protocol);
    uriBuilder.setHost(host);
    uriBuilder.setPort(Integer.parseInt(port));
    connectUrl = uriBuilder.toString();
  }

  /** Returns a new HTTP sink on every call, regardless of the source. */
  @Override
  public ExternalMetricsSink getExternalMetricsSink(InternalSourceProvider.SOURCE_NAME sourceName) {
    return new DefaultHttpMetricsSink();
  }

  protected HttpURLConnection getConnection(String spec) throws IOException {
    return (HttpURLConnection) new URL(spec).openConnection();
  }

  // Get an ssl connection
  protected HttpsURLConnection getSSLConnection(String spec)
    throws IOException, IllegalStateException {

    HttpsURLConnection connection = (HttpsURLConnection) (new URL(spec).openConnection());
    connection.setSSLSocketFactory(sslSocketFactory);
    return connection;
  }

  /**
   * Initializes the SSL socket factory from the given truststore, unless one
   * has been created already. Failures while reading the store are logged
   * and leave the factory unset.
   *
   * @param trustStorePath     path of the truststore file, required
   * @param trustStoreType     keystore type, or {@code null} for the JVM default
   * @param trustStorePassword truststore password, required
   * @throws IllegalStateException if path or password is {@code null}
   */
  protected void loadTruststore(String trustStorePath, String trustStoreType,
                                String trustStorePassword) {
    if (sslSocketFactory != null) {
      return;
    }
    if (trustStorePath == null || trustStorePassword == null) {
      String msg = "Can't load TrustStore. Truststore path or password is not set.";
      LOG.error(msg);
      throw new IllegalStateException(msg);
    }
    try (FileInputStream in = new FileInputStream(new File(trustStorePath))) {
      KeyStore store = KeyStore.getInstance(trustStoreType == null ?
        KeyStore.getDefaultType() : trustStoreType);
      store.load(in, trustStorePassword.toCharArray());
      TrustManagerFactory tmf = TrustManagerFactory
        .getInstance(TrustManagerFactory.getDefaultAlgorithm());
      tmf.init(store);
      SSLContext context = SSLContext.getInstance("TLS");
      context.init(null, tmf.getTrustManagers(), null);
      sslSocketFactory = context.getSocketFactory();
    } catch (Exception e) {
      LOG.error("Unable to load TrustStore", e);
    }
  }

  class DefaultHttpMetricsSink implements ExternalMetricsSink {

    /** Configured HTTP sink timeout in seconds, or 10 when it cannot be read. */
    @Override
    public int getSinkTimeOutSeconds() {
      try {
        return conf.getMetricsConf().getInt("timeline.metrics.external.sink.http.timeout.seconds", 10);
      } catch (Exception e) {
        return 10;
      }
    }

    /** Cache commit interval from configuration, or 3 when it cannot be read. */
    @Override
    public int getFlushSeconds() {
      try {
        return conf.getMetricsConf().getInt(TIMELINE_METRICS_CACHE_COMMIT_INTERVAL, 3);
      } catch (Exception e) {
        LOG.warn("Cannot read cache commit interval.", e);
      }
      return 3;
    }

    /**
     * Cleans up and closes an input stream
     * see http://docs.oracle.com/javase/6/docs/technotes/guides/net/http-keepalive.html
     * @param is the InputStream to clean up, may be {@code null}
     * @return string read from the InputStream; empty unless debug logging is enabled
     * @throws IOException
     */
    protected String cleanupInputStream(InputStream is) throws IOException {
      StringBuilder sb = new StringBuilder();
      if (is != null) {
        try (
          InputStreamReader isr = new InputStreamReader(is);
          BufferedReader br = new BufferedReader(isr)
        ) {
          // read the response body
          String line;
          while ((line = br.readLine()) != null) {
            if (LOG.isDebugEnabled()) {
              sb.append(line);
            }
          }
        } finally {
          is.close();
        }
      }
      return sb.toString();
    }

    /**
     * POSTs the metrics as JSON to the configured URL and drains the
     * response so the connection can be reused. I/O failures are logged,
     * not rethrown.
     *
     * @param metrics metrics to send; {@code null} sends an empty body
     */
    @Override
    public void sinkMetricData(Collection<TimelineMetrics> metrics) {
      HttpURLConnection connection = null;
      try {
        connection = connectUrl.startsWith("https") ? getSSLConnection(connectUrl) : getConnection(connectUrl);

        // URLConnection timeouts are expressed in milliseconds, the sink timeout in seconds.
        int timeoutMillis = (int) TimeUnit.SECONDS.toMillis(getSinkTimeOutSeconds());

        connection.setRequestMethod("POST");
        connection.setRequestProperty("Content-Type", "application/json");
        connection.setRequestProperty("Connection", "Keep-Alive");
        connection.setConnectTimeout(timeoutMillis);
        connection.setReadTimeout(timeoutMillis);
        connection.setDoOutput(true);

        if (metrics != null) {
          String jsonData = mapper.writeValueAsString(metrics);
          try (OutputStream os = connection.getOutputStream()) {
            os.write(jsonData.getBytes("UTF-8"));
          }
        }

        int statusCode = connection.getResponseCode();

        if (statusCode != 200) {
          LOG.info("Unable to POST metrics to external sink, " + connectUrl +
            ", statusCode = " + statusCode);
        } else {
          if (LOG.isDebugEnabled()) {
            LOG.debug("Metrics posted to external sink " + connectUrl);
          }
        }

        // getInputStream() throws for HTTP error statuses; the body of an error
        // response is only available through the error stream.
        InputStream response = statusCode >= HttpURLConnection.HTTP_BAD_REQUEST
          ? connection.getErrorStream()
          : connection.getInputStream();
        cleanupInputStream(response);

      } catch (IOException io) {
        LOG.warn("Unable to sink data to external system.", io);
      }
    }
  }
}
http://git-wip-us.apache.org/repos/asf/ambari/blob/42112e28/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/sink/KafkaSinkProvider.java ---------------------------------------------------------------------- diff --git
a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/sink/KafkaSinkProvider.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/sink/KafkaSinkProvider.java new file mode 100644 index 0000000..1ce624b --- /dev/null +++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/sink/KafkaSinkProvider.java @@ -0,0 +1,109 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
package org.apache.ambari.metrics.core.timeline.sink;

import java.util.Collection;
import java.util.Properties;
import java.util.concurrent.Future;

import org.apache.ambari.metrics.core.timeline.TimelineMetricConfiguration;
import org.apache.ambari.metrics.core.timeline.source.InternalSourceProvider;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.metrics2.sink.timeline.TimelineMetrics;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;

/*
  This will be used by the single Metrics committer thread. Hence it is
  important to make this non-blocking export.
 */
/**
 * Sink provider that publishes raw metrics as JSON records to a fixed Kafka
 * topic. Producer settings are read from the timeline metrics configuration
 * in the constructor.
 */
public class KafkaSinkProvider implements ExternalSinkProvider {
  private static final String TOPIC_NAME = "ambari-metrics-topic";
  private static final Log LOG = LogFactory.getLog(KafkaSinkProvider.class);

  // Keys are never set on the records; the key type matches the configured ByteArraySerializer.
  private final Producer<byte[], JsonNode> producer;
  private int TIMEOUT_SECONDS = 10;
  private int FLUSH_SECONDS = 3;

  ObjectMapper objectMapper = new ObjectMapper();

  /**
   * Builds the Kafka producer from configuration.
   *
   * @throws ExceptionInInitializerError if any producer setting cannot be read
   */
  public KafkaSinkProvider() {
    TimelineMetricConfiguration configuration = TimelineMetricConfiguration.getInstance();

    Properties configProperties = new Properties();
    try {
      configProperties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, configuration.getMetricsConf().getTrimmed(TimelineMetricConfiguration.KAFKA_SERVERS));
      configProperties.put(ProducerConfig.ACKS_CONFIG, configuration.getMetricsConf().getTrimmed(TimelineMetricConfiguration.KAFKA_ACKS, "all"));
      // Avoid duplicates - No transactional semantics
      configProperties.put(ProducerConfig.RETRIES_CONFIG, configuration.getMetricsConf().getInt(TimelineMetricConfiguration.KAFKA_RETRIES, 0));
      configProperties.put(ProducerConfig.BATCH_SIZE_CONFIG, configuration.getMetricsConf().getInt(TimelineMetricConfiguration.KAFKA_BATCH_SIZE, 128));
      configProperties.put(ProducerConfig.LINGER_MS_CONFIG, configuration.getMetricsConf().getInt(TimelineMetricConfiguration.KAFKA_LINGER_MS, 1));
      configProperties.put(ProducerConfig.BUFFER_MEMORY_CONFIG, configuration.getMetricsConf().getLong(TimelineMetricConfiguration.KAFKA_BUFFER_MEM, 33554432)); // 32 MB
      FLUSH_SECONDS = configuration.getMetricsConf().getInt(TimelineMetricConfiguration.TIMELINE_METRICS_CACHE_COMMIT_INTERVAL, 3);
      TIMEOUT_SECONDS = configuration.getMetricsConf().getInt(TimelineMetricConfiguration.KAFKA_SINK_TIMEOUT_SECONDS, 10);
    } catch (Exception e) {
      LOG.error("Configuration error!", e);
      throw new ExceptionInInitializerError(e);
    }
    configProperties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.ByteArraySerializer");
    configProperties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.connect.json.JsonSerializer");

    producer = new KafkaProducer<byte[], JsonNode>(configProperties);
  }

  /**
   * @return a Kafka sink for {@code RAW_METRICS}
   * @throws UnsupportedOperationException for any other source
   */
  @Override
  public ExternalMetricsSink getExternalMetricsSink(InternalSourceProvider.SOURCE_NAME sourceName) {
    switch (sourceName) {
      case RAW_METRICS:
        return new KafkaRawMetricsSink();
      default:
        throw new UnsupportedOperationException("Provider does not support " +
          "the expected source " + sourceName);
    }
  }

  class KafkaRawMetricsSink implements ExternalMetricsSink {

    @Override
    public int getSinkTimeOutSeconds() {
      return TIMEOUT_SECONDS;
    }

    @Override
    public int getFlushSeconds() {
      return FLUSH_SECONDS;
    }

    /**
     * Converts the metrics to a JSON tree and hands it to the producer
     * without waiting for the result. A failed send is reported through the
     * callback and logged.
     */
    @Override
    public void sinkMetricData(Collection<TimelineMetrics> metrics) {
      JsonNode jsonNode = objectMapper.valueToTree(metrics);
      ProducerRecord<byte[], JsonNode> rec = new ProducerRecord<byte[], JsonNode>(TOPIC_NAME, jsonNode);
      // Do not block on the returned Future; surface asynchronous failures in the log instead.
      producer.send(rec, new Callback() {
        @Override
        public void onCompletion(RecordMetadata metadata, Exception exception) {
          if (exception != null) {
            LOG.warn("Unable to sink data to Kafka topic " + TOPIC_NAME, exception);
          }
        }
      });
    }
  }

}
http://git-wip-us.apache.org/repos/asf/ambari/blob/42112e28/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/source/DefaultInternalMetricsSourceProvider.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/source/DefaultInternalMetricsSourceProvider.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/source/DefaultInternalMetricsSourceProvider.java new file mode 100644 index 0000000..1287328 --- /dev/null +++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/source/DefaultInternalMetricsSourceProvider.java @@ -0,0 +1,42 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */
package org.apache.ambari.metrics.core.timeline.source;

import org.apache.ambari.metrics.core.timeline.sink.ExternalMetricsSink;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

/**
 * Default source provider: pairs a source name with the sink it should
 * publish to. Only {@code RAW_METRICS} is implemented.
 */
public class DefaultInternalMetricsSourceProvider implements InternalSourceProvider {
  private static final Log LOG = LogFactory.getLog(DefaultInternalMetricsSourceProvider.class);

  // TODO: Implement read based sources for higher order data
  /**
   * @param sourceName          which source to create
   * @param sinkIntervalSeconds interval handed to the created source
   * @param sink                sink the source writes to
   * @return a {@link RawMetricsSource} for {@code RAW_METRICS}, or
   * {@code null} (with a warning) when no sink is given
   * @throws UnsupportedOperationException for any other source name
   */
  @Override
  public InternalMetricsSource getInternalMetricsSource(SOURCE_NAME sourceName, int sinkIntervalSeconds, ExternalMetricsSink sink) {
    if (sink != null) {
      // Every branch of the switch either returns or throws.
      switch (sourceName) {
        case RAW_METRICS:
          return new RawMetricsSource(sinkIntervalSeconds, sink);
        default:
          throw new UnsupportedOperationException("Unimplemented source type " + sourceName);
      }
    }

    LOG.warn("No external sink configured for source " + sourceName);
    return null;
  }
}
http://git-wip-us.apache.org/repos/asf/ambari/blob/42112e28/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/source/InternalMetricsSource.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/source/InternalMetricsSource.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/source/InternalMetricsSource.java new file mode 100644 index 0000000..5aaf128 --- /dev/null +++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/source/InternalMetricsSource.java @@ -0,0 +1,30 @@ +package org.apache.ambari.metrics.core.timeline.source; + +import java.util.Collection; + +import org.apache.hadoop.metrics2.sink.timeline.TimelineMetrics; + +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements.
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +/** + * Contract for a source that publishes timeline metrics to an external sink. + */ +public interface InternalMetricsSource { + /** + * Write metrics to the external sink. + * Implementations may pre-process or cache the metrics before they reach the sink. + * + * @param metrics collection of timeline metrics to publish + */ + void publishTimelineMetrics(Collection<TimelineMetrics> metrics); +} http://git-wip-us.apache.org/repos/asf/ambari/blob/42112e28/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/source/InternalSourceProvider.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/source/InternalSourceProvider.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/source/InternalSourceProvider.java new file mode 100644 index 0000000..3bd8e42 --- /dev/null +++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/source/InternalSourceProvider.java @@ -0,0 +1,39 @@ +package org.apache.ambari.metrics.core.timeline.source; + +import org.apache.ambari.metrics.core.timeline.sink.ExternalMetricsSink; + +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements.
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +/** + * Factory contract for obtaining an {@link InternalMetricsSource} by source type. + */ +public interface InternalSourceProvider { + + /** + * Types of metric sources that can be requested from a provider. + * NOTE(review): the constant names spell "AGGREGATE" as "AGGREAGATE"; they are + * public API, so renaming them would break existing references. + */ + enum SOURCE_NAME { + RAW_METRICS, + MINUTE_HOST_AGGREAGATE_METRICS, + HOURLY_HOST_AGGREAGATE_METRICS, + DAILY_HOST_AGGREAGATE_METRICS, + MINUTE_CLUSTER_AGGREAGATE_METRICS, + HOURLY_CLUSTER_AGGREAGATE_METRICS, + DAILY_CLUSTER_AGGREAGATE_METRICS, + } + + /** + * Provide Source for metrics data. + * + * @param sourceName type of source requested + * @param sinkIntervalSeconds interval in seconds handed to the created source + * @param sink external sink the source writes to + * @return {@link InternalMetricsSource}; may be {@code null} (the default provider returns null when no sink is given)
+ */ + InternalMetricsSource getInternalMetricsSource(SOURCE_NAME sourceName, int sinkIntervalSeconds, ExternalMetricsSink sink); +} http://git-wip-us.apache.org/repos/asf/ambari/blob/42112e28/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/source/RawMetricsSource.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/source/RawMetricsSource.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/source/RawMetricsSource.java new file mode 100644 index 0000000..9dddb33 --- /dev/null +++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/source/RawMetricsSource.java @@ -0,0 +1,85 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ +package org.apache.ambari.metrics.core.timeline.source; + +import java.util.Collection; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; + +import org.apache.ambari.metrics.core.timeline.sink.ExternalMetricsSink; +import org.apache.ambari.metrics.core.timeline.source.cache.InternalMetricsCacheProvider; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.metrics2.sink.timeline.TimelineMetrics; +import org.apache.ambari.metrics.core.timeline.source.cache.InternalMetricsCache; +
/**
 * {@link InternalMetricsSource} that forwards raw metrics to an {@link ExternalMetricsSink}.
 * <p>
 * When the sink's flush interval is longer than the internal cache interval,
 * published metrics are put into the cache instance obtained from
 * {@link InternalMetricsCacheProvider} and a fixed-rate task drains that cache
 * into the sink. Otherwise each batch is written to the sink immediately, and the
 * caller waits at most the sink's timeout for the write to finish.
 */
public class RawMetricsSource implements InternalMetricsSource {
  private static final Log LOG = LogFactory.getLog(RawMetricsSource.class);
  private final int internalCacheInterval;
  private final ExternalMetricsSink rawMetricsSink;
  private final ScheduledExecutorService executorService = Executors.newSingleThreadScheduledExecutor();
  private final InternalMetricsCache cache;
  static final String RAW_METRICS_CACHE = "RAW_METRICS_CACHE_INSTANCE";

  /**
   * @param internalCacheInterval interval, in seconds, compared against the sink's flush interval
   * @param rawMetricsSink        sink that receives the metrics
   */
  public RawMetricsSource(int internalCacheInterval, ExternalMetricsSink rawMetricsSink) {
    this.internalCacheInterval = internalCacheInterval;
    this.rawMetricsSink = rawMetricsSink;
    this.cache = InternalMetricsCacheProvider.getInstance().getCacheInstance(RAW_METRICS_CACHE);
    if (sinkFlushesSlowerThanCache()) {
      initializeFixedRateScheduler();
    }
  }

  /**
   * Caches the metrics for the periodic flush, or writes them straight to the
   * sink, depending on how the sink's flush interval compares to the cache interval.
   *
   * @param metrics metrics to publish
   */
  @Override
  public void publishTimelineMetrics(Collection<TimelineMetrics> metrics) {
    // TODO: Adjust default flush to reasonable defaults > 3 seconds
    if (sinkFlushesSlowerThanCache()) {
      // Need to cache only if external sink cannot keep up and thereby has
      // different flush interval as compared to HBase flush
      cache.putAll(metrics); // Scheduler initialized already for flush
    } else {
      submitDataWithTimeout(metrics);
    }
  }

  /** True when the sink flushes less often than the internal cache interval. */
  private boolean sinkFlushesSlowerThanCache() {
    return rawMetricsSink.getFlushSeconds() > internalCacheInterval;
  }

  /** Schedules the periodic task that drains the cache into the sink. */
  private void initializeFixedRateScheduler() {
    executorService.scheduleAtFixedRate(() -> {
      try {
        rawMetricsSink.sinkMetricData(cache.evictAll());
      } catch (RuntimeException e) {
        // An exception escaping a fixed-rate task suppresses every later run,
        // so a single sink failure must not propagate out of the task.
        LOG.warn("Exception on sinking metrics", e);
      }
    }, rawMetricsSink.getFlushSeconds(), rawMetricsSink.getFlushSeconds(), TimeUnit.SECONDS);
  }

  /**
   * Writes the metrics on the executor thread and waits for completion, giving
   * up after the sink's configured timeout. Failures are logged, not rethrown.
   */
  private void submitDataWithTimeout(final Collection<TimelineMetrics> metrics) {
    Future<?> pending = executorService.submit(() -> {
      rawMetricsSink.sinkMetricData(metrics);
      return null;
    });
    try {
      pending.get(rawMetricsSink.getSinkTimeOutSeconds(), TimeUnit.SECONDS);
    } catch (InterruptedException e) {
      // Restore the interrupt flag so callers further up can still observe it.
      Thread.currentThread().interrupt();
      LOG.warn("Raw metrics sink interrupted.");
    } catch (ExecutionException e) {
      LOG.warn("Exception on sinking metrics", e);
    } catch (TimeoutException e) {
      LOG.warn("Timeout exception on sinking metrics", e);
    }
  }

}
http://git-wip-us.apache.org/repos/asf/ambari/blob/42112e28/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/source/cache/InternalMetricCacheKey.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/source/cache/InternalMetricCacheKey.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/source/cache/InternalMetricCacheKey.java new file mode 100644 index 0000000..bfb610f --- /dev/null +++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/source/cache/InternalMetricCacheKey.java @@ -0,0 +1,109 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership.
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.ambari.metrics.core.timeline.source.cache; +
/**
 * Cache key identifying a metric by name, application id, instance id and hostname.
 * <p>
 * {@code startTime} is carried for debugging only and does not take part in
 * {@link #equals(Object)} or {@link #hashCode()}. The identity fields are
 * mutable through the setters; changing them while the key is stored in a
 * hash-based collection makes the entry unreachable.
 */
public class InternalMetricCacheKey {
  private String metricName;
  private String appId;
  private String instanceId;
  private String hostname;
  private long startTime; // Useful for debugging

  public InternalMetricCacheKey(String metricName, String appId, String instanceId, String hostname, long startTime) {
    this.metricName = metricName;
    this.appId = appId;
    this.instanceId = instanceId;
    this.hostname = hostname;
    this.startTime = startTime;
  }

  public String getMetricName() {
    return metricName;
  }

  public void setMetricName(String metricName) {
    this.metricName = metricName;
  }

  public String getAppId() {
    return appId;
  }

  public void setAppId(String appId) {
    this.appId = appId;
  }

  public String getInstanceId() {
    return instanceId;
  }

  public void setInstanceId(String instanceId) {
    this.instanceId = instanceId;
  }

  public String getHostname() {
    return hostname;
  }

  public void setHostname(String hostname) {
    this.hostname = hostname;
  }

  public long getStartTime() {
    return startTime;
  }

  public void setStartTime(long startTime) {
    this.startTime = startTime;
  }

  /**
   * Two keys are equal when metric name, app id, instance id and hostname all
   * match; any of them may be {@code null}. {@code startTime} is ignored.
   */
  @Override
  public boolean equals(Object o) {
    if (this == o) {
      return true;
    }
    if (o == null || getClass() != o.getClass()) {
      return false;
    }

    InternalMetricCacheKey that = (InternalMetricCacheKey) o;
    // Null-safe on every field: the constructor and setters accept null for all of them.
    return java.util.Objects.equals(getMetricName(), that.getMetricName())
      && java.util.Objects.equals(getAppId(), that.getAppId())
      && java.util.Objects.equals(getInstanceId(), that.getInstanceId())
      && java.util.Objects.equals(getHostname(), that.getHostname());
  }

  /**
   * Hash over the same four fields used by {@link #equals(Object)}; a
   * {@code null} field contributes 0.
   */
  @Override
  public int hashCode() {
    int result = java.util.Objects.hashCode(getMetricName());
    result = 31 * result + java.util.Objects.hashCode(getAppId());
    result = 31 * result + java.util.Objects.hashCode(getInstanceId());
    result = 31 * result + java.util.Objects.hashCode(getHostname());
    return result;
  }

  @Override
  public String toString() {
    return "InternalMetricCacheKey{" +
      "metricName='" + metricName + '\'' +
      ", appId='" + appId + '\'' +
      ", instanceId='" + instanceId + '\'' +
      ", hostname='" + hostname + '\'' +
      ", startTime=" + startTime +
      '}';
  }
}
http://git-wip-us.apache.org/repos/asf/ambari/blob/42112e28/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/source/cache/InternalMetricCacheValue.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/source/cache/InternalMetricCacheValue.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/source/cache/InternalMetricCacheValue.java new file mode 100644 index 0000000..6573c4e --- /dev/null +++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/source/cache/InternalMetricCacheValue.java @@ -0,0 +1,37 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements.
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.ambari.metrics.core.timeline.source.cache; + +import java.util.Map; +import java.util.TreeMap; +
/**
 * Cache value holding metric data points in a map sorted by its {@code Long} key
 * (presumably a timestamp; confirm against callers).
 */
public class InternalMetricCacheValue {
  private TreeMap<Long, Double> metricValues = new TreeMap<>();

  /**
   * @return the live backing map (not a copy); {@code null} only if a caller
   *         installed {@code null} through {@link #setMetricValues(TreeMap)}
   */
  public TreeMap<Long, Double> getMetricValues() {
    return metricValues;
  }

  /**
   * Replaces the backing map with the given instance.
   *
   * @param metricValues map to use from now on
   */
  public void setMetricValues(TreeMap<Long, Double> metricValues) {
    this.metricValues = metricValues;
  }

  /**
   * Merges the given entries into the held values; an entry with an existing
   * key overwrites the stored value. A {@code null} or empty argument is a no-op.
   *
   * @param metricValues entries to merge, may be {@code null}
   */
  public void addMetricValues(Map<Long, Double> metricValues) {
    if (metricValues == null || metricValues.isEmpty()) {
      return;
    }
    if (this.metricValues == null) {
      // The setter may have installed null; start again from an empty map.
      this.metricValues = new TreeMap<>();
    }
    this.metricValues.putAll(metricValues);
  }
}
