Repository: eagle Updated Branches: refs/heads/master 0c52711b8 -> 29b797614
[EAGLE-989] Fix a bug in Resource Manager HA checker https://issues.apache.org/jira/browse/EAGLE-989 Author: Qingwen Zhao <[email protected]> Author: Zhao, Qingwen <[email protected]> Closes #900 from qingwen220/EAGLE-989. Project: http://git-wip-us.apache.org/repos/asf/eagle/repo Commit: http://git-wip-us.apache.org/repos/asf/eagle/commit/29b79761 Tree: http://git-wip-us.apache.org/repos/asf/eagle/tree/29b79761 Diff: http://git-wip-us.apache.org/repos/asf/eagle/diff/29b79761 Branch: refs/heads/master Commit: 29b79761477311367cf42e2bf2f213c0d3a08ab1 Parents: 0c52711 Author: Qingwen Zhao <[email protected]> Authored: Wed Apr 5 14:38:53 2017 +0800 Committer: Zhao, Qingwen <[email protected]> Committed: Wed Apr 5 14:38:53 2017 +0800 ---------------------------------------------------------------------- .../jpm/mr/running/MRRunningJobApplicationTest.java | 4 +++- .../src/test/resources/clusterinfo.json | 15 +++++++++++++++ .../jpm/util/resourcefetch/ha/HAURLSelectorImpl.java | 5 +++++ .../util/resourcefetch/RMResourceFetcherTest.java | 8 ++++++++ .../util/resourcefetch/ha/HAURLSelectorImplTest.java | 10 +++++----- 5 files changed, 36 insertions(+), 6 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/eagle/blob/29b79761/eagle-jpm/eagle-jpm-mr-running/src/test/java/org/apache/eagle/jpm/mr/running/MRRunningJobApplicationTest.java ---------------------------------------------------------------------- diff --git a/eagle-jpm/eagle-jpm-mr-running/src/test/java/org/apache/eagle/jpm/mr/running/MRRunningJobApplicationTest.java b/eagle-jpm/eagle-jpm-mr-running/src/test/java/org/apache/eagle/jpm/mr/running/MRRunningJobApplicationTest.java index faf8c8e..c9e4998 100644 --- a/eagle-jpm/eagle-jpm-mr-running/src/test/java/org/apache/eagle/jpm/mr/running/MRRunningJobApplicationTest.java +++ b/eagle-jpm/eagle-jpm-mr-running/src/test/java/org/apache/eagle/jpm/mr/running/MRRunningJobApplicationTest.java @@ -23,7 +23,6 @@ import backtype.storm.tuple.Tuple; import backtype.storm.tuple.Values; import com.typesafe.config.Config; import com.typesafe.config.ConfigFactory; -import org.apache.eagle.jpm.analyzer.mr.MRJobPerformanceAnalyzer; import org.apache.eagle.jpm.mr.running.parser.MRJobParser; import org.apache.eagle.jpm.mr.running.recover.MRRunningJobManager; import org.apache.eagle.jpm.mr.running.storm.MRRunningJobFetchSpout; @@ -53,6 +52,7 @@ import java.util.concurrent.Executors; import static org.mockito.Matchers.any; import static org.mockito.Mockito.*; import static org.powermock.api.mockito.PowerMockito.mockStatic; +import static org.powermock.api.mockito.PowerMockito.when; @RunWith(PowerMockRunner.class) @PrepareForTest({InputStreamUtils.class, MRRunningJobFetchSpout.class, Executors.class, MRRunningJobParseBolt.class}) @@ -255,6 +255,8 @@ public class MRRunningJobApplicationTest { InputStream jsonstream = this.getClass().getResourceAsStream(mockDataFilePath); mockStatic(InputStreamUtils.class); when(InputStreamUtils.getInputStream(RM_URL, null, Constants.CompressionType.GZIP)).thenReturn(jsonstream); + InputStream clusterInfoStream = this.getClass().getResourceAsStream("/clusterinfo.json"); + when(InputStreamUtils.getInputStream("http://sandbox.hortonworks.com:50030/ws/v1/cluster?anonymous=true", null, Constants.CompressionType.NONE)).thenReturn(clusterInfoStream); } } http://git-wip-us.apache.org/repos/asf/eagle/blob/29b79761/eagle-jpm/eagle-jpm-mr-running/src/test/resources/clusterinfo.json ---------------------------------------------------------------------- diff --git a/eagle-jpm/eagle-jpm-mr-running/src/test/resources/clusterinfo.json b/eagle-jpm/eagle-jpm-mr-running/src/test/resources/clusterinfo.json new file mode 100644 index 0000000..ae65760 --- /dev/null +++ b/eagle-jpm/eagle-jpm-mr-running/src/test/resources/clusterinfo.json @@ -0,0 +1,15 @@ +{ + "clusterInfo": { + "id": 1324053971963, + "startedOn": 1324053971963, + "state": "STARTED", + "haState": "ACTIVE", + "rmStateStoreName": "org.apache.hadoop.yarn.server.resourcemanager.recovery.ZKRMStateStore", + "resourceManagerVersion": "0.23.1-SNAPSHOT", + "resourceManagerBuildVersion": "0.23.1-SNAPSHOT from 1214049 by user1 source checksum 050cd664439d931c8743a6428fd6a693", + "resourceManagerVersionBuiltOn": "Tue Dec 13 22:12:48 CST 2011", + "hadoopVersion": "0.23.1-SNAPSHOT", + "hadoopBuildVersion": "0.23.1-SNAPSHOT from 1214049 by user1 source checksum 11458df3bb77342dca5f917198fad328", + "hadoopVersionBuiltOn": "Tue Dec 13 22:12:26 CST 2011" + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/eagle/blob/29b79761/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/resourcefetch/ha/HAURLSelectorImpl.java ---------------------------------------------------------------------- diff --git a/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/resourcefetch/ha/HAURLSelectorImpl.java b/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/resourcefetch/ha/HAURLSelectorImpl.java index 1f49cc1..a4ff583 100644 --- a/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/resourcefetch/ha/HAURLSelectorImpl.java +++ b/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/resourcefetch/ha/HAURLSelectorImpl.java @@ -19,6 +19,7 @@ package org.apache.eagle.jpm.util.resourcefetch.ha; import com.fasterxml.jackson.databind.ObjectMapper; import org.apache.eagle.jpm.util.Constants; import org.apache.eagle.jpm.util.resourcefetch.connection.InputStreamUtils; +import org.apache.eagle.jpm.util.resourcefetch.model.ClusterInfoWrapper; import org.apache.eagle.jpm.util.resourcefetch.url.RmActiveTestURLBuilderImpl; import org.apache.eagle.jpm.util.resourcefetch.url.ServiceURLBuilder; import org.apache.hadoop.util.StringUtils; @@ -58,6 +59,10 @@ public class HAURLSelectorImpl implements HAURLSelector { try { LOG.info("checking resource manager HA by {}", urlString); is = InputStreamUtils.getInputStream(urlString, null, compressionType); + if (null == is) { + return false; + } + OBJ_MAPPER.readValue(is, ClusterInfoWrapper.class); } catch (Exception ex) { LOG.info("fail to get inputStream from {}", urlString); return false; http://git-wip-us.apache.org/repos/asf/eagle/blob/29b79761/eagle-jpm/eagle-jpm-util/src/test/java/org/apache/eagle/jpm/util/resourcefetch/RMResourceFetcherTest.java ---------------------------------------------------------------------- diff --git a/eagle-jpm/eagle-jpm-util/src/test/java/org/apache/eagle/jpm/util/resourcefetch/RMResourceFetcherTest.java b/eagle-jpm/eagle-jpm-util/src/test/java/org/apache/eagle/jpm/util/resourcefetch/RMResourceFetcherTest.java index 29429f2..16ad175 100644 --- a/eagle-jpm/eagle-jpm-util/src/test/java/org/apache/eagle/jpm/util/resourcefetch/RMResourceFetcherTest.java +++ b/eagle-jpm/eagle-jpm-util/src/test/java/org/apache/eagle/jpm/util/resourcefetch/RMResourceFetcherTest.java @@ -19,6 +19,7 @@ package org.apache.eagle.jpm.util.resourcefetch; import org.apache.eagle.jpm.util.Constants; import org.apache.eagle.jpm.util.resourcefetch.connection.InputStreamUtils; +import org.apache.eagle.jpm.util.resourcefetch.ha.HAURLSelectorImpl; import org.apache.eagle.jpm.util.resourcefetch.model.AppInfo; import org.apache.eagle.jpm.util.resourcefetch.model.ClusterInfo; import org.junit.Assert; @@ -36,12 +37,15 @@ import static org.powermock.api.mockito.PowerMockito.mockStatic; @RunWith(PowerMockRunner.class) @PrepareForTest(InputStreamUtils.class) public class RMResourceFetcherTest { + private InputStream clusterInfoStream = this.getClass().getResourceAsStream("/clusterinfo.json"); + @Test public void testCompleteMrJob() throws Exception { String[] rmBasePaths = new String[]{"http://www.xxx.com:8088", "http://www.yyy.com:8088"}; RMResourceFetcher rmResourceFetcher = new RMResourceFetcher(rmBasePaths); InputStream jsonstream = this.getClass().getResourceAsStream("/mrcompleteapp.json"); mockStatic(InputStreamUtils.class); + when(InputStreamUtils.getInputStream("http://www.xxx.com:8088/ws/v1/cluster?anonymous=true", null, Constants.CompressionType.NONE)).thenReturn(clusterInfoStream); when(InputStreamUtils.getInputStream("http://www.xxx.com:8088/ws/v1/cluster/apps?applicationTypes=MAPREDUCE&state=FINISHED&finishedTimeBegin=1479244718794&anonymous=true", null, Constants.CompressionType.GZIP)).thenReturn(jsonstream); String lastFinishedTime = "1479244718794"; List<AppInfo> appInfos = rmResourceFetcher.getResource(Constants.ResourceType.COMPLETE_MR_JOB, lastFinishedTime); @@ -56,6 +60,7 @@ public class RMResourceFetcherTest { RMResourceFetcher rmResourceFetcher = new RMResourceFetcher(rmBasePaths); InputStream jsonstream = this.getClass().getResourceAsStream("/mrrunningapp.json"); mockStatic(InputStreamUtils.class); + when(InputStreamUtils.getInputStream("http://www.xxx.com:8088/ws/v1/cluster?anonymous=true", null, Constants.CompressionType.NONE)).thenReturn(clusterInfoStream); when(InputStreamUtils.getInputStream("http://www.xxx.com:8088/ws/v1/cluster/apps?applicationTypes=MAPREDUCE&state=RUNNING&anonymous=true", null, Constants.CompressionType.GZIP)).thenReturn(jsonstream); List<AppInfo> appInfos = rmResourceFetcher.getResource(Constants.ResourceType.RUNNING_MR_JOB); Assert.assertEquals(2, appInfos.size()); @@ -70,6 +75,7 @@ public class RMResourceFetcherTest { InputStream jsonstream = this.getClass().getResourceAsStream("/sparkrunningapp.json"); mockStatic(InputStreamUtils.class); when(InputStreamUtils.getInputStream("http://www.xxx.com:8088/ws/v1/cluster/apps?applicationTypes=SPARK&state=RUNNING&anonymous=true", null, Constants.CompressionType.GZIP)).thenReturn(jsonstream); + when(InputStreamUtils.getInputStream("http://www.xxx.com:8088/ws/v1/cluster?anonymous=true", null, Constants.CompressionType.NONE)).thenReturn(clusterInfoStream); List<AppInfo> appInfos = rmResourceFetcher.getResource(Constants.ResourceType.RUNNING_SPARK_JOB); Assert.assertEquals(2, appInfos.size()); @@ -84,6 +90,7 @@ public class RMResourceFetcherTest { InputStream jsonstream = this.getClass().getResourceAsStream("/sparkcompleteapp.json"); mockStatic(InputStreamUtils.class); long finishedTimeBegin = 1479244718794l; + when(InputStreamUtils.getInputStream("http://www.xxx.com:8088/ws/v1/cluster?anonymous=true", null, Constants.CompressionType.NONE)).thenReturn(clusterInfoStream); when(InputStreamUtils.getInputStream("http://www.xxx.com:8088/ws/v1/cluster/apps?applicationTypes=SPARK&state=FINISHED&finishedTimeBegin=1479244718794&anonymous=true", null, Constants.CompressionType.GZIP)).thenReturn(jsonstream); List<AppInfo> appInfos = rmResourceFetcher.getResource(Constants.ResourceType.COMPLETE_SPARK_JOB, String.valueOf(finishedTimeBegin)); Assert.assertEquals(2, appInfos.size()); @@ -98,6 +105,7 @@ public class RMResourceFetcherTest { RMResourceFetcher rmResourceFetcher = new RMResourceFetcher(rmBasePaths); mockStatic(InputStreamUtils.class); InputStream jsonstream = this.getClass().getResourceAsStream("/clusterinfo.json"); + when(InputStreamUtils.getInputStream("http://www.xxx.com:8088/ws/v1/cluster?anonymous=true", null, Constants.CompressionType.NONE)).thenReturn(clusterInfoStream); when(InputStreamUtils.getInputStream("http://www.xxx.com:8088/ws/v1/cluster/info?anonymous=true", null, Constants.CompressionType.GZIP)).thenReturn(jsonstream); ClusterInfo clusterInfo = rmResourceFetcher.getClusterInfo(); Assert.assertEquals("ClusterInfo{id=1324053971963, startedOn=1324053971963, state='STARTED', haState='ACTIVE', resourceManagerVersion='0.23.1-SNAPSHOT', resourceManagerBuildVersion='0.23.1-SNAPSHOT from 1214049 by user1 source checksum 050cd664439d931c8743a6428fd6a693', resourceManagerVersionBuiltOn='Tue Dec 13 22:12:48 CST 2011', hadoopVersion='0.23.1-SNAPSHOT', hadoopBuildVersion='0.23.1-SNAPSHOT from 1214049 by user1 source checksum 11458df3bb77342dca5f917198fad328', hadoopVersionBuiltOn='Tue Dec 13 22:12:26 CST 2011'}", clusterInfo.toString()); http://git-wip-us.apache.org/repos/asf/eagle/blob/29b79761/eagle-jpm/eagle-jpm-util/src/test/java/org/apache/eagle/jpm/util/resourcefetch/ha/HAURLSelectorImplTest.java ---------------------------------------------------------------------- diff --git a/eagle-jpm/eagle-jpm-util/src/test/java/org/apache/eagle/jpm/util/resourcefetch/ha/HAURLSelectorImplTest.java b/eagle-jpm/eagle-jpm-util/src/test/java/org/apache/eagle/jpm/util/resourcefetch/ha/HAURLSelectorImplTest.java index 8d056eb..8487b54 100644 --- a/eagle-jpm/eagle-jpm-util/src/test/java/org/apache/eagle/jpm/util/resourcefetch/ha/HAURLSelectorImplTest.java +++ b/eagle-jpm/eagle-jpm-util/src/test/java/org/apache/eagle/jpm/util/resourcefetch/ha/HAURLSelectorImplTest.java @@ -47,7 +47,7 @@ public class HAURLSelectorImplTest { HAURLSelectorImpl haurlSelector = new HAURLSelectorImpl(rmBasePaths, Constants.CompressionType.GZIP); mockStatic(InputStreamUtils.class); when(InputStreamUtils.getInputStream("http://www.xxx.com:8088", null, Constants.CompressionType.GZIP)).thenReturn(null); - Assert.assertTrue(haurlSelector.checkUrl("http://www.xxx.com:8088")); + Assert.assertFalse(haurlSelector.checkUrl("http://www.xxx.com:8088")); } @Test @@ -66,7 +66,7 @@ public class HAURLSelectorImplTest { Assert.assertEquals(rmBasePaths[0], haurlSelector.getSelectedUrl()); } - @Test + @Test(expected = IOException.class) public void testReSelectUrl() throws Exception { String[] rmBasePaths = new String[]{"http://www.xxx.com:8088", "http://www.yyy.com:8088"}; HAURLSelectorImpl haurlSelector = new HAURLSelectorImpl(rmBasePaths, Constants.CompressionType.GZIP); @@ -74,10 +74,10 @@ public class HAURLSelectorImplTest { when(InputStreamUtils.getInputStream("http://www.xxx.com:8088/ws/v1/cluster?anonymous=true", null, Constants.CompressionType.GZIP)).thenThrow(new Exception()); when(InputStreamUtils.getInputStream("http://www.yyy.com:8088/ws/v1/cluster?anonymous=true", null, Constants.CompressionType.GZIP)).thenReturn(null); haurlSelector.checkUrl(); - Assert.assertEquals(rmBasePaths[1], haurlSelector.getSelectedUrl()); + //Assert.assertEquals(rmBasePaths[1], haurlSelector.getSelectedUrl()); } - @Test + @Test(expected = IOException.class) public void testReSelectUrl1() throws Exception { String[] rmBasePaths = new String[]{"http://www.xxx.com:8088", "http://www.yyy.com:8088"}; HAURLSelectorImpl haurlSelector = new HAURLSelectorImpl(rmBasePaths, Constants.CompressionType.GZIP); @@ -85,7 +85,7 @@ public class HAURLSelectorImplTest { when(InputStreamUtils.getInputStream("http://www.xxx.com:8088/ws/v1/cluster?anonymous=true", null, Constants.CompressionType.GZIP)).thenReturn(null); when(InputStreamUtils.getInputStream("http://www.yyy.com:8088/ws/v1/cluster?anonymous=true", null, Constants.CompressionType.GZIP)).thenThrow(new Exception()); haurlSelector.checkUrl(); - Assert.assertEquals(rmBasePaths[0], haurlSelector.getSelectedUrl()); + //Assert.assertEquals(rmBasePaths[0], haurlSelector.getSelectedUrl()); }
