Repository: incubator-eagle
Updated Branches:
  refs/heads/master f899ca1c2 -> 0d1dcc408
[EAGLE-819] Fix yarn node duplication in topology health check app

https://issues.apache.org/jira/browse/EAGLE-819

Author: Zhao, Qingwen <qingwz...@apache.org>

Closes #712 from qingwen220/EAGLE-819.

Project: http://git-wip-us.apache.org/repos/asf/incubator-eagle/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-eagle/commit/0d1dcc40
Tree: http://git-wip-us.apache.org/repos/asf/incubator-eagle/tree/0d1dcc40
Diff: http://git-wip-us.apache.org/repos/asf/incubator-eagle/diff/0d1dcc40

Branch: refs/heads/master
Commit: 0d1dcc408ddf499a09e3868480494062e45cb736
Parents: f899ca1
Author: Zhao, Qingwen <qingwz...@apache.org>
Authored: Mon Dec 5 18:07:50 2016 +0800
Committer: Zhao, Qingwen <qingwz...@apache.org>
Committed: Mon Dec 5 18:07:50 2016 +0800

----------------------------------------------------------------------
...e.alert.app.AlertUnitTopologyAppProvider.xml | 2 +-
.../eagle/alert/engine/topology/TestBolt.java | 4 -
.../app/utils/connection/InputStreamUtils.java | 5 +-
.../connection/ServiceNotResponseException.java | 61 +++++++++
.../utils/connection/URLResourceFetcher.java | 62 ++++++++++
.../src/main/bin/createTables.sql | 6 +-
.../eagle/topology/TopologyCheckAppConfig.java | 2 +-
.../extractor/TopologyEntityParser.java | 2 +-
.../extractor/TopologyEntityParserResult.java | 4 +
.../extractor/hbase/HbaseTopologyCrawler.java | 13 +-
.../hbase/HbaseTopologyEntityParser.java | 29 +++--
.../extractor/hdfs/HdfsTopologyCrawler.java | 10 +-
.../hdfs/HdfsTopologyEntityParser.java | 64 +++++-----
.../extractor/mr/MRTopologyCrawler.java | 2 +-
.../extractor/mr/MRTopologyEntityParser.java | 124 +++++++++----------
.../topology/extractor/mr/YarnNodeInfo.java | 6 +-
.../impl/IPMaskTopologyRackResolver.java | 5 +-
.../topology/storm/TopologyDataPersistBolt.java | 53 ++++----
.../topology/utils/EntityBuilderHelper.java | 11 +-
.../eagle/topology/utils/JMXQueryHelper.java | 21 ++--
.../utils/ServiceNotResponseException.java | 61 ---------
.../eagle/topology/utils/StringUtils.java | 45 ------- .../eagle/topology/TopologyConstants.java | 1 + .../entity/MRServiceTopologyAPIEntity.java | 11 ++ 24 files changed, 314 insertions(+), 290 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0d1dcc40/eagle-core/eagle-alert-parent/eagle-alert-app/src/main/resources/META-INF/providers/org.apache.eagle.alert.app.AlertUnitTopologyAppProvider.xml ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert-parent/eagle-alert-app/src/main/resources/META-INF/providers/org.apache.eagle.alert.app.AlertUnitTopologyAppProvider.xml b/eagle-core/eagle-alert-parent/eagle-alert-app/src/main/resources/META-INF/providers/org.apache.eagle.alert.app.AlertUnitTopologyAppProvider.xml index 74e97d3..8ecbe8c 100644 --- a/eagle-core/eagle-alert-parent/eagle-alert-app/src/main/resources/META-INF/providers/org.apache.eagle.alert.app.AlertUnitTopologyAppProvider.xml +++ b/eagle-core/eagle-alert-parent/eagle-alert-app/src/main/resources/META-INF/providers/org.apache.eagle.alert.app.AlertUnitTopologyAppProvider.xml @@ -90,7 +90,7 @@ </property> <property> <name>spout.stormKafkaUseSameZkQuorumWithKafkaBroker</name> - <displayName>Spout Transaction Zookeeper to Reuse Broker Zookeeper</displayName> + <displayName>Reuse Broker Zookeeper</displayName> <value>true</value> <description>Use same zookeeper for kafka server and kafka consumer(Storm-Kafka)</description> <required>false</required> http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0d1dcc40/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/topology/TestBolt.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/topology/TestBolt.java 
b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/topology/TestBolt.java index 1c375fa..dc9c9b3 100644 --- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/topology/TestBolt.java +++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/topology/TestBolt.java @@ -19,7 +19,6 @@ package org.apache.eagle.alert.engine.topology; - import backtype.storm.task.OutputCollector; import backtype.storm.task.TopologyContext; import backtype.storm.topology.OutputFieldsDeclarer; @@ -31,9 +30,6 @@ import org.slf4j.LoggerFactory; import java.util.Map; -/** - * Created by yonzhang on 4/7/16. - */ @Ignore @SuppressWarnings( {"rawtypes", "serial"}) public class TestBolt extends BaseRichBolt { http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0d1dcc40/eagle-core/eagle-app/eagle-app-utils/src/main/java/org/apache/eagle/app/utils/connection/InputStreamUtils.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-app/eagle-app-utils/src/main/java/org/apache/eagle/app/utils/connection/InputStreamUtils.java b/eagle-core/eagle-app/eagle-app-utils/src/main/java/org/apache/eagle/app/utils/connection/InputStreamUtils.java index 7b9479f..cf76627 100644 --- a/eagle-core/eagle-app/eagle-app-utils/src/main/java/org/apache/eagle/app/utils/connection/InputStreamUtils.java +++ b/eagle-core/eagle-app/eagle-app-utils/src/main/java/org/apache/eagle/app/utils/connection/InputStreamUtils.java @@ -27,8 +27,8 @@ import java.util.zip.GZIPInputStream; public class InputStreamUtils { - private static final int CONNECTION_TIMEOUT = 10 * 1000; - private static final int READ_TIMEOUT = 5 * 60 * 1000; + private static final int CONNECTION_TIMEOUT = 1 * 30 * 1000; + private static final int READ_TIMEOUT = 1 * 60 * 1000; private static final String GZIP_HTTP_HEADER = "Accept-Encoding"; private static final 
String GZIP_COMPRESSION = "gzip"; @@ -49,7 +49,6 @@ public class InputStreamUtils { if (null != auth) { connection.setRequestProperty("Authorization", auth); } - return connection.getInputStream(); } http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0d1dcc40/eagle-core/eagle-app/eagle-app-utils/src/main/java/org/apache/eagle/app/utils/connection/ServiceNotResponseException.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-app/eagle-app-utils/src/main/java/org/apache/eagle/app/utils/connection/ServiceNotResponseException.java b/eagle-core/eagle-app/eagle-app-utils/src/main/java/org/apache/eagle/app/utils/connection/ServiceNotResponseException.java new file mode 100644 index 0000000..972b3e1 --- /dev/null +++ b/eagle-core/eagle-app/eagle-app-utils/src/main/java/org/apache/eagle/app/utils/connection/ServiceNotResponseException.java @@ -0,0 +1,61 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.eagle.app.utils.connection; + +import java.io.IOException; + +public class ServiceNotResponseException extends IOException { + + private static final long serialVersionUID = -2425311876734366496L; + + /** + * Default constructor of FeederException. + */ + public ServiceNotResponseException() { + super(); + } + + /** + * Constructor of FeederException. + * + * @param message error message + */ + public ServiceNotResponseException(String message) { + super(message); + } + + /** + * Constructor of FeederException. + * + * @param message error message + * @param cause the cause of the exception + */ + public ServiceNotResponseException(String message, Throwable cause) { + super(message, cause); + } + + /** + * Constructor of FeederException. + * + * @param cause the cause of the exception + */ + public ServiceNotResponseException(Throwable cause) { + super(cause); + } +} http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0d1dcc40/eagle-core/eagle-app/eagle-app-utils/src/main/java/org/apache/eagle/app/utils/connection/URLResourceFetcher.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-app/eagle-app-utils/src/main/java/org/apache/eagle/app/utils/connection/URLResourceFetcher.java b/eagle-core/eagle-app/eagle-app-utils/src/main/java/org/apache/eagle/app/utils/connection/URLResourceFetcher.java new file mode 100644 index 0000000..a93fded --- /dev/null +++ b/eagle-core/eagle-app/eagle-app-utils/src/main/java/org/apache/eagle/app/utils/connection/URLResourceFetcher.java @@ -0,0 +1,62 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.eagle.app.utils.connection; + +import org.apache.eagle.app.utils.AppConstants; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.io.InputStream; + +public class URLResourceFetcher { + + private static int MAX_RETRY_COUNT = 2; + private static final Logger LOG = LoggerFactory.getLogger(URLResourceFetcher.class); + + public static InputStream openURLStream(String url) throws ServiceNotResponseException { + return openURLStream(url, AppConstants.CompressionType.NONE); + } + + public static InputStream openURLStream(String url, AppConstants.CompressionType compressionType) throws ServiceNotResponseException { + InputStream is = null; + LOG.info("Going to query URL {}", url); + for (int i = 0; i < MAX_RETRY_COUNT; i++) { + try { + is = InputStreamUtils.getInputStream(url, null, compressionType); + } catch (Exception e) { + LOG.warn("fail to fetch data from {} due to {}, and try again", url, e.getMessage()); + } + } + if (is == null) { + throw new ServiceNotResponseException(String.format("fail to fetch data from %s", url)); + } else { + return is; + } + } + + public static void closeInputStream(InputStream is) { + if (is != null) { + try { + is.close(); + } catch (IOException e) { + LOG.error(e.getMessage(), e); + } + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0d1dcc40/eagle-server-assembly/src/main/bin/createTables.sql ---------------------------------------------------------------------- diff --git a/eagle-server-assembly/src/main/bin/createTables.sql 
b/eagle-server-assembly/src/main/bin/createTables.sql index 549d267..c5472b6 100644 --- a/eagle-server-assembly/src/main/bin/createTables.sql +++ b/eagle-server-assembly/src/main/bin/createTables.sql @@ -17,7 +17,7 @@ -- */ ---- application framework metadata --- +-- application framework metadata CREATE TABLE IF NOT EXISTS applications ( uuid varchar(50) PRIMARY KEY, @@ -43,7 +43,7 @@ CREATE TABLE IF NOT EXISTS sites ( UNIQUE (siteid) ); ---- eagle security module metadata --- +-- eagle security module metadata CREATE TABLE IF NOT EXISTS hdfs_sensitivity_entity ( site varchar(20) DEFAULT NULL, @@ -65,7 +65,7 @@ CREATE TABLE IF NOT EXISTS hbase_sensitivity_entity ( primary key (site, hbase_resource) ); ---- alert engine metadata --- +--- alert engine metadata CREATE TABLE IF NOT EXISTS stream_cluster ( id VARCHAR (50) PRIMARY KEY, http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0d1dcc40/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/TopologyCheckAppConfig.java ---------------------------------------------------------------------- diff --git a/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/TopologyCheckAppConfig.java b/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/TopologyCheckAppConfig.java index a1c65a9..da6cd46 100644 --- a/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/TopologyCheckAppConfig.java +++ b/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/TopologyCheckAppConfig.java @@ -85,7 +85,7 @@ public class TopologyCheckAppConfig implements Serializable { //e.printStackTrace(); } - if (config.getBoolean("dataSourceConfig.hbase.enabled")) { + if (config.hasPath("dataSourceConfig.hbase") && config.getBoolean("dataSourceConfig.hbase.enabled")) { topologyTypes.add(TopologyConstants.TopologyType.HBASE); hBaseConfig = new HBaseConfig(); 
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0d1dcc40/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/extractor/TopologyEntityParser.java ---------------------------------------------------------------------- diff --git a/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/extractor/TopologyEntityParser.java b/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/extractor/TopologyEntityParser.java index 64c9bd4..4677d0d 100644 --- a/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/extractor/TopologyEntityParser.java +++ b/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/extractor/TopologyEntityParser.java @@ -27,7 +27,7 @@ public interface TopologyEntityParser { * Parse hadoop topology and return the topology entity results. * @return the topology entity result */ - public TopologyEntityParserResult parse(long timestamp) throws IOException; + public TopologyEntityParserResult parse(long timestamp); /** * Get topology type for the parser. 
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0d1dcc40/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/extractor/TopologyEntityParserResult.java ---------------------------------------------------------------------- diff --git a/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/extractor/TopologyEntityParserResult.java b/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/extractor/TopologyEntityParserResult.java index 1799054..b9f9481 100644 --- a/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/extractor/TopologyEntityParserResult.java +++ b/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/extractor/TopologyEntityParserResult.java @@ -30,6 +30,10 @@ public class TopologyEntityParserResult { private List<TopologyBaseAPIEntity> slaveNodes = new ArrayList<>(); private List<GenericMetricEntity> metrics = new ArrayList<>(); + public TopologyEntityParserResult() { + version = TopologyConstants.HadoopVersion.V2; + } + public List<TopologyBaseAPIEntity> getSlaveNodes() { return slaveNodes; } http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0d1dcc40/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/extractor/hbase/HbaseTopologyCrawler.java ---------------------------------------------------------------------- diff --git a/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/extractor/hbase/HbaseTopologyCrawler.java b/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/extractor/hbase/HbaseTopologyCrawler.java index 398178f..04a4aa1 100644 --- a/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/extractor/hbase/HbaseTopologyCrawler.java +++ b/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/extractor/hbase/HbaseTopologyCrawler.java @@ -49,19 +49,12 @@ 
public class HbaseTopologyCrawler implements TopologyCrawler { @Override public void extract() { long updateTimestamp = System.currentTimeMillis(); - TopologyEntityParserResult result = null; - try { - result = parser.parse(updateTimestamp); - } catch (Exception e) { - e.printStackTrace(); - } - if (result == null) { + TopologyEntityParserResult result = parser.parse(updateTimestamp);; + + if (result == null || result.getMetrics().isEmpty()) { LOG.warn("No data fetched"); result = new TopologyEntityParserResult(); } - if (result.getMasterNodes().isEmpty()) { - result.getMetrics().add(EntityBuilderHelper.generateMetric(TopologyConstants.HMASTER_ROLE, 0, site, updateTimestamp)); - } TopologyCheckMessageId messageId = new TopologyCheckMessageId(TopologyConstants.TopologyType.HBASE, updateTimestamp); this.collector.emit(new Values(TopologyConstants.HBASE_INSTANCE_SERVICE_NAME, result), messageId); } http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0d1dcc40/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/extractor/hbase/HbaseTopologyEntityParser.java ---------------------------------------------------------------------- diff --git a/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/extractor/hbase/HbaseTopologyEntityParser.java b/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/extractor/hbase/HbaseTopologyEntityParser.java index 94b3727..ce977a9 100644 --- a/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/extractor/hbase/HbaseTopologyEntityParser.java +++ b/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/extractor/hbase/HbaseTopologyEntityParser.java @@ -32,6 +32,7 @@ import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.ServerLoad; import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.client.HBaseAdmin; +import org.slf4j.Logger; import java.io.IOException; 
import java.util.HashMap; @@ -41,6 +42,7 @@ import static org.apache.eagle.topology.TopologyConstants.*; public class HbaseTopologyEntityParser implements TopologyEntityParser { + private static final Logger LOG = org.slf4j.LoggerFactory.getLogger(HbaseTopologyEntityParser.class); private Configuration hBaseConfiguration; private String site; private Boolean kerberosEnable = false; @@ -72,18 +74,31 @@ public class HbaseTopologyEntityParser implements TopologyEntityParser { return new HBaseAdmin(this.hBaseConfiguration); } + @Override - public TopologyEntityParserResult parse(long timestamp) throws IOException { + public TopologyEntityParserResult parse(long timestamp) { + final TopologyEntityParserResult result = new TopologyEntityParserResult(); + int activeRatio = 0; + try { + doParse(timestamp, result); + activeRatio++; + } catch (Exception ex) { + LOG.error(ex.getMessage(), ex); + } + result.getMetrics().add(EntityBuilderHelper.generateMetric(TopologyConstants.HMASTER_ROLE, activeRatio, site, timestamp)); + return result; + } + + private void doParse(long timestamp, TopologyEntityParserResult result) throws IOException { long deadServers = 0; long liveServers = 0; - TopologyEntityParserResult result = new TopologyEntityParserResult(); HBaseAdmin admin = null; try { admin = getHBaseAdmin(); ClusterStatus status = admin.getClusterStatus(); deadServers = status.getDeadServers(); liveServers = status.getServersSize(); - result.setVersion(HadoopVersion.V2); + for (ServerName liveServer : status.getServers()) { ServerLoad load = status.getLoad(liveServer); result.getSlaveNodes().add(parseServer(liveServer, load, TopologyConstants.REGIONSERVER_ROLE, TopologyConstants.REGIONSERVER_LIVE_STATUS, timestamp)); @@ -103,20 +118,16 @@ public class HbaseTopologyEntityParser implements TopologyEntityParser { } double liveRatio = liveServers * 1d / (liveServers + deadServers); result.getMetrics().add(EntityBuilderHelper.generateMetric(TopologyConstants.REGIONSERVER_ROLE, 
liveRatio, site, timestamp)); - result.getMetrics().add(EntityBuilderHelper.generateMetric(TopologyConstants.HMASTER_ROLE, 1d, site, timestamp)); - return result; - } catch (RuntimeException e) { - e.printStackTrace(); + LOG.info("live servers: {}, dead servers: {}", liveServers, deadServers); } finally { if (admin != null) { try { admin.close(); } catch (IOException e) { - e.printStackTrace(); + LOG.error(e.getMessage(), e); } } } - return result; } private HBaseServiceTopologyAPIEntity parseServer(ServerName serverName, ServerLoad serverLoad, String role, String status, long timestamp) { http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0d1dcc40/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/extractor/hdfs/HdfsTopologyCrawler.java ---------------------------------------------------------------------- diff --git a/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/extractor/hdfs/HdfsTopologyCrawler.java b/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/extractor/hdfs/HdfsTopologyCrawler.java index 7030221..f20204d 100644 --- a/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/extractor/hdfs/HdfsTopologyCrawler.java +++ b/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/extractor/hdfs/HdfsTopologyCrawler.java @@ -46,14 +46,8 @@ public class HdfsTopologyCrawler implements TopologyCrawler { @Override public void extract() { long updateTimestamp = System.currentTimeMillis(); - TopologyEntityParserResult result = null; - try { - result = parser.parse(updateTimestamp); - } catch (IOException e) { - e.printStackTrace(); - return; - } - if (result == null || result.getMasterNodes().isEmpty()) { + TopologyEntityParserResult result = parser.parse(updateTimestamp); + if (result == null || result.getMetrics().isEmpty()) { LOG.warn("No data fetched"); return; } 
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0d1dcc40/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/extractor/hdfs/HdfsTopologyEntityParser.java ---------------------------------------------------------------------- diff --git a/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/extractor/hdfs/HdfsTopologyEntityParser.java b/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/extractor/hdfs/HdfsTopologyEntityParser.java index 79277a4..df6605d 100644 --- a/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/extractor/hdfs/HdfsTopologyEntityParser.java +++ b/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/extractor/hdfs/HdfsTopologyEntityParser.java @@ -19,6 +19,7 @@ package org.apache.eagle.topology.extractor.hdfs; import org.apache.eagle.app.utils.PathResolverHelper; +import org.apache.eagle.app.utils.connection.ServiceNotResponseException; import org.apache.eagle.topology.TopologyCheckAppConfig; import org.apache.eagle.topology.TopologyConstants; import org.apache.eagle.topology.extractor.TopologyEntityParserResult; @@ -26,6 +27,7 @@ import org.apache.eagle.topology.entity.HdfsServiceTopologyAPIEntity; import org.apache.eagle.topology.extractor.TopologyEntityParser; import org.apache.eagle.topology.resolver.TopologyRackResolver; import org.apache.eagle.topology.utils.*; +import org.apache.commons.io.FileUtils; import org.json.JSONArray; import org.json.JSONException; @@ -33,8 +35,6 @@ import org.json.JSONObject; import org.json.JSONTokener; import org.slf4j.Logger; -import java.io.IOException; -import java.net.UnknownHostException; import java.util.HashMap; import java.util.Map; import java.util.regex.Matcher; @@ -80,8 +80,6 @@ public class HdfsTopologyEntityParser implements TopologyEntityParser { private static final String STATUS_PATTERN = "([\\d\\.]+):\\d+\\s+\\([\\D]+(\\d+)\\)"; private static 
final String QJM_PATTERN = "([\\d\\.]+):\\d+"; - private static final double TB = 1024 * 1024 * 1024 * 1024; - public HdfsTopologyEntityParser(String site, TopologyCheckAppConfig.HdfsConfig hdfsConfig, TopologyRackResolver rackResolver) { this.namenodeUrls = hdfsConfig.namenodeUrls; this.site = site; @@ -89,31 +87,31 @@ public class HdfsTopologyEntityParser implements TopologyEntityParser { } @Override - public TopologyEntityParserResult parse(long timestamp) throws IOException { + public TopologyEntityParserResult parse(long timestamp) { final TopologyEntityParserResult result = new TopologyEntityParserResult(); - result.setVersion(TopologyConstants.HadoopVersion.V2); - int numNamenode = 0; + int inActiveHosts = 0; for (String url : namenodeUrls) { try { final HdfsServiceTopologyAPIEntity namenodeEntity = createNamenodeEntity(url, timestamp); - result.getMasterNodes().add(namenodeEntity); - numNamenode++; if (namenodeEntity.getStatus().equalsIgnoreCase(NAME_NODE_ACTIVE_STATUS)) { String namenodeVersion = createSlaveNodeEntities(url, timestamp, result); namenodeEntity.setVersion(namenodeVersion); } - } catch (RuntimeException ex) { - ex.printStackTrace(); - } catch (IOException e) { - LOG.warn("Catch an IOException with url: {}", url); + result.getMasterNodes().add(namenodeEntity); + } catch (ServiceNotResponseException e) { + inActiveHosts++; + LOG.error(e.getMessage(), e); + } catch (Exception ex) { + LOG.error("fail to parse url {} due to {}, and will cancel this parsing", url, ex.getMessage(), ex); + result.getSlaveNodes().clear(); } } - double value = numNamenode * 1d / namenodeUrls.length; + double value = (namenodeUrls.length - inActiveHosts) * 1d / namenodeUrls.length; result.getMetrics().add(EntityBuilderHelper.generateMetric(TopologyConstants.NAME_NODE_ROLE, value, site, timestamp)); return result; } - private HdfsServiceTopologyAPIEntity createNamenodeEntity(String url, long updateTime) throws JSONException, IOException { + private 
HdfsServiceTopologyAPIEntity createNamenodeEntity(String url, long updateTime) throws ServiceNotResponseException { final String urlString = buildFSNamesystemURL(url); final Map<String, JMXBean> jmxBeanMap = JMXQueryHelper.query(urlString); final JMXBean bean = jmxBeanMap.get(JMX_FS_NAME_SYSTEM_BEAN_NAME); @@ -121,21 +119,25 @@ public class HdfsTopologyEntityParser implements TopologyEntityParser { if (bean == null || bean.getPropertyMap() == null) { throw new ServiceNotResponseException("Invalid JMX format, FSNamesystem bean is null!"); } - final String hostname = (String) bean.getPropertyMap().get(HA_NAME); - HdfsServiceTopologyAPIEntity result = createHdfsServiceEntity(TopologyConstants.NAME_NODE_ROLE, hostname, updateTime); - final String state = (String) bean.getPropertyMap().get(HA_STATE); - result.setStatus(state); - final Double configuredCapacityGB = (Double) bean.getPropertyMap().get(CAPACITY_TOTAL_GB); - result.setConfiguredCapacityTB(Double.toString(configuredCapacityGB / 1024)); - final Double capacityUsedGB = (Double) bean.getPropertyMap().get(CAPACITY_USED_GB); - result.setUsedCapacityTB(Double.toString(capacityUsedGB / 1024)); - final Integer blocksTotal = (Integer) bean.getPropertyMap().get(BLOCKS_TOTAL); - result.setNumBlocks(Integer.toString(blocksTotal)); - return result; + HdfsServiceTopologyAPIEntity namenode = createHdfsServiceEntity(TopologyConstants.NAME_NODE_ROLE, hostname, updateTime); + try { + final String state = (String) bean.getPropertyMap().get(HA_STATE); + namenode.setStatus(state); + final Double configuredCapacityGB = (Double) bean.getPropertyMap().get(CAPACITY_TOTAL_GB); + namenode.setConfiguredCapacityTB(Double.toString(configuredCapacityGB / FileUtils.ONE_KB)); + final Double capacityUsedGB = (Double) bean.getPropertyMap().get(CAPACITY_USED_GB); + namenode.setUsedCapacityTB(Double.toString(capacityUsedGB / FileUtils.ONE_KB)); + final Integer blocksTotal = (Integer) bean.getPropertyMap().get(BLOCKS_TOTAL); + 
namenode.setNumBlocks(Integer.toString(blocksTotal)); + } catch (RuntimeException ex) { + LOG.error(ex.getMessage(), ex); + } + return namenode; + } - private String createSlaveNodeEntities(String url, long updateTime, TopologyEntityParserResult result) throws IOException { + private String createSlaveNodeEntities(String url, long updateTime, TopologyEntityParserResult result) throws ServiceNotResponseException { final String urlString = buildNamenodeInfo(url); final Map<String, JMXBean> jmxBeanMap = JMXQueryHelper.query(urlString); final JMXBean bean = jmxBeanMap.get(JMX_NAMENODE_INFO); @@ -147,7 +149,7 @@ public class HdfsTopologyEntityParser implements TopologyEntityParser { return (String) bean.getPropertyMap().get(NAME_NODE_VERSION); } - private void createAllJournalNodeEntities(JMXBean bean, long updateTime, TopologyEntityParserResult result) throws UnknownHostException { + private void createAllJournalNodeEntities(JMXBean bean, long updateTime, TopologyEntityParserResult result) { if (bean.getPropertyMap().get(JN_TRANSACTION_INFO) == null || bean.getPropertyMap().get(JN_STATUS) == null) { return; } @@ -194,7 +196,7 @@ public class HdfsTopologyEntityParser implements TopologyEntityParser { result.getMetrics().add(EntityBuilderHelper.generateMetric(TopologyConstants.JOURNAL_NODE_ROLE, value, site, updateTime)); } - private void createAllDataNodeEntities(JMXBean bean, long updateTime, TopologyEntityParserResult result) throws JSONException, IOException { + private void createAllDataNodeEntities(JMXBean bean, long updateTime, TopologyEntityParserResult result) throws JSONException { int numLiveNodes = 0; int numLiveDecommNodes = 0; int numDeadNodes = 0; @@ -229,9 +231,9 @@ public class HdfsTopologyEntityParser implements TopologyEntityParser { HdfsServiceTopologyAPIEntity entity = createHdfsServiceEntity(TopologyConstants.DATA_NODE_ROLE, EntityBuilderHelper.getValidHostName(hostname), updateTime); final Number configuredCapacity = (Number) 
liveNode.get(DATA_NODE_CAPACITY); - entity.setConfiguredCapacityTB(Double.toString(configuredCapacity.doubleValue() / TB)); + entity.setConfiguredCapacityTB(Double.toString(configuredCapacity.doubleValue() / FileUtils.ONE_TB)); final Number capacityUsed = (Number) liveNode.get(DATA_NODE_USED_SPACE); - entity.setUsedCapacityTB(Double.toString(capacityUsed.doubleValue() / TB)); + entity.setUsedCapacityTB(Double.toString(capacityUsed.doubleValue() / FileUtils.ONE_TB)); final Number blocksTotal = (Number) liveNode.get(DATA_NODE_NUM_BLOCKS); entity.setNumBlocks(Double.toString(blocksTotal.doubleValue())); if (liveNode.has(DATA_NODE_FAILED_VOLUMN)) { http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0d1dcc40/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/extractor/mr/MRTopologyCrawler.java ---------------------------------------------------------------------- diff --git a/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/extractor/mr/MRTopologyCrawler.java b/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/extractor/mr/MRTopologyCrawler.java index af6bd51..e843a77 100644 --- a/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/extractor/mr/MRTopologyCrawler.java +++ b/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/extractor/mr/MRTopologyCrawler.java @@ -45,7 +45,7 @@ public class MRTopologyCrawler implements TopologyCrawler { public void extract() { long updateTimestamp = System.currentTimeMillis(); TopologyEntityParserResult result = parser.parse(updateTimestamp); - if (result == null || result.getMasterNodes().isEmpty()) { + if (result == null || result.getMetrics().isEmpty()) { LOG.warn("No data fetched"); return; } http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0d1dcc40/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/extractor/mr/MRTopologyEntityParser.java 
---------------------------------------------------------------------- diff --git a/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/extractor/mr/MRTopologyEntityParser.java b/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/extractor/mr/MRTopologyEntityParser.java index 447686c..c36e9e1 100644 --- a/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/extractor/mr/MRTopologyEntityParser.java +++ b/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/extractor/mr/MRTopologyEntityParser.java @@ -18,9 +18,7 @@ package org.apache.eagle.topology.extractor.mr; -import org.apache.eagle.app.utils.AppConstants; import org.apache.eagle.app.utils.PathResolverHelper; -import org.apache.eagle.app.utils.connection.InputStreamUtils; import org.apache.eagle.topology.TopologyCheckAppConfig; import org.apache.eagle.topology.TopologyConstants; import org.apache.eagle.topology.extractor.TopologyEntityParserResult; @@ -28,7 +26,8 @@ import org.apache.eagle.topology.entity.MRServiceTopologyAPIEntity; import org.apache.eagle.topology.extractor.TopologyEntityParser; import org.apache.eagle.topology.resolver.TopologyRackResolver; import org.apache.eagle.topology.utils.EntityBuilderHelper; -import org.apache.eagle.topology.utils.ServiceNotResponseException; +import org.apache.eagle.app.utils.connection.ServiceNotResponseException; +import org.apache.eagle.app.utils.connection.URLResourceFetcher; import org.codehaus.jackson.JsonParser; import org.codehaus.jackson.map.ObjectMapper; import org.slf4j.Logger; @@ -36,13 +35,13 @@ import org.slf4j.LoggerFactory; import java.io.IOException; import java.io.InputStream; -import java.net.ConnectException; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.regex.Matcher; import static org.apache.eagle.topology.TopologyConstants.*; +import static 
org.apache.eagle.topology.utils.EntityBuilderHelper.generateKey; public class MRTopologyEntityParser implements TopologyEntityParser { @@ -81,18 +80,31 @@ public class MRTopologyEntityParser implements TopologyEntityParser { @Override public TopologyEntityParserResult parse(long timestamp) { final TopologyEntityParserResult result = new TopologyEntityParserResult(); - result.setVersion(TopologyConstants.HadoopVersion.V2); + String rmStatus; + int inActiveHosts = 0; + boolean isSuccess = false; for (String url : rmUrls) { + MRServiceTopologyAPIEntity resourceManagerEntity = createEntity(TopologyConstants.RESOURCE_MANAGER_ROLE, + extractMasterHost(url), timestamp); + rmStatus = RESOURCE_MANAGER_ACTIVE_STATUS; try { - doParse(PathResolverHelper.buildUrlPath(url, YARN_NODES_URL), timestamp, result); - } catch (ServiceNotResponseException ex) { - LOGGER.warn("Catch a ServiceNotResponseException with url: {}", url); - // reSelect url + InputStream is = URLResourceFetcher.openURLStream(PathResolverHelper.buildUrlPath(url, YARN_NODES_URL)); + if (!isSuccess) { + isSuccess = doParse(timestamp, is, result); + } + } catch (IOException e) { + inActiveHosts++; + LOGGER.warn(e.getMessage(), e); + rmStatus = RESOURCE_MANAGER_INACTIVE_STATUS; + } catch (Exception ex) { + LOGGER.error("fail to parse url {} due to {}, and will cancel this parsing", url, ex.getMessage(), ex); + result.getSlaveNodes().clear(); } + resourceManagerEntity.setStatus(rmStatus); + result.getMasterNodes().add(resourceManagerEntity); } - - double value = result.getMasterNodes().isEmpty() ? 
0 : result.getMasterNodes().size() * 1d / rmUrls.length; + double value = (rmUrls.length - inActiveHosts) * 1.0 / rmUrls.length; result.getMetrics().add(EntityBuilderHelper.generateMetric(TopologyConstants.RESOURCE_MANAGER_ROLE, value, site, timestamp)); doCheckHistoryServer(timestamp, result); @@ -103,51 +115,41 @@ public class MRTopologyEntityParser implements TopologyEntityParser { if (historyServerUrl == null || historyServerUrl.isEmpty()) { return; } - String hsUrl = PathResolverHelper.buildUrlPath(historyServerUrl, YARN_HISTORY_SERVER_URL); - double liveCount = 1; - try { - InputStreamUtils.getInputStream(hsUrl, null, AppConstants.CompressionType.NONE); - } catch (ConnectException e) { - liveCount = 0; - } catch (Exception e) { - e.printStackTrace(); - return; - } - result.getMetrics().add(EntityBuilderHelper.generateMetric(TopologyConstants.HISTORY_SERVER_ROLE, liveCount, site, updateTime)); - } - - private InputStream getInputStream(String url, AppConstants.CompressionType type) throws ServiceNotResponseException { + String url = PathResolverHelper.buildUrlPath(historyServerUrl, YARN_HISTORY_SERVER_URL); + double activeHosts = 0; InputStream is = null; try { - is = InputStreamUtils.getInputStream(url, null, type); - } catch (ConnectException e) { - throw new ServiceNotResponseException(e); - } catch (Exception e) { - e.printStackTrace(); + is = URLResourceFetcher.openURLStream(url); + activeHosts++; + } catch (ServiceNotResponseException e) { + LOGGER.error(e.getMessage(), e); + } finally { + URLResourceFetcher.closeInputStream(is); } - return is; + result.getMetrics().add(EntityBuilderHelper.generateMetric(TopologyConstants.HISTORY_SERVER_ROLE, activeHosts, site, updateTime)); } - private void doParse(String url, long timestamp, TopologyEntityParserResult result) throws ServiceNotResponseException { - - InputStream is = null; + private boolean doParse(long timestamp, InputStream is, TopologyEntityParserResult result) throws IOException { + boolean 
isSuccess = false; + String nodeKey; + Map<String, MRServiceTopologyAPIEntity> nmMap = new HashMap<>(); try { - LOGGER.info("Going to query URL: " + url); - is = InputStreamUtils.getInputStream(url, null, AppConstants.CompressionType.NONE); YarnNodeInfoWrapper nodeWrapper = OBJ_MAPPER.readValue(is, YarnNodeInfoWrapper.class); - if (nodeWrapper.getNodes() == null || nodeWrapper.getNodes().getNode() == null) { - throw new ServiceNotResponseException("Invalid result of URL: " + url); - } int runningNodeCount = 0; int lostNodeCount = 0; int unhealthyNodeCount = 0; + int rackWarningCount = 0; final List<YarnNodeInfo> list = nodeWrapper.getNodes().getNode(); for (YarnNodeInfo info : list) { final MRServiceTopologyAPIEntity nodeManagerEntity = createEntity(NODE_MANAGER_ROLE, info.getNodeHostName(), timestamp); + if (!extractRack(info).equalsIgnoreCase(nodeManagerEntity.getTags().get(RACK_TAG)) && rackWarningCount < 10) { + LOGGER.warn("rack info is inconsistent, please configure the right rack resolver class"); + rackWarningCount++; + } + nodeManagerEntity.setLastHealthUpdate(info.getLastHealthUpdate()); if (info.getHealthReport() != null && (!info.getHealthReport().isEmpty())) { nodeManagerEntity.setHealthReport(info.getHealthReport()); } - // TODO: Need to remove the manually mapping RUNNING -> running, LOST - > lost, UNHEALTHY -> unhealthy if (info.getState() != null) { final String state = info.getState().toLowerCase(); nodeManagerEntity.setStatus(state); @@ -159,30 +161,23 @@ public class MRTopologyEntityParser implements TopologyEntityParser { ++unhealthyNodeCount; } } - result.getSlaveNodes().add(nodeManagerEntity); + nodeKey = generateKey(nodeManagerEntity); + if (nmMap.containsKey(nodeKey) && nmMap.get(nodeKey).getLastUpdateTime() < nodeManagerEntity.getLastHealthUpdate()) { + nmMap.put(nodeKey, nodeManagerEntity); + } else { + nmMap.put(nodeKey, nodeManagerEntity); + } } - LOGGER.info("Running NMs: " + runningNodeCount + ", lost NMs: " + lostNodeCount + ", 
unhealthy NMs: " + unhealthyNodeCount); - final MRServiceTopologyAPIEntity resourceManagerEntity = createEntity(TopologyConstants.RESOURCE_MANAGER_ROLE, extractMasterHost(url), timestamp); - resourceManagerEntity.setStatus(TopologyConstants.RESOURCE_MANAGER_ACTIVE_STATUS); - result.getMasterNodes().add(resourceManagerEntity); - double value = runningNodeCount * 1d / list.size(); + LOGGER.info("Total NMs: {}, Actual NMs: {}, Running NMs: {}, lost NMs: {}, unhealthy NMs: {}", list.size(), nmMap.size(), runningNodeCount, lostNodeCount, unhealthyNodeCount); + + double value = runningNodeCount * 1d / nmMap.size(); result.getMetrics().add(EntityBuilderHelper.generateMetric(TopologyConstants.NODE_MANAGER_ROLE, value, site, timestamp)); - } catch (RuntimeException e) { - e.printStackTrace(); - } catch (IOException e) { - throw new ServiceNotResponseException(e); - } catch (Exception e) { - e.printStackTrace(); + result.getSlaveNodes().addAll(nmMap.values()); + isSuccess = true; } finally { - if (is != null) { - try { - is.close(); - } catch (IOException e) { - e.printStackTrace(); - // Do nothing - } - } + URLResourceFetcher.closeInputStream(is); } + return isSuccess; } private String extractMasterHost(String url) { @@ -194,14 +189,15 @@ public class MRTopologyEntityParser implements TopologyEntityParser { } private String extractRack(YarnNodeInfo info) { - if (info.getRack() == null) { - return null; + if (info.getRack() == null) { // if a host is DECOMMISSIONED, then no rack info + return rackResolver.resolve(info.getNodeHostName()); } String value = info.getRack(); value = value.substring(value.lastIndexOf('/') + 1); return value; } + private MRServiceTopologyAPIEntity createEntity(String roleType, String hostname, long updateTime) { MRServiceTopologyAPIEntity entity = new MRServiceTopologyAPIEntity(); entity.setTimestamp(updateTime); @@ -211,8 +207,8 @@ public class MRTopologyEntityParser implements TopologyEntityParser { tags.put(SITE_TAG, site); tags.put(ROLE_TAG, 
roleType); tags.put(HOSTNAME_TAG, hostname); - String rack = rackResolver.resolve(hostname); - tags.put(RACK_TAG, rack); + String resolvedRack = rackResolver.resolve(hostname); + tags.put(RACK_TAG, resolvedRack); return entity; } http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0d1dcc40/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/extractor/mr/YarnNodeInfo.java ---------------------------------------------------------------------- diff --git a/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/extractor/mr/YarnNodeInfo.java b/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/extractor/mr/YarnNodeInfo.java index 4ca2b06..d3664e2 100644 --- a/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/extractor/mr/YarnNodeInfo.java +++ b/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/extractor/mr/YarnNodeInfo.java @@ -30,7 +30,7 @@ public class YarnNodeInfo { private String id; private String nodeHostName; private String nodeHTTPAddress; - private String lastHealthUpdate; + private long lastHealthUpdate; private String healthReport; private String numContainers; private String usedMemoryMB; @@ -77,11 +77,11 @@ public class YarnNodeInfo { this.nodeHTTPAddress = nodeHTTPAddress; } - public String getLastHealthUpdate() { + public long getLastHealthUpdate() { return lastHealthUpdate; } - public void setLastHealthUpdate(String lastHealthUpdate) { + public void setLastHealthUpdate(long lastHealthUpdate) { this.lastHealthUpdate = lastHealthUpdate; } http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0d1dcc40/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/resolver/impl/IPMaskTopologyRackResolver.java ---------------------------------------------------------------------- diff --git 
a/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/resolver/impl/IPMaskTopologyRackResolver.java b/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/resolver/impl/IPMaskTopologyRackResolver.java index 821de3c..99a44a6 100644 --- a/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/resolver/impl/IPMaskTopologyRackResolver.java +++ b/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/resolver/impl/IPMaskTopologyRackResolver.java @@ -19,6 +19,7 @@ package org.apache.eagle.topology.resolver.impl; import org.apache.eagle.topology.resolver.TopologyRackResolver; +import org.slf4j.Logger; import java.net.InetAddress; import java.net.UnknownHostException; @@ -28,6 +29,8 @@ import java.net.UnknownHostException; */ public class IPMaskTopologyRackResolver implements TopologyRackResolver { + private static final Logger LOG = org.slf4j.LoggerFactory.getLogger(IPMaskTopologyRackResolver.class); + private final int pos = 2; private int rackPos; @@ -46,7 +49,7 @@ public class IPMaskTopologyRackResolver implements TopologyRackResolver { InetAddress address = InetAddress.getByName(hostname); result = "rack" + (int) (address.getAddress()[rackPos] & 0xff); } catch (UnknownHostException e) { - //e.printStackTrace(); + LOG.error(e.getMessage(), e); } return result; } http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0d1dcc40/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/storm/TopologyDataPersistBolt.java ---------------------------------------------------------------------- diff --git a/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/storm/TopologyDataPersistBolt.java b/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/storm/TopologyDataPersistBolt.java index 11d907b..627ebe3 100644 --- 
a/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/storm/TopologyDataPersistBolt.java +++ b/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/storm/TopologyDataPersistBolt.java @@ -32,10 +32,10 @@ import backtype.storm.tuple.Tuple; import backtype.storm.tuple.Values; +import org.apache.commons.lang3.builder.HashCodeBuilder; import org.apache.eagle.log.base.taggedlog.TaggedLogAPIEntity; +import org.apache.eagle.log.entity.GenericMetricEntity; import org.apache.eagle.log.entity.GenericServiceAPIResponseEntity; -import org.apache.eagle.service.client.EagleServiceClientException; -import org.apache.eagle.service.client.EagleServiceConnector; import org.apache.eagle.service.client.IEagleServiceClient; import org.apache.eagle.service.client.impl.EagleServiceClientImpl; import org.apache.eagle.topology.TopologyCheckAppConfig; @@ -49,7 +49,6 @@ import org.apache.eagle.topology.entity.TopologyBaseAPIEntity; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.io.IOException; import java.util.*; public class TopologyDataPersistBolt extends BaseRichBolt { @@ -66,8 +65,7 @@ public class TopologyDataPersistBolt extends BaseRichBolt { @Override public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) { - this.client = new EagleServiceClientImpl(new EagleServiceConnector(this.config.getConfig().getString("service.host"), this.config.getConfig().getInt("service.port"), - this.config.getConfig().getString("service.username"), this.config.getConfig().getString("service.password"))); + this.client = new EagleServiceClientImpl(config.getConfig()); this.collector = collector; } @@ -82,22 +80,20 @@ public class TopologyDataPersistBolt extends BaseRichBolt { List<TopologyBaseAPIEntity> entitiesForDeletion = new ArrayList<>(); List<TopologyBaseAPIEntity> entitiesToWrite = new ArrayList<>(); - filterEntitiesToWrite(entitiesToWrite, availableHostnames, result.getMasterNodes()); - 
filterEntitiesToWrite(entitiesToWrite, availableHostnames, result.getSlaveNodes()); + filterEntitiesToWrite(result, availableHostnames, entitiesToWrite); String query = String.format("%s[@site=\"%s\"]{*}", serviceName, this.config.dataExtractorConfig.site); try { GenericServiceAPIResponseEntity<TopologyBaseAPIEntity> response = client.search().query(query).pageSize(Integer.MAX_VALUE).send(); if (response.isSuccess() && response.getObj() != null) { for (TopologyBaseAPIEntity entity : response.getObj()) { - if (availableHostnames != null && availableHostnames.size() > 0 && !availableHostnames.contains(generateKey(entity))) { + if (result.getSlaveNodes().size() > 0 && !availableHostnames.contains(generateKey(entity))) { entitiesForDeletion.add(entity); } } } deleteEntities(entitiesForDeletion, serviceName); - writeEntities(entitiesToWrite, serviceName); - writeEntities(result.getMetrics(), serviceName); + writeEntities(entitiesToWrite, result.getMetrics(), serviceName); emitToKafkaBolt(result); this.collector.ack(input); } catch (Exception e) { @@ -106,8 +102,12 @@ public class TopologyDataPersistBolt extends BaseRichBolt { } } - private void filterEntitiesToWrite(List<TopologyBaseAPIEntity> entitiesToWrite, Set<String> availableHostnames, List<TopologyBaseAPIEntity> entities) { - for (TopologyBaseAPIEntity entity : entities) { + private void filterEntitiesToWrite(TopologyEntityParserResult result, Set<String> availableHostnames, List<TopologyBaseAPIEntity> entitiesToWrite) { + for (TopologyBaseAPIEntity entity : result.getMasterNodes()) { + availableHostnames.add(generateKey(entity)); + entitiesToWrite.add(entity); + } + for (TopologyBaseAPIEntity entity : result.getSlaveNodes()) { availableHostnames.add(generateKey(entity)); entitiesToWrite.add(entity); } @@ -126,15 +126,13 @@ public class TopologyDataPersistBolt extends BaseRichBolt { } else { LOG.info("Successfully delete {} entities for {}", entities.size(), serviceName); } - } catch (EagleServiceClientException 
e) { - LOG.error(e.getMessage(), e); - } catch (IOException e) { + } catch (Exception e) { LOG.error(e.getMessage(), e); } entities.clear(); } - private void writeEntities(List<? extends TaggedLogAPIEntity> entities, String serviceName) { + private void writeEntities(List<? extends TaggedLogAPIEntity> entities, List<GenericMetricEntity> metrics, String serviceName) { try { GenericServiceAPIResponseEntity response = client.create(entities); if (!response.isSuccess()) { @@ -142,6 +140,12 @@ public class TopologyDataPersistBolt extends BaseRichBolt { } else { LOG.info("Successfully wrote {} entities for {}", entities.size(), serviceName); } + response = client.create(metrics); + if (!response.isSuccess()) { + LOG.error("Got exception from eagle service: " + response.getException()); + } else { + LOG.info("Successfully wrote {} metrics for {}", entities.size(), serviceName); + } } catch (Exception e) { LOG.error("cannot create entities successfully", e); } @@ -149,23 +153,19 @@ public class TopologyDataPersistBolt extends BaseRichBolt { } private String generateKey(TopologyBaseAPIEntity entity) { - return String.format("%s-%s-%s-%s", entity.getTags().get(TopologyConstants.SITE_TAG), - entity.getTags().get(TopologyConstants.RACK_TAG), entity.getTags().get(TopologyConstants.HOSTNAME_TAG), - entity.getTags().get(TopologyConstants.ROLE_TAG)); + return new HashCodeBuilder().append(entity.getTags().get(TopologyConstants.SITE_TAG)) + .append(entity.getTags().get(TopologyConstants.HOSTNAME_TAG)) + .append(entity.getTags().get(TopologyConstants.ROLE_TAG)) + .build().toString(); } private void emitToKafkaBolt(TopologyEntityParserResult result) { - List<HealthCheckParseAPIEntity> healthCheckParseAPIList = new ArrayList<HealthCheckParseAPIEntity>(); - setNodeInfo(result.getMasterNodes(), healthCheckParseAPIList); - setNodeInfo(result.getSlaveNodes(), healthCheckParseAPIList); - for (HealthCheckParseAPIEntity healthCheckAPIEntity : healthCheckParseAPIList) { this.collector.emit(new 
Values(healthCheckAPIEntity)); } - } private void setNodeInfo(List<TopologyBaseAPIEntity> topologyBaseAPIList, List<HealthCheckParseAPIEntity> healthCheckParseAPIList) { @@ -176,17 +176,12 @@ public class TopologyDataPersistBolt extends BaseRichBolt { TopologyBaseAPIEntity topologyBaseAPIEntity = iterator.next(); if (topologyBaseAPIEntity instanceof HBaseServiceTopologyAPIEntity) { - healthCheckAPIEntity.setStatus(((HBaseServiceTopologyAPIEntity) topologyBaseAPIEntity).getStatus()); - } if (topologyBaseAPIEntity instanceof HdfsServiceTopologyAPIEntity) { - healthCheckAPIEntity.setStatus(((HdfsServiceTopologyAPIEntity) topologyBaseAPIEntity).getStatus()); } - if (topologyBaseAPIEntity instanceof MRServiceTopologyAPIEntity) { - healthCheckAPIEntity.setStatus(((MRServiceTopologyAPIEntity) topologyBaseAPIEntity).getStatus()); } http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0d1dcc40/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/utils/EntityBuilderHelper.java ---------------------------------------------------------------------- diff --git a/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/utils/EntityBuilderHelper.java b/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/utils/EntityBuilderHelper.java index f390fa8..8aa1d88 100644 --- a/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/utils/EntityBuilderHelper.java +++ b/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/utils/EntityBuilderHelper.java @@ -19,8 +19,10 @@ package org.apache.eagle.topology.utils; import org.apache.commons.lang.StringUtils; +import org.apache.commons.lang3.builder.HashCodeBuilder; import org.apache.eagle.log.entity.GenericMetricEntity; import org.apache.eagle.topology.TopologyConstants; +import org.apache.eagle.topology.entity.TopologyBaseAPIEntity; import java.net.InetAddress; import java.net.UnknownHostException; @@ -39,7 
+41,7 @@ public class EntityBuilderHelper { return addr.getHostName(); } - public static GenericMetricEntity metricWrapper(Long timestamp, String metricName, double value, Map<String, String> tags) { + private static GenericMetricEntity metricWrapper(Long timestamp, String metricName, double value, Map<String, String> tags) { GenericMetricEntity metricEntity = new GenericMetricEntity(); metricEntity.setTimestamp(timestamp); metricEntity.setTags(tags); @@ -63,4 +65,11 @@ public class EntityBuilderHelper { return key.indexOf(TopologyConstants.COLON) > 0 ? key.substring(0, key.indexOf(TopologyConstants.COLON)) : key; } + public static String generateKey(TopologyBaseAPIEntity entity) { + return new HashCodeBuilder().append(entity.getTags().get(TopologyConstants.SITE_TAG)) + .append(entity.getTags().get(TopologyConstants.HOSTNAME_TAG)) + .append(entity.getTags().get(TopologyConstants.ROLE_TAG)) + .build().toString(); + } + } http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0d1dcc40/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/utils/JMXQueryHelper.java ---------------------------------------------------------------------- diff --git a/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/utils/JMXQueryHelper.java b/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/utils/JMXQueryHelper.java index bca8485..860a1b8 100644 --- a/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/utils/JMXQueryHelper.java +++ b/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/utils/JMXQueryHelper.java @@ -18,8 +18,10 @@ package org.apache.eagle.topology.utils; +import org.apache.eagle.app.utils.connection.ServiceNotResponseException; import org.apache.eagle.app.utils.connection.URLConnectionUtils; +import org.apache.eagle.app.utils.connection.URLResourceFetcher; import org.json.JSONArray; import org.json.JSONException; import 
org.json.JSONObject; @@ -38,28 +40,19 @@ import java.util.Map; */ public final class JMXQueryHelper { - private static final int DEFAULT_QUERY_TIMEOUT = 30 * 60 * 1000; - private static final Logger LOG = LoggerFactory.getLogger(JMXQueryHelper.class); - - public static Map<String, JMXBean> query(String jmxQueryUrl) throws JSONException, IOException { - LOG.info("Going to query JMX url: " + jmxQueryUrl); + public static Map<String, JMXBean> query(String jmxQueryUrl) throws ServiceNotResponseException { InputStream is = null; try { - final URLConnection connection = URLConnectionUtils.getConnection(jmxQueryUrl); - connection.setReadTimeout(DEFAULT_QUERY_TIMEOUT); - is = connection.getInputStream(); + is = URLResourceFetcher.openURLStream(jmxQueryUrl); return parseStream(is); } catch (Exception e) { - e.printStackTrace(); - return null; + throw new ServiceNotResponseException(e); } finally { - if (is != null) { - is.close(); - } + URLResourceFetcher.closeInputStream(is); } } - public static Map<String, JMXBean> parseStream(InputStream is) { + private static Map<String, JMXBean> parseStream(InputStream is) { final Map<String, JMXBean> resultMap = new HashMap<String, JMXBean>(); final JSONTokener tokener = new JSONTokener(is); final JSONObject jsonBeansObject = new JSONObject(tokener); http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0d1dcc40/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/utils/ServiceNotResponseException.java ---------------------------------------------------------------------- diff --git a/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/utils/ServiceNotResponseException.java b/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/utils/ServiceNotResponseException.java deleted file mode 100644 index 3204ca3..0000000 --- a/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/utils/ServiceNotResponseException.java +++ 
/dev/null @@ -1,61 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.eagle.topology.utils; - -import java.io.IOException; - -public class ServiceNotResponseException extends IOException { - - private static final long serialVersionUID = -2425311876734366496L; - - /** - * Default constructor of FeederException. - */ - public ServiceNotResponseException() { - super(); - } - - /** - * Constructor of FeederException. - * - * @param message error message - */ - public ServiceNotResponseException(String message) { - super(message); - } - - /** - * Constructor of FeederException. - * - * @param message error message - * @param cause the cause of the exception - */ - public ServiceNotResponseException(String message, Throwable cause) { - super(message, cause); - } - - /** - * Constructor of FeederException. 
- * - * @param cause the cause of the exception - */ - public ServiceNotResponseException(Throwable cause) { - super(cause); - } -} http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0d1dcc40/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/utils/StringUtils.java ---------------------------------------------------------------------- diff --git a/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/utils/StringUtils.java b/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/utils/StringUtils.java deleted file mode 100644 index 3b47f84..0000000 --- a/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/utils/StringUtils.java +++ /dev/null @@ -1,45 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.eagle.topology.utils; - -import java.util.Iterator; -import java.util.Map; -import java.util.Map.Entry; - -public final class StringUtils { - - public static final String DATE_FORMAT = "yyyy-MM-dd HH:mm:ss"; - - private StringUtils() { - - } - - public static String convertMapToString(Map<String, String> tags) { - StringBuilder tagBuilder = new StringBuilder(); - Iterator<Entry<String, String>> iter = tags.entrySet().iterator(); - while (iter.hasNext()) { - Map.Entry<String, String> entry = (Map.Entry<String, String>) iter.next(); - tagBuilder.append(entry.getKey() + ":" + entry.getValue()); - if (iter.hasNext()) { - tagBuilder.append(","); - } - } - return tagBuilder.toString(); - } -} http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0d1dcc40/eagle-topology-check/eagle-topology-entity/src/main/java/org/apache/eagle/topology/TopologyConstants.java ---------------------------------------------------------------------- diff --git a/eagle-topology-check/eagle-topology-entity/src/main/java/org/apache/eagle/topology/TopologyConstants.java b/eagle-topology-check/eagle-topology-entity/src/main/java/org/apache/eagle/topology/TopologyConstants.java index 3e535a8..488d3f9 100644 --- a/eagle-topology-check/eagle-topology-entity/src/main/java/org/apache/eagle/topology/TopologyConstants.java +++ b/eagle-topology-check/eagle-topology-entity/src/main/java/org/apache/eagle/topology/TopologyConstants.java @@ -61,6 +61,7 @@ public class TopologyConstants { // Status definitions for resource manager public static final String RESOURCE_MANAGER_ACTIVE_STATUS = "active"; + public static final String RESOURCE_MANAGER_INACTIVE_STATUS = "inactive"; // Status definitions for node manager public static final String NODE_MANAGER_RUNNING_STATUS = "running"; http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0d1dcc40/eagle-topology-check/eagle-topology-entity/src/main/java/org/apache/eagle/topology/entity/MRServiceTopologyAPIEntity.java 
---------------------------------------------------------------------- diff --git a/eagle-topology-check/eagle-topology-entity/src/main/java/org/apache/eagle/topology/entity/MRServiceTopologyAPIEntity.java b/eagle-topology-check/eagle-topology-entity/src/main/java/org/apache/eagle/topology/entity/MRServiceTopologyAPIEntity.java index 82e57fd..006c898 100644 --- a/eagle-topology-check/eagle-topology-entity/src/main/java/org/apache/eagle/topology/entity/MRServiceTopologyAPIEntity.java +++ b/eagle-topology-check/eagle-topology-entity/src/main/java/org/apache/eagle/topology/entity/MRServiceTopologyAPIEntity.java @@ -38,8 +38,19 @@ public class MRServiceTopologyAPIEntity extends TopologyBaseAPIEntity { @Column("d") private String healthReport; @Column("e") + private long lastHealthUpdate; + @Column("f") private long lastUpdateTime; + public long getLastHealthUpdate() { + return lastHealthUpdate; + } + + public void setLastHealthUpdate(long lastHealthUpdate) { + this.lastHealthUpdate = lastHealthUpdate; + valueChanged("lastHealthUpdate"); + } + public long getLastUpdateTime() { return lastUpdateTime; }