Repository: eagle Updated Branches: refs/heads/master 0a4299766 -> 71c1f95a4
[EAGLE-860] TopologyDataExtractor can't extract right rack info - Use clusternode API to get rack info. https://issues.apache.org/jira/browse/EAGLE-860 Author: r7raul1984 <[email protected]> Closes #772 from r7raul1984/EAGLE-860. Project: http://git-wip-us.apache.org/repos/asf/eagle/repo Commit: http://git-wip-us.apache.org/repos/asf/eagle/commit/71c1f95a Tree: http://git-wip-us.apache.org/repos/asf/eagle/tree/71c1f95a Diff: http://git-wip-us.apache.org/repos/asf/eagle/diff/71c1f95a Branch: refs/heads/master Commit: 71c1f95a46fef70bff42cf0b1db56cfc3f6b1d3c Parents: 0a42997 Author: r7raul1984 <[email protected]> Authored: Fri Jan 13 17:15:17 2017 +0800 Committer: Hao Chen <[email protected]> Committed: Fri Jan 13 17:15:17 2017 +0800 ---------------------------------------------------------------------- eagle-topology-check/eagle-topology-app/pom.xml | 13 +++- .../eagle/topology/TopologyCheckAppConfig.java | 2 + .../ClusterNodeAPITopologyRackResolver.java | 72 ++++++++++++++++++++ .../eagle/topology/resolver/model/Node.java | 36 ++++++++++ .../eagle/topology/resolver/model/NodeInfo.java | 57 ++++++++++++++++ .../topology/storm/TopologyDataExtractor.java | 14 ++-- ....eagle.topology.TopologyCheckAppProvider.xml | 7 +- .../TestClusterNodeAPITopologyRackResolver.java | 72 ++++++++++++++++++++ .../src/test/resources/nodeinfo.json | 17 +++++ 9 files changed, 284 insertions(+), 6 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/eagle/blob/71c1f95a/eagle-topology-check/eagle-topology-app/pom.xml ---------------------------------------------------------------------- diff --git a/eagle-topology-check/eagle-topology-app/pom.xml b/eagle-topology-check/eagle-topology-app/pom.xml index 913003d..dc74b9f 100644 --- a/eagle-topology-check/eagle-topology-app/pom.xml +++ b/eagle-topology-check/eagle-topology-app/pom.xml @@ -50,7 +50,18 @@ <groupId>org.json</groupId> <artifactId>json</artifactId> </dependency> - + <dependency> + <groupId>org.powermock</groupId> + <artifactId>powermock-module-junit4</artifactId> + <version>${powermock.version}</version> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.powermock</groupId> + <artifactId>powermock-api-mockito</artifactId> + <version>${powermock.version}</version> + <scope>test</scope> + </dependency> </dependencies> <build> http://git-wip-us.apache.org/repos/asf/eagle/blob/71c1f95a/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/TopologyCheckAppConfig.java ---------------------------------------------------------------------- diff --git a/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/TopologyCheckAppConfig.java b/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/TopologyCheckAppConfig.java index 90a3773..a75a3b3 100644 --- a/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/TopologyCheckAppConfig.java +++ b/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/TopologyCheckAppConfig.java @@ -77,6 +77,7 @@ public class TopologyCheckAppConfig implements Serializable { this.dataExtractorConfig.numDataFetcherSpout = config.getInt("topology.numDataFetcherSpout"); this.dataExtractorConfig.numEntityPersistBolt = config.getInt("topology.numEntityPersistBolt"); this.dataExtractorConfig.numKafkaSinkBolt = config.getInt("topology.numOfKafkaSinkBolt"); + this.dataExtractorConfig.resolverAPIUrl = config.getString("topology.resolverAPIUrl"); String resolveCls = config.getString("topology.rackResolverCls"); try { this.dataExtractorConfig.resolverCls = (Class<? extends TopologyRackResolver>) Class.forName(resolveCls); @@ -120,6 +121,7 @@ public class TopologyCheckAppConfig implements Serializable { public int numKafkaSinkBolt; public long fetchDataIntervalInSecs; public int parseThreadPoolSize; + public String resolverAPIUrl; public Class<? extends TopologyRackResolver> resolverCls; } http://git-wip-us.apache.org/repos/asf/eagle/blob/71c1f95a/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/resolver/impl/ClusterNodeAPITopologyRackResolver.java ---------------------------------------------------------------------- diff --git a/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/resolver/impl/ClusterNodeAPITopologyRackResolver.java b/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/resolver/impl/ClusterNodeAPITopologyRackResolver.java new file mode 100644 index 0000000..8e153f0 --- /dev/null +++ b/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/resolver/impl/ClusterNodeAPITopologyRackResolver.java @@ -0,0 +1,72 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.eagle.topology.resolver.impl; + +import com.fasterxml.jackson.databind.ObjectMapper; +import org.apache.eagle.app.utils.AppConstants; +import org.apache.eagle.app.utils.connection.InputStreamUtils; +import org.apache.eagle.topology.resolver.TopologyRackResolver; +import org.apache.eagle.topology.resolver.model.Node; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.InputStream; + +/** + * resolve rack by ClusterNode API. + * https://hadoop.apache.org/docs/r2.6.0/hadoop-yarn/hadoop-yarn-site/ResourceManagerRest.html + */ +public class ClusterNodeAPITopologyRackResolver implements TopologyRackResolver { + + private static final Logger LOG = LoggerFactory.getLogger(ClusterNodeAPITopologyRackResolver.class); + private static final String DEFAULT_RACK_NAME = "/default-rack"; + private String activeApiUrl = ""; + private String hostPort = "8041";//TODO configurable + private static final ObjectMapper OBJ_MAPPER = new ObjectMapper(); + + public ClusterNodeAPITopologyRackResolver(String activeApiUrl) { + this.activeApiUrl = activeApiUrl; + } + + @Override + public String resolve(String hostname) { + String nodeid = hostname + ":" + hostPort; + String requestUrl = activeApiUrl + "/" + nodeid; + String rack = DEFAULT_RACK_NAME; + InputStream is = null; + try { + is = InputStreamUtils.getInputStream(requestUrl, null, AppConstants.CompressionType.NONE); + LOG.info("resolve rack by api url {}", requestUrl); + Node node = OBJ_MAPPER.readValue(is, Node.class); + rack = node.getNode().getRack(); + } catch (Exception e) { + LOG.warn("resolve rack by api url {} failed, {}", requestUrl, e); + return rack; + } finally { + if (is != null) { + try { + is.close(); + } catch (Exception e) { + LOG.warn("{}", e); + } + } + } + return rack; + } +} http://git-wip-us.apache.org/repos/asf/eagle/blob/71c1f95a/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/resolver/model/Node.java ---------------------------------------------------------------------- diff --git a/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/resolver/model/Node.java b/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/resolver/model/Node.java new file mode 100644 index 0000000..ac9460a --- /dev/null +++ b/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/resolver/model/Node.java @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.eagle.topology.resolver.model; + +import com.fasterxml.jackson.annotation.JsonIgnoreProperties; +import com.fasterxml.jackson.databind.annotation.JsonSerialize; + +@JsonSerialize(include = JsonSerialize.Inclusion.NON_NULL) +@JsonIgnoreProperties(ignoreUnknown = true) +public class Node { + private NodeInfo node; + + public NodeInfo getNode() { + return node; + } + + public void setNode(NodeInfo node) { + this.node = node; + } +} http://git-wip-us.apache.org/repos/asf/eagle/blob/71c1f95a/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/resolver/model/NodeInfo.java ---------------------------------------------------------------------- diff --git a/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/resolver/model/NodeInfo.java b/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/resolver/model/NodeInfo.java new file mode 100644 index 0000000..4a47b8a --- /dev/null +++ b/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/resolver/model/NodeInfo.java @@ -0,0 +1,57 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.eagle.topology.resolver.model; + +import com.fasterxml.jackson.annotation.JsonIgnoreProperties; +import com.fasterxml.jackson.databind.annotation.JsonSerialize; + +@JsonSerialize(include = JsonSerialize.Inclusion.NON_NULL) +@JsonIgnoreProperties(ignoreUnknown = true) +public class NodeInfo { + + private String id; + private String rack; + private String nodeHostName; + + public String getRack() { + return rack; + } + + public void setRack(String rack) { + this.rack = rack; + } + + public String getNodeHostName() { + return nodeHostName; + } + + public void setNodeHostName(String nodeHostName) { + this.nodeHostName = nodeHostName; + } + + public String getId() { + return id; + } + + public void setId(String id) { + this.id = id; + } + + +} http://git-wip-us.apache.org/repos/asf/eagle/blob/71c1f95a/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/storm/TopologyDataExtractor.java ---------------------------------------------------------------------- diff --git a/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/storm/TopologyDataExtractor.java b/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/storm/TopologyDataExtractor.java index 32eae9b..492cf3f 100644 --- a/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/storm/TopologyDataExtractor.java +++ b/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/storm/TopologyDataExtractor.java @@ -23,10 +23,13 @@ import org.apache.eagle.topology.TopologyCheckAppConfig; import org.apache.eagle.topology.extractor.TopologyCrawler; import org.apache.eagle.topology.extractor.TopologyExtractorFactory; import org.apache.eagle.topology.resolver.TopologyRackResolver; +import org.apache.eagle.topology.resolver.impl.ClusterNodeAPITopologyRackResolver; import org.apache.eagle.topology.resolver.impl.DefaultTopologyRackResolver; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.lang.reflect.Constructor; +import java.lang.reflect.InvocationTargetException; import java.util.ArrayList; import java.util.List; import java.util.concurrent.*; @@ -74,10 +77,13 @@ public class TopologyDataExtractor { TopologyRackResolver rackResolver = new DefaultTopologyRackResolver(); if (config.dataExtractorConfig.resolverCls != null) { try { - rackResolver = config.dataExtractorConfig.resolverCls.newInstance(); - } catch (InstantiationException e) { - e.printStackTrace(); - } catch (IllegalAccessException e) { + if (config.dataExtractorConfig.resolverCls == ClusterNodeAPITopologyRackResolver.class) { + Constructor ctor = config.dataExtractorConfig.resolverCls.getConstructor(String.class); + rackResolver = (ClusterNodeAPITopologyRackResolver) ctor.newInstance(config.dataExtractorConfig.resolverAPIUrl); + } else { + rackResolver = config.dataExtractorConfig.resolverCls.newInstance(); + } + } catch (InstantiationException | IllegalAccessException | NoSuchMethodException | InvocationTargetException e) { e.printStackTrace(); } } http://git-wip-us.apache.org/repos/asf/eagle/blob/71c1f95a/eagle-topology-check/eagle-topology-app/src/main/resources/META-INF/providers/org.apache.eagle.topology.TopologyCheckAppProvider.xml ---------------------------------------------------------------------- diff --git a/eagle-topology-check/eagle-topology-app/src/main/resources/META-INF/providers/org.apache.eagle.topology.TopologyCheckAppProvider.xml b/eagle-topology-check/eagle-topology-app/src/main/resources/META-INF/providers/org.apache.eagle.topology.TopologyCheckAppProvider.xml index bfe43ed..87d3202 100644 --- a/eagle-topology-check/eagle-topology-app/src/main/resources/META-INF/providers/org.apache.eagle.topology.TopologyCheckAppProvider.xml +++ b/eagle-topology-check/eagle-topology-app/src/main/resources/META-INF/providers/org.apache.eagle.topology.TopologyCheckAppProvider.xml @@ -58,7 +58,12 @@ <value>2</value> <description>number of sinks connected to alert engine</description> </property> - + <property> + <name>topology.resolverAPIUrl</name> + <displayName>Rack Resolver APIUrl</displayName> + <description>Use the URL to obtain a Node Object, from a node identified by the nodeid value.</description> + <value>http://sandbox.hortonworks.com:8088/ws/v1/cluster/nodes</value> + </property> <property> <name>topology.rackResolverCls</name> <displayName>Rack Resolver Class</displayName> http://git-wip-us.apache.org/repos/asf/eagle/blob/71c1f95a/eagle-topology-check/eagle-topology-app/src/test/java/org/apache/eagle/topology/TestClusterNodeAPITopologyRackResolver.java ---------------------------------------------------------------------- diff --git a/eagle-topology-check/eagle-topology-app/src/test/java/org/apache/eagle/topology/TestClusterNodeAPITopologyRackResolver.java b/eagle-topology-check/eagle-topology-app/src/test/java/org/apache/eagle/topology/TestClusterNodeAPITopologyRackResolver.java new file mode 100644 index 0000000..cb53c1a --- /dev/null +++ b/eagle-topology-check/eagle-topology-app/src/test/java/org/apache/eagle/topology/TestClusterNodeAPITopologyRackResolver.java @@ -0,0 +1,72 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +package org.apache.eagle.topology; + +import org.apache.eagle.app.utils.connection.InputStreamUtils; +import org.apache.eagle.topology.resolver.TopologyRackResolver; +import org.apache.eagle.topology.resolver.impl.ClusterNodeAPITopologyRackResolver; +import org.junit.Assert; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.powermock.core.classloader.annotations.PrepareForTest; +import org.powermock.modules.junit4.PowerMockRunner; + +import java.io.InputStream; +import java.lang.reflect.Constructor; + +import static org.mockito.Matchers.anyObject; +import static org.mockito.Matchers.eq; +import static org.mockito.Mockito.when; +import static org.powermock.api.mockito.PowerMockito.mockStatic; + +@RunWith(PowerMockRunner.class) +@PrepareForTest(InputStreamUtils.class) +public class TestClusterNodeAPITopologyRackResolver { + @Test + public void testClusterNodeAPITopologyRackResolver() throws Exception { + mockStatic(InputStreamUtils.class); + String apiUrl = "http://yhd-jqhadoop168.int.yihaodian.com:8088/ws/v1/cluster/nodes"; + String hostname = "hostname"; + mockInputSteam("/nodeinfo.json", apiUrl + "/" + hostname + ":8041"); + + Class<? extends TopologyRackResolver> resolverCls = (Class<? extends TopologyRackResolver>) Class.forName("org.apache.eagle.topology.resolver.impl.ClusterNodeAPITopologyRackResolver"); + Assert.assertTrue(resolverCls == ClusterNodeAPITopologyRackResolver.class); + Constructor ctor = resolverCls.getConstructor(String.class); + TopologyRackResolver topologyRackResolver = (TopologyRackResolver) ctor.newInstance(apiUrl); + Assert.assertEquals("/rowb/rack12", topologyRackResolver.resolve(hostname)); + } + + @Test + public void testClusterNodeAPITopologyRackResolver1() throws Exception { + mockStatic(InputStreamUtils.class); + String apiUrl = "http://yhd-jqhadoop168.int.yihaodian.com:8088/ws/v1/cluster/nodes"; + String hostname = "hostname"; + mockInputSteamWithException(apiUrl + "/" + hostname + ":8041"); + TopologyRackResolver topologyRackResolver = new ClusterNodeAPITopologyRackResolver(apiUrl); + Assert.assertEquals("/default-rack", topologyRackResolver.resolve(hostname)); + } + + private void mockInputSteam(String mockDataFilePath, String url) throws Exception { + InputStream jsonstream = this.getClass().getResourceAsStream(mockDataFilePath); + when(InputStreamUtils.getInputStream(eq(url), anyObject(), anyObject())).thenReturn(jsonstream); + } + + private void mockInputSteamWithException(String url) throws Exception { + when(InputStreamUtils.getInputStream(eq(url), anyObject(), anyObject())).thenThrow(new Exception()); + } +} http://git-wip-us.apache.org/repos/asf/eagle/blob/71c1f95a/eagle-topology-check/eagle-topology-app/src/test/resources/nodeinfo.json ---------------------------------------------------------------------- diff --git a/eagle-topology-check/eagle-topology-app/src/test/resources/nodeinfo.json b/eagle-topology-check/eagle-topology-app/src/test/resources/nodeinfo.json new file mode 100644 index 0000000..ac87391 --- /dev/null +++ b/eagle-topology-check/eagle-topology-app/src/test/resources/nodeinfo.json @@ -0,0 +1,17 @@ +{ + "node": { + "rack": "/rowb/rack12", + "state": "RUNNING", + "id": "hostname:8041", + "nodeHostName": "hostname", + "nodeHTTPAddress": "hostname:8042", + "lastHealthUpdate": 1484196671092, + "version": "2.6.0", + "healthReport": "", + "numContainers": 2, + "usedMemoryMB": 6144, + "availMemoryMB": 43008, + "usedVirtualCores": 2, + "availableVirtualCores": 22 + } +} \ No newline at end of file
