http://git-wip-us.apache.org/repos/asf/ambari/blob/42112e28/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/query/ConnectionProvider.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/query/ConnectionProvider.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/query/ConnectionProvider.java deleted file mode 100644 index 24239a0..0000000 --- a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/query/ConnectionProvider.java +++ /dev/null @@ -1,29 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query; - - -import java.sql.Connection; -import java.sql.SQLException; - -/** - * - */ -public interface ConnectionProvider { - public Connection getConnection() throws SQLException; -}
http://git-wip-us.apache.org/repos/asf/ambari/blob/42112e28/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/query/DefaultCondition.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/query/DefaultCondition.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/query/DefaultCondition.java deleted file mode 100644 index a88f44e..0000000 --- a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/query/DefaultCondition.java +++ /dev/null @@ -1,421 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query; - -import java.util.ArrayList; -import java.util.LinkedHashSet; -import java.util.List; -import java.util.Set; - -import org.apache.commons.collections.CollectionUtils; -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.metrics2.sink.timeline.Precision; -import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixHBaseAccessor; -import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.discovery.TimelineMetricMetadataManager; - -public class DefaultCondition implements Condition { - List<String> metricNames; - List<String> hostnames; - String appId; - String instanceId; - Long startTime; - Long endTime; - Precision precision; - Integer limit; - boolean grouped; - boolean noLimit = false; - Integer fetchSize; - String statement; - Set<String> orderByColumns = new LinkedHashSet<String>(); - boolean metricNamesNotCondition = false; - boolean hostNamesNotCondition = false; - boolean uuidNotCondition = false; - List<byte[]> uuids = new ArrayList<>(); - - private static final Log LOG = LogFactory.getLog(DefaultCondition.class); - - public DefaultCondition(List<String> metricNames, List<String> hostnames, String appId, - String instanceId, Long startTime, Long endTime, Precision precision, - Integer limit, boolean grouped) { - this.metricNames = metricNames; - this.hostnames = hostnames; - this.appId = appId; - this.instanceId = instanceId; - this.startTime = startTime; - this.endTime = endTime; - this.precision = precision; - this.limit = limit; - this.grouped = grouped; - } - - public DefaultCondition(List<byte[]> uuids, List<String> metricNames, List<String> hostnames, String appId, - String instanceId, Long startTime, Long endTime, Precision precision, - Integer limit, boolean grouped) { - this.uuids = uuids; - this.metricNames = metricNames; - this.hostnames = 
hostnames; - this.appId = appId; - this.instanceId = instanceId; - this.startTime = startTime; - this.endTime = endTime; - this.precision = precision; - this.limit = limit; - this.grouped = grouped; - } - - public String getStatement() { - return statement; - } - - public void setStatement(String statement) { - this.statement = statement; - } - - public List<String> getMetricNames() { - return metricNames == null || metricNames.isEmpty() ? null : metricNames; - } - - public StringBuilder getConditionClause() { - StringBuilder sb = new StringBuilder(); - boolean appendConjunction = appendUuidClause(sb); - appendConjunction = append(sb, appendConjunction, getStartTime(), " SERVER_TIME >= ?"); - append(sb, appendConjunction, getEndTime(), " SERVER_TIME < ?"); - - return sb; - } - - protected static boolean append(StringBuilder sb, - boolean appendConjunction, - Object value, String str) { - if (value != null) { - if (appendConjunction) { - sb.append(" AND"); - } - - sb.append(str); - appendConjunction = true; - } - return appendConjunction; - } - - public List<String> getHostnames() { - return hostnames; - } - - public Precision getPrecision() { - return precision; - } - - public void setPrecision(Precision precision) { - this.precision = precision; - } - - public String getAppId() { - if (appId != null && !appId.isEmpty()) { - if (!(appId.equals("HOST") || appId.equals("FLUME_HANDLER"))) { - return appId.toLowerCase(); - } else { - return appId; - } - } - return null; - } - - public String getInstanceId() { - return instanceId == null || instanceId.isEmpty() ? null : instanceId; - } - - /** - * Convert to millis. 
- */ - public Long getStartTime() { - if (startTime == null) { - return null; - } else if (startTime < 9999999999l) { - return startTime * 1000; - } else { - return startTime; - } - } - - public Long getEndTime() { - if (endTime == null) { - return null; - } - if (endTime < 9999999999l) { - return endTime * 1000; - } else { - return endTime; - } - } - - public void setNoLimit() { - this.noLimit = true; - } - - @Override - public boolean doUpdate() { - return false; - } - - public Integer getLimit() { - if (noLimit) { - return null; - } - return limit == null ? PhoenixHBaseAccessor.RESULTSET_LIMIT : limit; - } - - public boolean isGrouped() { - return grouped; - } - - public boolean isPointInTime() { - return getStartTime() == null && getEndTime() == null; - } - - public boolean isEmpty() { - return (metricNames == null || metricNames.isEmpty()) - && (hostnames == null || hostnames.isEmpty()) - && (appId == null || appId.isEmpty()) - && (instanceId == null || instanceId.isEmpty()) - && startTime == null - && endTime == null; - } - - public Integer getFetchSize() { - return fetchSize; - } - - public void setFetchSize(Integer fetchSize) { - this.fetchSize = fetchSize; - } - - public void addOrderByColumn(String column) { - orderByColumns.add(column); - } - - public String getOrderByClause(boolean asc) { - String orderByStr = " ORDER BY "; - if (!orderByColumns.isEmpty()) { - StringBuilder sb = new StringBuilder(orderByStr); - for (String orderByColumn : orderByColumns) { - if (sb.length() != orderByStr.length()) { - sb.append(", "); - } - sb.append(orderByColumn); - if (!asc) { - sb.append(" DESC"); - } - } - sb.append(" "); - return sb.toString(); - } - return null; - } - - protected boolean appendUuidClause(StringBuilder sb) { - boolean appendConjunction = false; - - if (CollectionUtils.isNotEmpty(uuids)) { - - List<byte[]> uuidsHost = new ArrayList<>(); - List<byte[]> uuidsMetric = new ArrayList<>(); - List<byte[]> uuidsFull = new ArrayList<>(); - - if (getUuids() 
!= null) { - for (byte[] uuid : uuids) { - if (uuid.length == TimelineMetricMetadataManager.TIMELINE_METRIC_UUID_LENGTH) { - uuidsMetric.add(uuid); - } else if (uuid.length == TimelineMetricMetadataManager.HOSTNAME_UUID_LENGTH) { - uuidsHost.add(uuid); - } else { - uuidsFull.add(uuid); - } - } - - // Put a '(' first - sb.append("("); - - //IN clause - // METRIC_NAME (NOT) IN (?,?,?,?) - if (CollectionUtils.isNotEmpty(uuidsFull)) { - sb.append("UUID"); - if (uuidNotCondition) { - sb.append(" NOT"); - } - sb.append(" IN ("); - //Append ?,?,?,? - for (int i = 0; i < uuidsFull.size(); i++) { - sb.append("?"); - if (i < uuidsFull.size() - 1) { - sb.append(", "); - } - } - sb.append(")"); - appendConjunction = true; - } - - //Put an AND if both types are present - if (CollectionUtils.isNotEmpty(uuidsFull) && - CollectionUtils.isNotEmpty(uuidsMetric)) { - sb.append(" AND "); - } - - // ( for OR - if (!metricNamesNotCondition && uuidsMetric.size() > 1 && (CollectionUtils.isNotEmpty(uuidsFull) || CollectionUtils.isNotEmpty(uuidsHost))) { - sb.append("("); - } - - //LIKE clause for clusterMetric UUIDs - // UUID (NOT) LIKE ? OR(AND) UUID LIKE ? 
- if (CollectionUtils.isNotEmpty(uuidsMetric)) { - - for (int i = 0; i < uuidsMetric.size(); i++) { - sb.append("UUID"); - if (metricNamesNotCondition) { - sb.append(" NOT"); - } - sb.append(" LIKE "); - sb.append("?"); - - if (i < uuidsMetric.size() - 1) { - if (metricNamesNotCondition) { - sb.append(" AND "); - } else { - sb.append(" OR "); - } - // ) for OR - } else if ((CollectionUtils.isNotEmpty(uuidsFull) || CollectionUtils.isNotEmpty(uuidsHost)) && !metricNamesNotCondition && uuidsMetric.size() > 1) { - sb.append(")"); - } - } - appendConjunction = true; - } - - //Put an AND if both types are present - if ((CollectionUtils.isNotEmpty(uuidsMetric) || (CollectionUtils.isNotEmpty(uuidsFull) && CollectionUtils.isEmpty(uuidsMetric))) - && CollectionUtils.isNotEmpty(uuidsHost)) { - sb.append(" AND "); - } - // ( for OR - if((CollectionUtils.isNotEmpty(uuidsFull) || CollectionUtils.isNotEmpty(uuidsMetric)) && !hostNamesNotCondition && uuidsHost.size() > 1){ - sb.append("("); - } - - //LIKE clause for HOST UUIDs - // UUID (NOT) LIKE ? OR(AND) UUID LIKE ? 
- if (CollectionUtils.isNotEmpty(uuidsHost)) { - - for (int i = 0; i < uuidsHost.size(); i++) { - sb.append("UUID"); - if (hostNamesNotCondition) { - sb.append(" NOT"); - } - sb.append(" LIKE "); - sb.append("?"); - - if (i < uuidsHost.size() - 1) { - if (hostNamesNotCondition) { - sb.append(" AND "); - } else { - sb.append(" OR "); - } - // ) for OR - } else if ((CollectionUtils.isNotEmpty(uuidsFull) || CollectionUtils.isNotEmpty(uuidsMetric)) && !hostNamesNotCondition && uuidsHost.size() > 1) { - sb.append(")"); - } - } - appendConjunction = true; - } - - // Finish with a ')' - if (appendConjunction) { - sb.append(")"); - } - - uuids = new ArrayList<>(); - if (CollectionUtils.isNotEmpty(uuidsFull)) { - uuids.addAll(uuidsFull); - } - for (byte[] uuid: uuidsMetric) { - uuids.add(new String(uuid).concat("%").getBytes()); - } - for (byte[] uuid: uuidsHost) { - uuids.add("%".concat(new String(uuid)).getBytes()); - } - } - } - - return appendConjunction; - } - - @Override - public String toString() { - return "Condition{" + - "uuids=" + uuids + - ", appId='" + appId + '\'' + - ", instanceId='" + instanceId + '\'' + - ", startTime=" + startTime + - ", endTime=" + endTime + - ", limit=" + limit + - ", grouped=" + grouped + - ", orderBy=" + orderByColumns + - ", noLimit=" + noLimit + - '}'; - } - - protected static boolean metricNamesHaveWildcard(List<String> metricNames) { - for (String name : metricNames) { - if (name.contains("%")) { - return true; - } - } - return false; - } - - protected static boolean hostNamesHaveWildcard(List<String> hostnames) { - if (hostnames == null) - return false; - for (String name : hostnames) { - if (name.contains("%")) { - return true; - } - } - return false; - } - - public void setMetricNamesNotCondition(boolean metricNamesNotCondition) { - this.metricNamesNotCondition = metricNamesNotCondition; - } - - @Override - public void setHostnamesNotCondition(boolean hostNamesNotCondition) { - this.hostNamesNotCondition = hostNamesNotCondition; 
- } - - @Override - public void setUuidNotCondition(boolean uuidNotCondition) { - this.uuidNotCondition = uuidNotCondition; - } - - @Override - public List<byte[]> getUuids() { - return uuids; - } -} http://git-wip-us.apache.org/repos/asf/ambari/blob/42112e28/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/query/DefaultPhoenixDataSource.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/query/DefaultPhoenixDataSource.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/query/DefaultPhoenixDataSource.java deleted file mode 100644 index 78fad62..0000000 --- a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/query/DefaultPhoenixDataSource.java +++ /dev/null @@ -1,90 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query; - - -import java.io.IOException; -import java.sql.Connection; -import java.sql.DriverManager; -import java.sql.SQLException; - -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hbase.client.ConnectionFactory; -import org.apache.hadoop.hbase.client.HBaseAdmin; - -public class DefaultPhoenixDataSource implements PhoenixConnectionProvider { - - static final Log LOG = LogFactory.getLog(DefaultPhoenixDataSource.class); - private static final String ZOOKEEPER_CLIENT_PORT = "hbase.zookeeper.property.clientPort"; - private static final String ZOOKEEPER_QUORUM = "hbase.zookeeper.quorum"; - private static final String ZNODE_PARENT = "zookeeper.znode.parent"; - - private static final String connectionUrl = "jdbc:phoenix:%s:%s:%s"; - private final String url; - - private Configuration hbaseConf; - - public DefaultPhoenixDataSource(Configuration hbaseConf) { - this.hbaseConf = hbaseConf; - String zookeeperClientPort = hbaseConf.getTrimmed(ZOOKEEPER_CLIENT_PORT, "2181"); - String zookeeperQuorum = hbaseConf.getTrimmed(ZOOKEEPER_QUORUM); - String znodeParent = hbaseConf.getTrimmed(ZNODE_PARENT, "/ams-hbase-unsecure"); - if (zookeeperQuorum == null || zookeeperQuorum.isEmpty()) { - throw new IllegalStateException("Unable to find Zookeeper quorum to " + - "access HBase store using Phoenix."); - } - - url = String.format(connectionUrl, - zookeeperQuorum, - zookeeperClientPort, - znodeParent); - } - - /** - * Get HBaseAdmin for table ops. - * @return @HBaseAdmin - * @throws IOException - */ - public HBaseAdmin getHBaseAdmin() throws IOException { - return (HBaseAdmin) ConnectionFactory.createConnection(hbaseConf).getAdmin(); - } - - /** - * Get JDBC connection to HBase store. 
Assumption is that the hbase - * configuration is present on the classpath and loaded by the caller into - * the Configuration object. - * Phoenix already caches the HConnection between the client and HBase - * cluster. - * - * @return @java.sql.Connection - */ - public Connection getConnection() throws SQLException { - - LOG.debug("Metric store connection url: " + url); - try { - return DriverManager.getConnection(url); - } catch (SQLException e) { - LOG.warn("Unable to connect to HBase store using Phoenix.", e); - - throw e; - } - } - -} http://git-wip-us.apache.org/repos/asf/ambari/blob/42112e28/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/query/EmptyCondition.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/query/EmptyCondition.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/query/EmptyCondition.java deleted file mode 100644 index 5d1e244..0000000 --- a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/query/EmptyCondition.java +++ /dev/null @@ -1,169 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * <p/> - * http://www.apache.org/licenses/LICENSE-2.0 - * <p/> - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query; - -import java.util.List; - -import org.apache.commons.lang.NotImplementedException; -import org.apache.hadoop.metrics2.sink.timeline.Precision; - -/** - * Encapsulate a Condition with pre-formatted and pre-parsed query string. - */ -public class EmptyCondition implements Condition { - String statement; - boolean doUpdate = false; - boolean metricNamesNotCondition = false; - - @Override - public boolean isEmpty() { - return false; - } - - @Override - public List<byte[]> getUuids() { - return null; - } - - @Override - public List<String> getMetricNames() { - return null; - } - - @Override - public boolean isPointInTime() { - return false; - } - - @Override - public boolean isGrouped() { - return true; - } - - @Override - public void setStatement(String statement) { - this.statement = statement; - } - - @Override - public List<String> getHostnames() { - return null; - } - - @Override - public Precision getPrecision() { - return null; - } - - @Override - public void setPrecision(Precision precision) { - - } - - @Override - public String getAppId() { - return null; - } - - @Override - public String getInstanceId() { - return null; - } - - @Override - public StringBuilder getConditionClause() { - return null; - } - - @Override - public String getOrderByClause(boolean asc) { - return null; - } - - @Override - public String getStatement() { - return statement; - } - - @Override - public Long getStartTime() { - return null; - } - - @Override - public Long 
getEndTime() { - return null; - } - - @Override - public Integer getLimit() { - return null; - } - - @Override - public Integer getFetchSize() { - return null; - } - - @Override - public void setFetchSize(Integer fetchSize) { - - } - - @Override - public void addOrderByColumn(String column) { - - } - - @Override - public void setNoLimit() { - - } - - public void setDoUpdate(boolean doUpdate) { - this.doUpdate = doUpdate; - } - - @Override - public boolean doUpdate() { - return doUpdate; - } - - @Override - public String toString() { - return "EmptyCondition{ " + - " statement = " + this.getStatement() + - " doUpdate = " + this.doUpdate() + - " }"; - } - - @Override - public void setMetricNamesNotCondition(boolean metricNamesNotCondition) { - this.metricNamesNotCondition = metricNamesNotCondition; - } - - @Override - public void setHostnamesNotCondition(boolean hostNamesNotCondition) { - throw new NotImplementedException("Not implemented"); - } - - @Override - public void setUuidNotCondition(boolean uuidNotCondition) { - throw new NotImplementedException("Not implemented"); - } -} http://git-wip-us.apache.org/repos/asf/ambari/blob/42112e28/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/query/PhoenixConnectionProvider.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/query/PhoenixConnectionProvider.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/query/PhoenixConnectionProvider.java deleted file mode 100644 index cdb3b4e..0000000 --- a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/query/PhoenixConnectionProvider.java +++ /dev/null @@ -1,31 
+0,0 @@ -package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query; - -import java.io.IOException; - -import org.apache.hadoop.hbase.client.Admin; - -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * <p/> - * http://www.apache.org/licenses/LICENSE-2.0 - * <p/> - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -public interface PhoenixConnectionProvider extends ConnectionProvider { - /** - * Get HBaseAdmin for the Phoenix connection - * @return - * @throws IOException - */ - Admin getHBaseAdmin() throws IOException; -} http://git-wip-us.apache.org/repos/asf/ambari/blob/42112e28/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/query/PhoenixTransactSQL.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/query/PhoenixTransactSQL.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/query/PhoenixTransactSQL.java deleted file mode 100644 index 457f5af..0000000 --- 
a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/query/PhoenixTransactSQL.java +++ /dev/null @@ -1,804 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * <p/> - * http://www.apache.org/licenses/LICENSE-2.0 - * <p/> - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query; - -import java.sql.Connection; -import java.sql.PreparedStatement; -import java.sql.SQLException; -import java.util.List; -import java.util.concurrent.TimeUnit; -import java.util.regex.Pattern; - -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.metrics2.sink.timeline.Precision; -import org.apache.hadoop.metrics2.sink.timeline.PrecisionLimitExceededException; -import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixHBaseAccessor; -import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.discovery.TimelineMetricMetadataManager; - -/** - * Encapsulate all metrics related SQL queries. - */ -public class PhoenixTransactSQL { - - public static final Log LOG = LogFactory.getLog(PhoenixTransactSQL.class); - - /** - * Create table to store individual metric records. 
- */ - public static final String CREATE_METRICS_TABLE_SQL = "CREATE TABLE IF NOT " + - "EXISTS METRIC_RECORD (UUID BINARY(20) NOT NULL, " + - "SERVER_TIME BIGINT NOT NULL, " + - "METRIC_SUM DOUBLE, " + - "METRIC_COUNT UNSIGNED_INT, " + - "METRIC_MAX DOUBLE, " + - "METRIC_MIN DOUBLE, " + - "METRICS VARCHAR CONSTRAINT pk " + - "PRIMARY KEY (UUID, SERVER_TIME ROW_TIMESTAMP)) DATA_BLOCK_ENCODING='%s', IMMUTABLE_ROWS=true, " + - "TTL=%s, COMPRESSION='%s'"; - - public static final String CREATE_CONTAINER_METRICS_TABLE_SQL = - "CREATE TABLE IF NOT EXISTS CONTAINER_METRICS " - + "(APP_ID VARCHAR, " - + " CONTAINER_ID VARCHAR," - + " START_TIME TIMESTAMP," - + " FINISH_TIME TIMESTAMP, " - + " DURATION BIGINT," - + " HOSTNAME VARCHAR," - + " EXIT_CODE INTEGER," - + " LOCALIZATION_DURATION BIGINT," - + " LAUNCH_DURATION BIGINT," - + " MEM_REQUESTED_GB DOUBLE," - + " MEM_REQUESTED_GB_MILLIS DOUBLE," - + " MEM_VIRTUAL_GB DOUBLE," - + " MEM_USED_GB_MIN DOUBLE," - + " MEM_USED_GB_MAX DOUBLE," - + " MEM_USED_GB_AVG DOUBLE," - + " MEM_USED_GB_50_PCT DOUBLE," - + " MEM_USED_GB_75_PCT DOUBLE," - + " MEM_USED_GB_90_PCT DOUBLE," - + " MEM_USED_GB_95_PCT DOUBLE," - + " MEM_USED_GB_99_PCT DOUBLE," - + " MEM_UNUSED_GB DOUBLE," - + " MEM_UNUSED_GB_MILLIS DOUBLE " - + " CONSTRAINT pk PRIMARY KEY(APP_ID, CONTAINER_ID)) DATA_BLOCK_ENCODING='%s'," - + " IMMUTABLE_ROWS=true, TTL=%s, COMPRESSION='%s'"; - - public static final String CREATE_METRICS_AGGREGATE_TABLE_SQL = - "CREATE TABLE IF NOT EXISTS %s " + - "(UUID BINARY(20) NOT NULL, " + - "SERVER_TIME BIGINT NOT NULL, " + - "METRIC_SUM DOUBLE," + - "METRIC_COUNT UNSIGNED_INT, " + - "METRIC_MAX DOUBLE," + - "METRIC_MIN DOUBLE CONSTRAINT pk " + - "PRIMARY KEY (UUID, SERVER_TIME ROW_TIMESTAMP)) DATA_BLOCK_ENCODING='%s', IMMUTABLE_ROWS=true, TTL=%s," + - " COMPRESSION='%s'"; - - public static final String CREATE_METRICS_CLUSTER_AGGREGATE_TABLE_SQL = - "CREATE TABLE IF NOT EXISTS %s " + - "(UUID BINARY(16) NOT NULL, " + - "SERVER_TIME BIGINT NOT 
NULL, " + - "METRIC_SUM DOUBLE, " + - "HOSTS_COUNT UNSIGNED_INT, " + - "METRIC_MAX DOUBLE, " + - "METRIC_MIN DOUBLE " + - "CONSTRAINT pk PRIMARY KEY (UUID, SERVER_TIME ROW_TIMESTAMP)) DATA_BLOCK_ENCODING='%s', IMMUTABLE_ROWS=true, " + - "TTL=%s, COMPRESSION='%s'"; - - // HOSTS_COUNT vs METRIC_COUNT - public static final String CREATE_METRICS_CLUSTER_AGGREGATE_GROUPED_TABLE_SQL = - "CREATE TABLE IF NOT EXISTS %s " + - "(UUID BINARY(16) NOT NULL, " + - "SERVER_TIME BIGINT NOT NULL, " + - "METRIC_SUM DOUBLE, " + - "METRIC_COUNT UNSIGNED_INT, " + - "METRIC_MAX DOUBLE, " + - "METRIC_MIN DOUBLE " + - "CONSTRAINT pk PRIMARY KEY (UUID, SERVER_TIME ROW_TIMESTAMP)) DATA_BLOCK_ENCODING='%s', IMMUTABLE_ROWS=true, " + - "TTL=%s, COMPRESSION='%s'"; - - public static final String CREATE_METRICS_METADATA_TABLE_SQL = - "CREATE TABLE IF NOT EXISTS METRICS_METADATA " + - "(METRIC_NAME VARCHAR, " + - "APP_ID VARCHAR, " + - "INSTANCE_ID VARCHAR, " + - "UUID BINARY(16), " + - "UNITS CHAR(20), " + - "TYPE CHAR(20), " + - "START_TIME UNSIGNED_LONG, " + - "SUPPORTS_AGGREGATION BOOLEAN, " + - "IS_WHITELISTED BOOLEAN " + - "CONSTRAINT pk PRIMARY KEY (METRIC_NAME, APP_ID)) " + - "DATA_BLOCK_ENCODING='%s', COMPRESSION='%s'"; - - public static final String CREATE_HOSTED_APPS_METADATA_TABLE_SQL = - "CREATE TABLE IF NOT EXISTS HOSTED_APPS_METADATA " + - "(HOSTNAME VARCHAR, UUID BINARY(4), APP_IDS VARCHAR, " + - "CONSTRAINT pk PRIMARY KEY (HOSTNAME))" + - "DATA_BLOCK_ENCODING='%s', COMPRESSION='%s'"; - - public static final String CREATE_INSTANCE_HOST_TABLE_SQL = - "CREATE TABLE IF NOT EXISTS INSTANCE_HOST_METADATA " + - "(INSTANCE_ID VARCHAR, HOSTNAME VARCHAR, " + - "CONSTRAINT pk PRIMARY KEY (INSTANCE_ID, HOSTNAME))" + - "DATA_BLOCK_ENCODING='%s', COMPRESSION='%s'"; - - public static final String ALTER_METRICS_METADATA_TABLE = - "ALTER TABLE METRICS_METADATA ADD IF NOT EXISTS IS_WHITELISTED BOOLEAN"; - - /** - * ALTER table to set new options - */ - public static final String ALTER_SQL = "ALTER 
TABLE %s SET TTL=%s"; - - /** - * Insert into metric records table. - */ - public static final String UPSERT_METRICS_SQL = "UPSERT INTO %s " + - "(UUID, " + - "SERVER_TIME, " + - "METRIC_SUM, " + - "METRIC_MAX, " + - "METRIC_MIN, " + - "METRIC_COUNT, " + - "METRICS) VALUES " + - "(?, ?, ?, ?, ?, ?, ?)"; - - public static final String UPSERT_CONTAINER_METRICS_SQL = "UPSERT INTO %s " + - "(APP_ID," - + " CONTAINER_ID," - + " START_TIME," - + " FINISH_TIME," - + " DURATION," - + " HOSTNAME," - + " EXIT_CODE," - + " LOCALIZATION_DURATION," - + " LAUNCH_DURATION," - + " MEM_REQUESTED_GB," - + " MEM_REQUESTED_GB_MILLIS," - + " MEM_VIRTUAL_GB," - + " MEM_USED_GB_MIN," - + " MEM_USED_GB_MAX," - + " MEM_USED_GB_AVG," - + " MEM_USED_GB_50_PCT," - + " MEM_USED_GB_75_PCT," - + " MEM_USED_GB_90_PCT," - + " MEM_USED_GB_95_PCT," - + " MEM_USED_GB_99_PCT," - + " MEM_UNUSED_GB," - + " MEM_UNUSED_GB_MILLIS) VALUES " + - "(?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)"; - - public static final String UPSERT_CLUSTER_AGGREGATE_SQL = "UPSERT INTO " + - "%s (UUID, " + - "SERVER_TIME, " + - "METRIC_SUM, " + - "HOSTS_COUNT, " + - "METRIC_MAX, " + - "METRIC_MIN) " + - "VALUES (?, ?, ?, ?, ?, ?)"; - - public static final String UPSERT_CLUSTER_AGGREGATE_TIME_SQL = "UPSERT INTO" + - " %s (UUID, SERVER_TIME, " + - "METRIC_SUM, " + - "METRIC_COUNT, " + - "METRIC_MAX, " + - "METRIC_MIN) " + - "VALUES (?, ?, ?, ?, ?, ?)"; - - public static final String UPSERT_AGGREGATE_RECORD_SQL = "UPSERT INTO " + - "%s (UUID, " + - "SERVER_TIME, " + - "METRIC_SUM, " + - "METRIC_MAX, " + - "METRIC_MIN," + - "METRIC_COUNT) " + - "VALUES (?, ?, ?, ?, ?, ?)"; - - public static final String UPSERT_METADATA_SQL = - "UPSERT INTO METRICS_METADATA (METRIC_NAME, APP_ID, INSTANCE_ID, UUID, UNITS, TYPE, " + - "START_TIME, SUPPORTS_AGGREGATION, IS_WHITELISTED) " + - "VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)"; - - public static final String UPSERT_HOSTED_APPS_METADATA_SQL = - "UPSERT INTO HOSTED_APPS_METADATA 
(HOSTNAME, UUID, APP_IDS) VALUES (?, ?, ?)"; - - public static final String UPSERT_INSTANCE_HOST_METADATA_SQL = - "UPSERT INTO INSTANCE_HOST_METADATA (INSTANCE_ID, HOSTNAME) VALUES (?, ?)"; - - /** - * Retrieve a set of rows from metrics records table. - */ - public static final String GET_METRIC_SQL = "SELECT UUID, SERVER_TIME, " + - "METRIC_SUM, " + - "METRIC_MAX, " + - "METRIC_MIN, " + - "METRIC_COUNT, " + - "METRICS " + - "FROM %s"; - - /** - * Get latest metrics for a number of hosts - * - * Different queries for a number and a single hosts are used due to bug - * in Apache Phoenix - */ - public static final String GET_LATEST_METRIC_SQL = "SELECT %s E.UUID AS UUID, " + - "E.SERVER_TIME AS SERVER_TIME, " + - "E.METRIC_SUM AS METRIC_SUM, " + - "E.METRIC_MAX AS METRIC_MAX, E.METRIC_MIN AS METRIC_MIN, " + - "E.METRIC_COUNT AS METRIC_COUNT, E.METRICS AS METRICS " + - "FROM %s AS E " + - "INNER JOIN " + - "(SELECT UUID, MAX(SERVER_TIME) AS MAX_SERVER_TIME " + - "FROM %s " + - "WHERE " + - "%s " + - "GROUP BY UUID) " + - "AS I " + - "ON E.UUID=I.UUID " + - "AND E.SERVER_TIME=I.MAX_SERVER_TIME"; - - public static final String GET_METRIC_AGGREGATE_ONLY_SQL = "SELECT UUID, " + - "SERVER_TIME, " + - "METRIC_SUM, " + - "METRIC_MAX, " + - "METRIC_MIN, " + - "METRIC_COUNT " + - "FROM %s"; - - public static final String GET_CLUSTER_AGGREGATE_SQL = "SELECT " + - "UUID, " + - "SERVER_TIME, " + - "METRIC_SUM, " + - "HOSTS_COUNT, " + - "METRIC_MAX, " + - "METRIC_MIN " + - "FROM %s"; - - public static final String GET_CLUSTER_AGGREGATE_TIME_SQL = "SELECT " + - "UUID, " + - "SERVER_TIME, " + - "METRIC_SUM, " + - "METRIC_COUNT, " + - "METRIC_MAX, " + - "METRIC_MIN " + - "FROM %s"; - - public static final String TOP_N_INNER_SQL = "SELECT UUID " + - "FROM %s WHERE %s GROUP BY UUID ORDER BY %s LIMIT %s"; - - public static final String GET_METRIC_METADATA_SQL = "SELECT " + - "METRIC_NAME, APP_ID, INSTANCE_ID, UUID, UNITS, TYPE, START_TIME, " + - "SUPPORTS_AGGREGATION, IS_WHITELISTED 
FROM METRICS_METADATA"; - - public static final String GET_HOSTED_APPS_METADATA_SQL = "SELECT " + - "HOSTNAME, UUID, APP_IDS FROM HOSTED_APPS_METADATA"; - - public static final String GET_INSTANCE_HOST_METADATA_SQL = "SELECT " + - "INSTANCE_ID, HOSTNAME FROM INSTANCE_HOST_METADATA"; - - /** - * Aggregate host metrics using a GROUP BY clause to take advantage of - * N - way parallel scan where N = number of regions. - */ - public static final String GET_AGGREGATED_HOST_METRIC_GROUPBY_SQL = "UPSERT " + - "INTO %s (UUID, SERVER_TIME, METRIC_SUM, METRIC_COUNT, METRIC_MAX, METRIC_MIN) " + - "SELECT UUID, %s AS SERVER_TIME, " + - "SUM(METRIC_SUM), SUM(METRIC_COUNT), MAX(METRIC_MAX), MIN(METRIC_MIN) " + - "FROM %s WHERE%s SERVER_TIME > %s AND SERVER_TIME <= %s " + - "GROUP BY UUID"; - - /** - * Downsample host metrics. - */ - public static final String DOWNSAMPLE_HOST_METRIC_SQL_UPSERT_PREFIX = "UPSERT INTO %s (UUID, SERVER_TIME, " + - "METRIC_SUM, METRIC_COUNT, METRIC_MAX, METRIC_MIN) "; - - public static final String TOPN_DOWNSAMPLER_HOST_METRIC_SELECT_SQL = "SELECT UUID, " + - "%s AS SERVER_TIME, %s, 1, %s, %s FROM %s WHERE UUID IN %s AND SERVER_TIME > %s AND SERVER_TIME <= %s " + - "GROUP BY UUID ORDER BY %s DESC LIMIT %s"; - - /** - * Aggregate app metrics using a GROUP BY clause to take advantage of - * N - way parallel scan where N = number of regions. - */ - public static final String GET_AGGREGATED_APP_METRIC_GROUPBY_SQL = "UPSERT " + - "INTO %s (UUID, SERVER_TIME, METRIC_SUM, METRIC_COUNT, METRIC_MAX, METRIC_MIN) SELECT UUID, %s AS SERVER_TIME, " + - "ROUND(AVG(METRIC_SUM),2), ROUND(AVG(%s)), MAX(METRIC_MAX), MIN(METRIC_MIN) FROM %s WHERE%s SERVER_TIME > %s AND " + - "SERVER_TIME <= %s GROUP BY UUID"; - - /** - * Downsample cluster metrics. 
- */ - public static final String DOWNSAMPLE_CLUSTER_METRIC_SQL_UPSERT_PREFIX = "UPSERT INTO %s (UUID, SERVER_TIME, " + - "METRIC_SUM, METRIC_COUNT, METRIC_MAX, METRIC_MIN) "; - - public static final String TOPN_DOWNSAMPLER_CLUSTER_METRIC_SELECT_SQL = "SELECT UUID, " + - "%s AS SERVER_TIME, %s, 1, %s, %s FROM %s WHERE UUID IN %s AND SERVER_TIME > %s AND SERVER_TIME <= %s " + - "GROUP BY UUID ORDER BY %s DESC LIMIT %s"; - - public static final String METRICS_RECORD_TABLE_NAME = "METRIC_RECORD"; - - public static final String CONTAINER_METRICS_TABLE_NAME = "CONTAINER_METRICS"; - - public static final String METRICS_AGGREGATE_MINUTE_TABLE_NAME = - "METRIC_RECORD_MINUTE"; - public static final String METRICS_AGGREGATE_HOURLY_TABLE_NAME = - "METRIC_RECORD_HOURLY"; - public static final String METRICS_AGGREGATE_DAILY_TABLE_NAME = - "METRIC_RECORD_DAILY"; - public static final String METRICS_CLUSTER_AGGREGATE_TABLE_NAME = - "METRIC_AGGREGATE"; - public static final String METRICS_CLUSTER_AGGREGATE_MINUTE_TABLE_NAME = - "METRIC_AGGREGATE_MINUTE"; - public static final String METRICS_CLUSTER_AGGREGATE_HOURLY_TABLE_NAME = - "METRIC_AGGREGATE_HOURLY"; - public static final String METRICS_CLUSTER_AGGREGATE_DAILY_TABLE_NAME = - "METRIC_AGGREGATE_DAILY"; - - public static final Pattern PHOENIX_TABLES_REGEX_PATTERN = Pattern.compile("METRIC_"); - - public static final String[] PHOENIX_TABLES = { - METRICS_RECORD_TABLE_NAME, - METRICS_AGGREGATE_MINUTE_TABLE_NAME, - METRICS_AGGREGATE_HOURLY_TABLE_NAME, - METRICS_AGGREGATE_DAILY_TABLE_NAME, - METRICS_CLUSTER_AGGREGATE_TABLE_NAME, - METRICS_CLUSTER_AGGREGATE_MINUTE_TABLE_NAME, - METRICS_CLUSTER_AGGREGATE_HOURLY_TABLE_NAME, - METRICS_CLUSTER_AGGREGATE_DAILY_TABLE_NAME - }; - - public static final String DEFAULT_TABLE_COMPRESSION = "SNAPPY"; - public static final String DEFAULT_ENCODING = "FAST_DIFF"; - public static final long HOUR = 3600000; // 1 hour - public static final long DAY = 86400000; // 1 day - private static boolean 
sortMergeJoinEnabled = false; - - /** - * Filter to optimize HBase scan by using file timestamps. This prevents - * a full table scan of metric records. - * - * @return Phoenix Hint String - */ - public static String getNaiveTimeRangeHint(Long startTime, Long delta) { - return String.format("/*+ NATIVE_TIME_RANGE(%s) */", (startTime - delta)); - } - - /** - * Falling back to sort merge join algorithm if default queries fail. - * - * @return Phoenix Hint String - */ - public static String getLatestMetricsHints() { - if (sortMergeJoinEnabled) { - return "/*+ USE_SORT_MERGE_JOIN NO_CACHE */"; - } - return ""; - } - - public static void setSortMergeJoinEnabled(boolean sortMergeJoinEnabled) { - PhoenixTransactSQL.sortMergeJoinEnabled = sortMergeJoinEnabled; - } - - public static PreparedStatement prepareGetMetricsSqlStmt(Connection connection, - Condition condition) throws SQLException { - - validateConditionIsNotEmpty(condition); - validateRowCountLimit(condition); - - String stmtStr; - if (condition.getStatement() != null) { - stmtStr = condition.getStatement(); - } else { - String metricsTable; - String query; - if (condition.getPrecision() == null) { - long endTime = condition.getEndTime() == null ? System.currentTimeMillis() : condition.getEndTime(); - long startTime = condition.getStartTime() == null ? 
0 : condition.getStartTime(); - Precision precision = Precision.getPrecision(startTime, endTime); - condition.setPrecision(precision); - } - switch (condition.getPrecision()) { - case DAYS: - metricsTable = METRICS_AGGREGATE_DAILY_TABLE_NAME; - query = GET_METRIC_AGGREGATE_ONLY_SQL; - break; - case HOURS: - metricsTable = METRICS_AGGREGATE_HOURLY_TABLE_NAME; - query = GET_METRIC_AGGREGATE_ONLY_SQL; - break; - case MINUTES: - metricsTable = METRICS_AGGREGATE_MINUTE_TABLE_NAME; - query = GET_METRIC_AGGREGATE_ONLY_SQL; - break; - default: - metricsTable = METRICS_RECORD_TABLE_NAME; - query = GET_METRIC_SQL; - } - - stmtStr = String.format(query, metricsTable); - } - - StringBuilder sb = new StringBuilder(stmtStr); - - if (!(condition instanceof EmptyCondition)) { - sb.append(" WHERE "); - sb.append(condition.getConditionClause()); - String orderByClause = condition.getOrderByClause(true); - if (orderByClause != null) { - sb.append(orderByClause); - } else { - sb.append(" ORDER BY UUID, SERVER_TIME "); - } - } - - if (condition.getLimit() != null) { - sb.append(" LIMIT ").append(condition.getLimit()); - } - - if (LOG.isDebugEnabled()) { - LOG.debug("SQL: " + sb.toString() + ", condition: " + condition); - } - - PreparedStatement stmt = null; - try { - stmt = connection.prepareStatement(sb.toString()); - int pos = 1; - pos = addUuids(condition, pos, stmt); - - if (condition instanceof TopNCondition) { - pos = addStartTime(condition, pos, stmt); - pos = addEndTime(condition, pos, stmt); - } - - pos = addStartTime(condition, pos, stmt); - addEndTime(condition, pos, stmt); - - if (condition.getFetchSize() != null) { - stmt.setFetchSize(condition.getFetchSize()); - } - } catch (SQLException e) { - if (stmt != null) { - stmt.close(); - } - throw e; - } - - if (condition instanceof TopNCondition) { - LOG.info(sb.toString()); - } - return stmt; - } - - private static void validateConditionIsNotEmpty(Condition condition) { - if (condition.isEmpty()) { - throw new 
IllegalArgumentException("Condition is empty."); - } - } - - private static void validateRowCountLimit(Condition condition) { - if (condition.getMetricNames() == null - || condition.getMetricNames().isEmpty()) { - //aggregator can use empty metrics query - return; - } - - long range = condition.getEndTime() - condition.getStartTime(); - long rowsPerMetric; - - //Get Precision (passed in or computed) and estimate values returned based on that. - Precision precision = condition.getPrecision(); - if (precision == null) { - precision = Precision.getPrecision(condition.getStartTime(), condition.getEndTime()); - } - - switch (precision) { - case DAYS: - rowsPerMetric = TimeUnit.MILLISECONDS.toDays(range); - break; - case HOURS: - rowsPerMetric = TimeUnit.MILLISECONDS.toHours(range); - break; - case MINUTES: - rowsPerMetric = TimeUnit.MILLISECONDS.toMinutes(range)/5; //5 minute data in METRIC_AGGREGATE_MINUTE table. - break; - default: - rowsPerMetric = TimeUnit.MILLISECONDS.toSeconds(range)/10; //10 second data in METRIC_AGGREGATE table - } - - List<String> hostNames = condition.getHostnames(); - int numHosts = (hostNames == null || hostNames.isEmpty()) ? 1 : condition.getHostnames().size(); - - long totalRowsRequested = rowsPerMetric * condition.getMetricNames().size() * numHosts; - - if (totalRowsRequested > PhoenixHBaseAccessor.RESULTSET_LIMIT) { - throw new PrecisionLimitExceededException("Requested " + condition.getMetricNames().size() + " metrics for " - + numHosts + " hosts in " + precision + " precision for the time range of " + range/1000 - + " seconds. Estimated resultset size of " + totalRowsRequested + " is greater than the limit of " - + PhoenixHBaseAccessor.RESULTSET_LIMIT + ". Request lower precision or fewer number of metrics or hosts." 
+ - " Alternatively, increase the limit value through ams-site:timeline.metrics.service.default.result.limit config"); - } - } - - public static PreparedStatement prepareGetLatestMetricSqlStmt( - Connection connection, Condition condition) throws SQLException { - - validateConditionIsNotEmpty(condition); - - if (condition.getMetricNames() == null - || condition.getMetricNames().isEmpty()) { - throw new IllegalArgumentException("Point in time query without " - + "metric names not supported "); - } - - String stmtStr; - if (condition.getStatement() != null) { - stmtStr = condition.getStatement(); - } else { - stmtStr = String.format(GET_LATEST_METRIC_SQL, - getLatestMetricsHints(), - METRICS_RECORD_TABLE_NAME, - METRICS_RECORD_TABLE_NAME, - condition.getConditionClause()); - } - - if (LOG.isDebugEnabled()) { - LOG.debug("SQL: " + stmtStr + ", condition: " + condition); - } - PreparedStatement stmt = null; - try { - stmt = connection.prepareStatement(stmtStr); - setQueryParameters(stmt, condition); - } catch (SQLException e) { - if (stmt != null) { - stmt.close(); - } - throw e; - } - - return stmt; - } - - private static PreparedStatement setQueryParameters(PreparedStatement stmt, - Condition condition) throws SQLException { - int pos = 1; - //For GET_LATEST_METRIC_SQL_SINGLE_HOST parameters should be set 2 times - do { - if (condition.getUuids() != null) { - for (byte[] uuid : condition.getUuids()) { - stmt.setBytes(pos++, uuid); - } - } - if (condition.getFetchSize() != null) { - stmt.setFetchSize(condition.getFetchSize()); - pos++; - } - } while (pos < stmt.getParameterMetaData().getParameterCount()); - - return stmt; - } - - public static PreparedStatement prepareGetAggregateSqlStmt( - Connection connection, Condition condition) throws SQLException { - - validateConditionIsNotEmpty(condition); - validateRowCountLimit(condition); - - String metricsAggregateTable; - String queryStmt; - if (condition.getPrecision() == null) { - long endTime = condition.getEndTime() 
== null ? System.currentTimeMillis() : condition.getEndTime(); - long startTime = condition.getStartTime() == null ? 0 : condition.getStartTime(); - condition.setPrecision(Precision.getPrecision(startTime, endTime)); - } - switch (condition.getPrecision()) { - case DAYS: - metricsAggregateTable = METRICS_CLUSTER_AGGREGATE_DAILY_TABLE_NAME; - queryStmt = GET_CLUSTER_AGGREGATE_TIME_SQL; - break; - case HOURS: - metricsAggregateTable = METRICS_CLUSTER_AGGREGATE_HOURLY_TABLE_NAME; - queryStmt = GET_CLUSTER_AGGREGATE_TIME_SQL; - break; - case MINUTES: - metricsAggregateTable = METRICS_CLUSTER_AGGREGATE_MINUTE_TABLE_NAME; - queryStmt = GET_CLUSTER_AGGREGATE_TIME_SQL; - break; - default: - metricsAggregateTable = METRICS_CLUSTER_AGGREGATE_TABLE_NAME; - queryStmt = GET_CLUSTER_AGGREGATE_SQL; - } - - queryStmt = String.format(queryStmt, metricsAggregateTable); - - StringBuilder sb = new StringBuilder(queryStmt); - sb.append(" WHERE "); - sb.append(condition.getConditionClause()); - sb.append(" ORDER BY UUID, SERVER_TIME"); - if (condition.getLimit() != null) { - sb.append(" LIMIT ").append(condition.getLimit()); - } - - String query = sb.toString(); - - if (LOG.isDebugEnabled()) { - LOG.debug("SQL => " + query + ", condition => " + condition); - } - PreparedStatement stmt = null; - try { - stmt = connection.prepareStatement(query); - int pos = 1; - - pos = addUuids(condition, pos, stmt); - - if (condition instanceof TopNCondition) { - pos = addStartTime(condition, pos, stmt); - pos = addEndTime(condition, pos, stmt); - } - - // TODO: Upper case all strings on POST - pos = addStartTime(condition, pos, stmt); - addEndTime(condition, pos, stmt); - } catch (SQLException e) { - if (stmt != null) { - stmt.close(); - } - throw e; - } - - if (condition instanceof TopNCondition) { - LOG.info(sb.toString()); - } - return stmt; - } - - public static PreparedStatement prepareGetLatestAggregateMetricSqlStmt( - Connection connection, SplitByMetricNamesCondition condition) throws 
SQLException { - - validateConditionIsNotEmpty(condition); - - String stmtStr; - if (condition.getStatement() != null) { - stmtStr = condition.getStatement(); - } else { - stmtStr = String.format(GET_CLUSTER_AGGREGATE_SQL, - METRICS_CLUSTER_AGGREGATE_TABLE_NAME); - } - - StringBuilder sb = new StringBuilder(stmtStr); - sb.append(" WHERE "); - sb.append(condition.getConditionClause()); - String orderByClause = condition.getOrderByClause(false); - if (orderByClause != null) { - sb.append(orderByClause); - } else { - sb.append(" ORDER BY UUID DESC, SERVER_TIME DESC "); - } - - sb.append(" LIMIT ").append(condition.getMetricNames().size()); - - String query = sb.toString(); - if (LOG.isDebugEnabled()) { - LOG.debug("SQL: " + query + ", condition: " + condition); - } - - PreparedStatement stmt = null; - try { - stmt = connection.prepareStatement(query); - int pos = 1; - if (condition.getMetricNames() != null) { - for (; pos <= condition.getMetricNames().size(); pos++) { - stmt.setBytes(pos, condition.getCurrentUuid()); - } - } - } catch (SQLException e) { - if (stmt != null) { - - } - throw e; - } - - return stmt; - } - - public static String getTargetTableUsingPrecision(Precision precision, boolean withHosts) { - - String inputTable = null; - if (precision != null) { - if (withHosts) { - switch (precision) { - case DAYS: - inputTable = PhoenixTransactSQL.METRICS_AGGREGATE_DAILY_TABLE_NAME; - break; - case HOURS: - inputTable = PhoenixTransactSQL.METRICS_AGGREGATE_HOURLY_TABLE_NAME; - break; - case MINUTES: - inputTable = PhoenixTransactSQL.METRICS_AGGREGATE_MINUTE_TABLE_NAME; - break; - default: - inputTable = PhoenixTransactSQL.METRICS_RECORD_TABLE_NAME; - } - } else { - switch (precision) { - case DAYS: - inputTable = PhoenixTransactSQL.METRICS_CLUSTER_AGGREGATE_DAILY_TABLE_NAME; - break; - case HOURS: - inputTable = PhoenixTransactSQL.METRICS_CLUSTER_AGGREGATE_HOURLY_TABLE_NAME; - break; - case MINUTES: - inputTable = 
PhoenixTransactSQL.METRICS_CLUSTER_AGGREGATE_MINUTE_TABLE_NAME; - break; - default: - inputTable = PhoenixTransactSQL.METRICS_CLUSTER_AGGREGATE_TABLE_NAME; - } - } - } else { - if (withHosts) { - inputTable = PhoenixTransactSQL.METRICS_RECORD_TABLE_NAME; - } else { - inputTable = PhoenixTransactSQL.METRICS_CLUSTER_AGGREGATE_TABLE_NAME; - } - } - return inputTable; - } - - private static int addUuids(Condition condition, int pos, PreparedStatement stmt) throws SQLException { - if (condition.getUuids() != null) { - for (int pos2 = 1 ; pos2 <= condition.getUuids().size(); pos2++,pos++) { - byte[] uuid = condition.getUuids().get(pos2 - 1); - if (LOG.isDebugEnabled()) { - LOG.debug("Setting pos: " + pos + ", value = " + new String(uuid)); - } - - if (uuid.length != TimelineMetricMetadataManager.HOSTNAME_UUID_LENGTH + TimelineMetricMetadataManager.TIMELINE_METRIC_UUID_LENGTH) { - stmt.setString(pos, new String(uuid)); - } else { - stmt.setBytes(pos, uuid); - } - } - } - return pos; - } - - private static int addStartTime(Condition condition, int pos, PreparedStatement stmt) throws SQLException { - if (condition.getStartTime() != null) { - if (LOG.isDebugEnabled()) { - LOG.debug("Setting pos: " + pos + ", value: " + condition.getStartTime()); - } - stmt.setLong(pos++, condition.getStartTime()); - } - return pos; - } - - private static int addEndTime(Condition condition, int pos, PreparedStatement stmt) throws SQLException { - - if (condition.getEndTime() != null) { - if (LOG.isDebugEnabled()) { - LOG.debug("Setting pos: " + pos + ", value: " + condition.getEndTime()); - } - stmt.setLong(pos++, condition.getEndTime()); - } - return pos; - } - -} http://git-wip-us.apache.org/repos/asf/ambari/blob/42112e28/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/query/SplitByMetricNamesCondition.java ---------------------------------------------------------------------- diff --git 
a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/query/SplitByMetricNamesCondition.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/query/SplitByMetricNamesCondition.java deleted file mode 100644 index 6eadcea..0000000 --- a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/query/SplitByMetricNamesCondition.java +++ /dev/null @@ -1,189 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query; - -import java.util.Collections; -import java.util.List; - -import org.apache.hadoop.metrics2.sink.timeline.Precision; -// TODO get rid of this class -public class SplitByMetricNamesCondition implements Condition { - private final Condition adaptee; - private byte[] currentUuid; - private boolean metricNamesNotCondition = false; - - public SplitByMetricNamesCondition(Condition condition){ - this.adaptee = condition; - } - - @Override - public boolean isEmpty() { - return adaptee.isEmpty(); - } - - @Override - public List<byte[]> getUuids() { - return adaptee.getUuids(); - } - - @Override - public List<String> getMetricNames() { - return Collections.singletonList(new String(currentUuid)); - } - - @Override - public boolean isPointInTime() { - return adaptee.isPointInTime(); - } - - @Override - public boolean isGrouped() { - return adaptee.isGrouped(); - } - - @Override - public void setStatement(String statement) { - adaptee.setStatement(statement); - } - - @Override - public List<String> getHostnames() { - return adaptee.getHostnames(); - } - - @Override - public Precision getPrecision() { - return adaptee.getPrecision(); - } - - @Override - public void setPrecision(Precision precision) { - adaptee.setPrecision(precision); - } - - @Override - public String getAppId() { - return adaptee.getAppId(); - } - - @Override - public String getInstanceId() { - return adaptee.getInstanceId(); - } - - @Override - public StringBuilder getConditionClause() { - StringBuilder sb = new StringBuilder(); - boolean appendConjunction = false; - - if (getMetricNames() != null) { - for (String name : getMetricNames()) { - if (sb.length() > 1) { - sb.append(" OR "); - } - sb.append("UUID = ?"); - } - - appendConjunction = true; - } - - appendConjunction = DefaultCondition.append(sb, appendConjunction, - getStartTime(), " SERVER_TIME >= ?"); - DefaultCondition.append(sb, appendConjunction, 
getEndTime(), - " SERVER_TIME < ?"); - - return sb; - } - - @Override - public String getOrderByClause(boolean asc) { - return adaptee.getOrderByClause(asc); - } - - @Override - public String getStatement() { - return adaptee.getStatement(); - } - - @Override - public Long getStartTime() { - return adaptee.getStartTime(); - } - - @Override - public Long getEndTime() { - return adaptee.getEndTime(); - } - - @Override - public Integer getLimit() { - return adaptee.getLimit(); - } - - @Override - public Integer getFetchSize() { - return adaptee.getFetchSize(); - } - - @Override - public void setFetchSize(Integer fetchSize) { - adaptee.setFetchSize(fetchSize); - } - - @Override - public void addOrderByColumn(String column) { - adaptee.addOrderByColumn(column); - } - - @Override - public void setNoLimit() { - adaptee.setNoLimit(); - } - - @Override - public boolean doUpdate() { - return false; - } - - public List<String> getOriginalMetricNames() { - return adaptee.getMetricNames(); - } - - public void setCurrentUuid(byte[] uuid) { - this.currentUuid = uuid; - } - - public byte[] getCurrentUuid() { - return currentUuid; - } - - @Override - public void setMetricNamesNotCondition(boolean metricNamesNotCondition) { - this.metricNamesNotCondition = metricNamesNotCondition; - } - - @Override - public void setHostnamesNotCondition(boolean hostNamesNotCondition) { - - } - - @Override - public void setUuidNotCondition(boolean uuidNotCondition) { - - } -} http://git-wip-us.apache.org/repos/asf/ambari/blob/42112e28/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/query/TopNCondition.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/query/TopNCondition.java 
b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/query/TopNCondition.java deleted file mode 100644 index 4a5491f..0000000 --- a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/query/TopNCondition.java +++ /dev/null @@ -1,162 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query; - -import java.util.List; - -import org.apache.commons.collections.CollectionUtils; -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.metrics2.sink.timeline.Precision; -import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.Function; - -public class TopNCondition extends DefaultCondition{ - - private Integer topN; - private boolean isBottomN; - private Function topNFunction; - private static final Log LOG = LogFactory.getLog(TopNCondition.class); - - public TopNCondition(List<byte[]> uuids, List<String> metricNames, List<String> hostnames, String appId, - String instanceId, Long startTime, Long endTime, Precision precision, - Integer limit, boolean grouped, Integer topN, Function topNFunction, - boolean isBottomN) { - super(uuids, metricNames, hostnames, appId, instanceId, startTime, endTime, precision, limit, grouped); - this.topN = topN; - this.isBottomN = isBottomN; - this.topNFunction = topNFunction; - } - - @Override - public StringBuilder getConditionClause() { - - - if (!(isTopNHostCondition(metricNames, hostnames) || isTopNMetricCondition(metricNames, hostnames))) { - LOG.error("Unsupported TopN Operation requested. 
Query can have either multiple hosts or multiple metric names " + - "but not both."); - return null; - } - - StringBuilder sb = new StringBuilder(); - sb.append(" UUID IN ("); - sb.append(getTopNInnerQuery()); - sb.append(")"); - - boolean appendConjunction = true; - appendConjunction = append(sb, appendConjunction, getStartTime(), " SERVER_TIME >= ?"); - append(sb, appendConjunction, getEndTime(), " SERVER_TIME < ?"); - - return sb; - } - - public String getTopNInnerQuery() { - return String.format(PhoenixTransactSQL.TOP_N_INNER_SQL, - PhoenixTransactSQL.getTargetTableUsingPrecision(precision, CollectionUtils.isNotEmpty(hostnames)), - super.getConditionClause().toString(), getTopNOrderByClause(), topN); - } - - private String getTopNOrderByClause() { - - String orderByClause = getColumnSelect(this.topNFunction); - orderByClause += (isBottomN ? " ASC" : " DESC"); - return orderByClause; - } - - public static String getColumnSelect(Function topNFunction) { - String columnSelect = null; - if (topNFunction != null) { - switch (topNFunction.getReadFunction()) { - case AVG: - columnSelect = "ROUND(AVG(METRIC_SUM),2)"; - break; - case SUM: - columnSelect = "SUM(METRIC_SUM)"; - break; - default: - columnSelect = "MAX(METRIC_MAX)"; - break; - } - } - if (columnSelect == null) { - columnSelect = "MAX(METRIC_MAX)"; - } - return columnSelect; - } - - public boolean isTopNHostCondition() { - return isTopNHostCondition(metricNames, hostnames); - } - - public boolean isTopNMetricCondition() { - return isTopNMetricCondition(metricNames, hostnames); - } - - /** - * Check if this is a case of Top N hosts condition - * @param metricNames A list of Strings. - * @param hostnames A list of Strings. - * @return True if it is a Case of Top N Hosts (1 Metric and H hosts). 
- */ - public static boolean isTopNHostCondition(List<String> metricNames, List<String> hostnames) { - // Case 1 : 1 Metric, H hosts - // Select Top N or Bottom N host series based on 1 metric (max/avg/sum) - // Hostnames cannot be empty - // Only 1 metric allowed, without wildcards - return (CollectionUtils.isNotEmpty(hostnames) && metricNames.size() == 1 && !metricNamesHaveWildcard(metricNames)); - - } - - /** - * Check if this is a case of Top N metrics condition - * @param metricNames A list of Strings. - * @param hostnames A list of Strings. - * @return True if it is a Case of Top N Metrics (M Metric and 1 or 0 host). - */ - public static boolean isTopNMetricCondition(List<String> metricNames, List<String> hostnames) { - // Case 2 : M Metric names or Regex, 1 or No host - // Select Top N or Bottom N metric series based on metric values(max/avg/sum) - // MetricNames cannot be empty - // No host (aggregate) or 1 host allowed, without wildcards - return (CollectionUtils.isNotEmpty(metricNames) && (hostnames == null || hostnames.size() <= 1) && - !hostNamesHaveWildcard(hostnames)); - } - - public Integer getTopN() { - return topN; - } - - public void setTopN(Integer topN) { - this.topN = topN; - } - - public boolean isBottomN() { - return isBottomN; - } - - public void setIsBottomN(boolean isBottomN) { - this.isBottomN = isBottomN; - } - - public Function getTopNFunction() { - return topNFunction; - } - - public void setTopNFunction(Function topNFunction) { - this.topNFunction = topNFunction; - } -} http://git-wip-us.apache.org/repos/asf/ambari/blob/42112e28/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/sink/DefaultFSSinkProvider.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/sink/DefaultFSSinkProvider.java 
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 * <p>
 * http://www.apache.org/licenses/LICENSE-2.0
 * <p>
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.sink;

import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.TIMELINE_METRICS_CACHE_COMMIT_INTERVAL;

import java.io.File;
import java.io.IOException;
import java.util.Collection;
import java.util.Date;

import org.apache.commons.io.FileUtils;
import org.apache.commons.lang.StringUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric;
import org.apache.hadoop.metrics2.sink.timeline.TimelineMetrics;
import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration;
import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.source.InternalSourceProvider;

/**
 * Default {@link ExternalSinkProvider} that appends metrics as CSV-like rows
 * to a single file on the local file system. The file is rotated (deleted and
 * re-created with a header line) once it grows past a configurable fixed size.
 */
public class DefaultFSSinkProvider implements ExternalSinkProvider {
  private static final Log LOG = LogFactory.getLog(DefaultFSSinkProvider.class);
  TimelineMetricConfiguration conf = TimelineMetricConfiguration.getInstance();
  // A single shared sink instance is handed out for every source.
  private final DefaultExternalMetricsSink sink = new DefaultExternalMetricsSink();
  // Maximum sink file size before rotation; defaults to 100 MB.
  private long FIXED_FILE_SIZE;
  private final String SINK_FILE_NAME = "external-metrics-sink.dat";
  private final String SEPARATOR = ", ";
  private final String LINE_SEP = System.lineSeparator();
  private final String HEADERS = "METRIC, APP_ID, INSTANCE_ID, HOSTNAME, START_TIME, DATA";

  public DefaultFSSinkProvider() {
    try {
      FIXED_FILE_SIZE = conf.getMetricsConf().getLong("timeline.metrics.service.external.fs.sink.filesize", FileUtils.ONE_MB * 100);
    } catch (Exception ignored) {
      // Configuration unavailable - fall back to the built-in default.
      FIXED_FILE_SIZE = FileUtils.ONE_MB * 100;
    }
  }

  @Override
  public ExternalMetricsSink getExternalMetricsSink(InternalSourceProvider.SOURCE_NAME sourceName) {
    return sink;
  }

  class DefaultExternalMetricsSink implements ExternalMetricsSink {

    @Override
    public int getSinkTimeOutSeconds() {
      return 10;
    }

    @Override
    public int getFlushSeconds() {
      try {
        return conf.getMetricsConf().getInt(TIMELINE_METRICS_CACHE_COMMIT_INTERVAL, 3);
      } catch (Exception e) {
        LOG.warn("Cannot read cache commit interval.");
      }
      return 3;
    }

    /**
     * Creates the sink file with a header line if it does not already exist.
     *
     * @param f the sink file to create
     * @return true if the file exists (or was created) afterwards,
     *         false on I/O failure
     */
    private boolean createFile(File f) {
      if (f.exists()) {
        return true;
      }
      try {
        boolean created = f.createNewFile();
        // Terminate the header with a line separator so the first data row
        // does not end up fused onto the header line.
        FileUtils.writeStringToFile(f, HEADERS + LINE_SEP);
        return created;
      } catch (IOException e) {
        LOG.error("Cannot create " + SINK_FILE_NAME + " at " + f.getPath());
        return false;
      }
    }

    // Rotate when the file is missing or larger than the configured size cap.
    private boolean shouldReCreate(File f) {
      return !f.exists() || FileUtils.sizeOf(f) > FIXED_FILE_SIZE;
    }

    @Override
    public void sinkMetricData(Collection<TimelineMetrics> metrics) {
      String dirPath = TimelineMetricConfiguration.getInstance().getDefaultMetricsSinkDir();
      File dir = new File(dirPath);
      if (!dir.exists()) {
        LOG.error("Cannot sink data to file system, incorrect dir path " + dirPath);
        return;
      }

      File f = FileUtils.getFile(dirPath, SINK_FILE_NAME);
      if (shouldReCreate(f)) {
        // BUGFIX: File#delete() returns false for a file that never existed,
        // which previously aborted the very first write and dropped all
        // metrics until the file somehow appeared. Only a failed delete of an
        // *existing* file is an error.
        if (f.exists() && !f.delete()) {
          LOG.warn("Unable to delete external sink file.");
          return;
        }
        if (!createFile(f)) {
          return;
        }
      }

      if (metrics == null) {
        return;
      }
      for (TimelineMetrics timelineMetrics : metrics) {
        for (TimelineMetric metric : timelineMetrics.getMetrics()) {
          StringBuilder sb = new StringBuilder();
          sb.append(metric.getMetricName()).append(SEPARATOR);
          sb.append(metric.getAppId()).append(SEPARATOR);
          // Empty instance id / hostname still emit the column separator so
          // every row has the same number of columns.
          if (!StringUtils.isEmpty(metric.getInstanceId())) {
            sb.append(metric.getInstanceId());
          }
          sb.append(SEPARATOR);
          if (!StringUtils.isEmpty(metric.getHostName())) {
            sb.append(metric.getHostName());
          }
          sb.append(SEPARATOR);
          sb.append(new Date(metric.getStartTime()));
          sb.append(SEPARATOR);
          sb.append(metric.getMetricValues().toString());
          sb.append(LINE_SEP);
          try {
            // BUGFIX: append instead of overwrite. The two-argument
            // writeStringToFile truncates the file, so every metric row used
            // to clobber the header and all previously sinked rows.
            FileUtils.writeStringToFile(f, sb.toString(), true);
          } catch (IOException e) {
            LOG.warn("Unable to sink data to file " + f.getPath());
          }
        }
      }
    }
  }
}
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 * <p>
 * http://www.apache.org/licenses/LICENSE-2.0
 * <p>
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.sink;

import java.util.Collection;

import org.apache.hadoop.metrics2.sink.timeline.TimelineMetrics;

/**
 * Contract for a sink that ships timeline metrics data to a system external
 * to AMS. Implementations are obtained through {@link ExternalSinkProvider}.
 */
public interface ExternalMetricsSink {
  /**
   * How many seconds to wait on the sink before dropping metrics.
   * Note: Care should be taken that this timeout does not bottleneck the
   * sink thread.
   */
  int getSinkTimeOutSeconds();

  /**
   * How frequently to flush data to the external system.
   * Default would be between 60 - 120 seconds, coherent with the default
   * sink interval of AMS.
   */
  int getFlushSeconds();

  /**
   * Raw data stream to process / store on the external system.
   * The data will be held in an in-memory cache and flushed at flush seconds
   * or, when the cache size limit is exceeded, we will flush the cache and
   * drop data if the write fails.
   *
   * @param metrics {@link Collection} of {@link TimelineMetrics}
   */
  void sinkMetricData(Collection<TimelineMetrics> metrics);
}
See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * <p> - * http://www.apache.org/licenses/LICENSE-2.0 - * <p> - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.sink; - -import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.source.InternalSourceProvider.SOURCE_NAME; - - -/** - * Configurable provider for sink classes that match the metrics sources. - * Provider can return same sink or different sinks for each source. 
- */ -public interface ExternalSinkProvider { - - /** - * Return an instance of the metrics sink for the give source - * @return {@link ExternalMetricsSink} - */ - ExternalMetricsSink getExternalMetricsSink(SOURCE_NAME sourceName); -} http://git-wip-us.apache.org/repos/asf/ambari/blob/42112e28/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/sink/HttpSinkProvider.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/sink/HttpSinkProvider.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/sink/HttpSinkProvider.java deleted file mode 100644 index 9c2a93e..0000000 --- a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/sink/HttpSinkProvider.java +++ /dev/null @@ -1,231 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * <p> - * http://www.apache.org/licenses/LICENSE-2.0 - * <p> - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 * <p>
 * http://www.apache.org/licenses/LICENSE-2.0
 * <p>
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.sink;

import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.TIMELINE_METRICS_CACHE_COMMIT_INTERVAL;

import java.io.BufferedReader;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.OutputStream;
import java.net.HttpURLConnection;
import java.net.URL;
import java.security.KeyStore;
import java.util.Collection;

import javax.net.ssl.HttpsURLConnection;
import javax.net.ssl.SSLContext;
import javax.net.ssl.SSLSocketFactory;
import javax.net.ssl.TrustManagerFactory;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.metrics2.sink.timeline.TimelineMetrics;
import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration;
import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.source.InternalSourceProvider;
import org.apache.http.client.utils.URIBuilder;
import org.codehaus.jackson.map.AnnotationIntrospector;
import org.codehaus.jackson.map.ObjectMapper;
import org.codehaus.jackson.map.annotate.JsonSerialize;
import org.codehaus.jackson.xc.JaxbAnnotationIntrospector;

/**
 * {@link ExternalSinkProvider} that POSTs metrics as JSON to a configurable
 * HTTP(S) endpoint. HTTPS endpoints require a truststore configured via the
 * timeline.metrics.service.external.http.sink.truststore.* properties.
 */
public class HttpSinkProvider implements ExternalSinkProvider {
  private static final Log LOG = LogFactory.getLog(HttpSinkProvider.class);
  TimelineMetricConfiguration conf = TimelineMetricConfiguration.getInstance();

  // Fully resolved endpoint URL, e.g. http://localhost:6189
  private String connectUrl;
  // Lazily initialized when the configured protocol is https.
  private SSLSocketFactory sslSocketFactory;
  protected static ObjectMapper mapper;

  static {
    mapper = new ObjectMapper();
    AnnotationIntrospector introspector = new JaxbAnnotationIntrospector();
    mapper.setAnnotationIntrospector(introspector);
    // BUGFIX: withSerializationInclusion() returns a NEW config object; the
    // previous code discarded it, so NON_NULL inclusion was never applied.
    // Set the inclusion directly on the mapper instead.
    mapper.setSerializationInclusion(JsonSerialize.Inclusion.NON_NULL);
  }

  public HttpSinkProvider() {
    Configuration config;
    try {
      config = conf.getMetricsConf();
    } catch (Exception e) {
      throw new ExceptionInInitializerError("Unable to read configuration for sink.");
    }
    String protocol = config.get("timeline.metrics.service.external.http.sink.protocol", "http");
    String host = config.get("timeline.metrics.service.external.http.sink.host", "localhost");
    String port = config.get("timeline.metrics.service.external.http.sink.port", "6189");

    if (protocol.contains("https")) {
      loadTruststore(
        config.getTrimmed("timeline.metrics.service.external.http.sink.truststore.path"),
        config.getTrimmed("timeline.metrics.service.external.http.sink.truststore.type"),
        config.getTrimmed("timeline.metrics.service.external.http.sink.truststore.password")
      );
    }

    URIBuilder uriBuilder = new URIBuilder();
    uriBuilder.setScheme(protocol);
    uriBuilder.setHost(host);
    uriBuilder.setPort(Integer.parseInt(port));
    connectUrl = uriBuilder.toString();
  }

  @Override
  public ExternalMetricsSink getExternalMetricsSink(InternalSourceProvider.SOURCE_NAME sourceName) {
    return new DefaultHttpMetricsSink();
  }

  protected HttpURLConnection getConnection(String spec) throws IOException {
    return (HttpURLConnection) new URL(spec).openConnection();
  }

  // Get an SSL connection backed by the truststore-derived socket factory.
  protected HttpsURLConnection getSSLConnection(String spec)
    throws IOException, IllegalStateException {

    HttpsURLConnection connection = (HttpsURLConnection) (new URL(spec).openConnection());
    connection.setSSLSocketFactory(sslSocketFactory);
    return connection;
  }

  /**
   * Initializes {@link #sslSocketFactory} from the configured truststore.
   * No-op if the factory has already been created. Load failures are logged
   * but not rethrown, matching the original best-effort behavior.
   *
   * @param trustStorePath     path to the truststore file (required)
   * @param trustStoreType     keystore type; defaults to the JVM default
   * @param trustStorePassword truststore password (required)
   * @throws IllegalStateException if path or password is missing
   */
  protected void loadTruststore(String trustStorePath, String trustStoreType,
                                String trustStorePassword) {
    if (sslSocketFactory == null) {
      if (trustStorePath == null || trustStorePassword == null) {
        String msg = "Can't load TrustStore. Truststore path or password is not set.";
        LOG.error(msg);
        throw new IllegalStateException(msg);
      }
      // try-with-resources replaces the manual finally/close dance.
      try (FileInputStream in = new FileInputStream(new File(trustStorePath))) {
        KeyStore store = KeyStore.getInstance(trustStoreType == null ?
          KeyStore.getDefaultType() : trustStoreType);
        store.load(in, trustStorePassword.toCharArray());
        TrustManagerFactory tmf = TrustManagerFactory
          .getInstance(TrustManagerFactory.getDefaultAlgorithm());
        tmf.init(store);
        SSLContext context = SSLContext.getInstance("TLS");
        context.init(null, tmf.getTrustManagers(), null);
        sslSocketFactory = context.getSocketFactory();
      } catch (Exception e) {
        LOG.error("Unable to load TrustStore", e);
      }
    }
  }

  class DefaultHttpMetricsSink implements ExternalMetricsSink {

    @Override
    public int getSinkTimeOutSeconds() {
      try {
        return conf.getMetricsConf().getInt("timeline.metrics.external.sink.http.timeout.seconds", 10);
      } catch (Exception e) {
        return 10;
      }
    }

    @Override
    public int getFlushSeconds() {
      try {
        return conf.getMetricsConf().getInt(TIMELINE_METRICS_CACHE_COMMIT_INTERVAL, 3);
      } catch (Exception e) {
        LOG.warn("Cannot read cache commit interval.");
      }
      return 3;
    }

    /**
     * Cleans up and closes an input stream
     * see http://docs.oracle.com/javase/6/docs/technotes/guides/net/http-keepalive.html
     * @param is the InputStream to clean up
     * @return string read from the InputStream (only populated at debug level)
     * @throws IOException
     */
    protected String cleanupInputStream(InputStream is) throws IOException {
      StringBuilder sb = new StringBuilder();
      if (is != null) {
        try (
          InputStreamReader isr = new InputStreamReader(is);
          BufferedReader br = new BufferedReader(isr)
        ) {
          // Drain the response body so the keep-alive connection can be
          // reused; only accumulate it when debug logging is on.
          String line;
          while ((line = br.readLine()) != null) {
            if (LOG.isDebugEnabled()) {
              sb.append(line);
            }
          }
        }
      }
      return sb.toString();
    }

    @Override
    public void sinkMetricData(Collection<TimelineMetrics> metrics) {
      HttpURLConnection connection = null;
      try {
        connection = connectUrl.startsWith("https") ? getSSLConnection(connectUrl) : getConnection(connectUrl);

        connection.setRequestMethod("POST");
        connection.setRequestProperty("Content-Type", "application/json");
        connection.setRequestProperty("Connection", "Keep-Alive");
        // BUGFIX: setConnectTimeout/setReadTimeout expect MILLISECONDS; the
        // previous code passed seconds, producing ~10ms timeouts.
        int timeoutMillis = getSinkTimeOutSeconds() * 1000;
        connection.setConnectTimeout(timeoutMillis);
        connection.setReadTimeout(timeoutMillis);
        connection.setDoOutput(true);

        if (metrics != null) {
          String jsonData = mapper.writeValueAsString(metrics);
          try (OutputStream os = connection.getOutputStream()) {
            os.write(jsonData.getBytes("UTF-8"));
          }
        }

        int statusCode = connection.getResponseCode();

        if (statusCode != 200) {
          LOG.info("Unable to POST metrics to external sink, " + connectUrl +
            ", statusCode = " + statusCode);
        } else {
          if (LOG.isDebugEnabled()) {
            LOG.debug("Metrics posted to external sink " + connectUrl);
          }
        }
        cleanupInputStream(connection.getInputStream());

      } catch (IOException io) {
        LOG.warn("Unable to sink data to external system.", io);
      }
    }
  }
}
