AMBARI-20591 In an HA-enabled cluster, after the Active Master is shut down, the Ambari REST API and HBase quick links show two Active Masters (dsen)
Project: http://git-wip-us.apache.org/repos/asf/ambari/repo Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/c11d0045 Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/c11d0045 Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/c11d0045 Branch: refs/heads/branch-feature-AMBARI-12556 Commit: c11d0045214249e16ba1564c94e6fc6ec4e04d83 Parents: 8e15ba6 Author: Dmytro Sen <[email protected]> Authored: Thu Apr 6 15:13:52 2017 +0300 Committer: Dmytro Sen <[email protected]> Committed: Thu Apr 6 15:13:52 2017 +0300 ---------------------------------------------------------------------- .../state/services/MetricsRetrievalService.java | 29 ++++++++- .../services/MetricsRetrievalServiceTest.java | 63 ++++++++++++++++++++ 2 files changed, 91 insertions(+), 1 deletion(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ambari/blob/c11d0045/ambari-server/src/main/java/org/apache/ambari/server/state/services/MetricsRetrievalService.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/java/org/apache/ambari/server/state/services/MetricsRetrievalService.java b/ambari-server/src/main/java/org/apache/ambari/server/state/services/MetricsRetrievalService.java index 79e0e25..59ec15b 100644 --- a/ambari-server/src/main/java/org/apache/ambari/server/state/services/MetricsRetrievalService.java +++ b/ambari-server/src/main/java/org/apache/ambari/server/state/services/MetricsRetrievalService.java @@ -18,6 +18,7 @@ package org.apache.ambari.server.state.services; import java.io.BufferedReader; +import java.io.IOException; import java.io.InputStream; import java.io.InputStreamReader; import java.lang.Thread.UncaughtExceptionHandler; @@ -441,7 +442,12 @@ public class MetricsRetrievalService extends AbstractService { if (null != m_ttlUrlCache) { m_ttlUrlCache.put(m_url, m_url); } - + } catch (IOException exception) + { + LOG.debug("Removing cached values 
for url {}", m_url); + // need to ensure old values are removed because they could be not valid if the state have changed. + removeCachedMetricsForCurrentURL(); + logException(exception, m_url); } catch (Exception exception) { logException(exception, m_url); } finally { @@ -454,6 +460,11 @@ public class MetricsRetrievalService extends AbstractService { } /** + * Removes metric values for current URL from cache. + */ + protected abstract void removeCachedMetricsForCurrentURL(); + + /** * Reads data from the specified {@link InputStream} and processes that into * a cachable value. The value will then be cached by this method. * @@ -536,6 +547,14 @@ public class MetricsRetrievalService extends AbstractService { * {@inheritDoc} */ @Override + protected void removeCachedMetricsForCurrentURL() { + m_cache.invalidate(m_url); + } + + /** + * {@inheritDoc} + */ + @Override protected void processInputStreamAndCacheResult(InputStream inputStream) throws Exception { JMXMetricHolder jmxMetricHolder = m_jmxObjectReader.readValue(inputStream); m_cache.put(m_url, jmxMetricHolder); @@ -575,6 +594,14 @@ public class MetricsRetrievalService extends AbstractService { * {@inheritDoc} */ @Override + protected void removeCachedMetricsForCurrentURL() { + m_cache.invalidate(m_url); + } + + /** + * {@inheritDoc} + */ + @Override protected void processInputStreamAndCacheResult(InputStream inputStream) throws Exception { Type type = new TypeToken<Map<Object, Object>>() {}.getType(); http://git-wip-us.apache.org/repos/asf/ambari/blob/c11d0045/ambari-server/src/test/java/org/apache/ambari/server/state/services/MetricsRetrievalServiceTest.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/test/java/org/apache/ambari/server/state/services/MetricsRetrievalServiceTest.java b/ambari-server/src/test/java/org/apache/ambari/server/state/services/MetricsRetrievalServiceTest.java index 784ba92..ea204aa 100644 --- 
a/ambari-server/src/test/java/org/apache/ambari/server/state/services/MetricsRetrievalServiceTest.java +++ b/ambari-server/src/test/java/org/apache/ambari/server/state/services/MetricsRetrievalServiceTest.java @@ -17,6 +17,7 @@ */ package org.apache.ambari.server.state.services; +import java.io.IOException; import java.io.InputStream; import java.util.Map; @@ -117,6 +118,68 @@ public class MetricsRetrievalServiceTest extends EasyMockSupport { } /** + * Test removing cached values if request failed with IOException. + */ + @Test + public void testRemovingValuesFromCacheOnFail() throws Exception { + + Configuration configuration = m_injector.getInstance(Configuration.class); + configuration.setProperty( + Configuration.METRIC_RETRIEVAL_SERVICE_REQUEST_TTL.getKey(), "1"); + + InputStream jmxInputStream = IOUtils.toInputStream("{ \"beans\": [] }"); + InputStream restInputStream = IOUtils.toInputStream("{}"); + + StreamProvider streamProvider = createNiceMock(StreamProvider.class); + + EasyMock.expect(streamProvider.readFrom(JMX_URL)).andReturn(jmxInputStream).once(); + EasyMock.expect(streamProvider.readFrom(REST_URL)).andReturn(restInputStream).once(); + + EasyMock.expect(streamProvider.readFrom(JMX_URL)).andThrow(new IOException()).once(); + EasyMock.expect(streamProvider.readFrom(REST_URL)).andThrow(new IOException()).once(); + + replayAll(); + + m_service.doStart(); + + // make the service synchronous + m_service.setThreadPoolExecutor(new SynchronousThreadPoolExecutor()); + + JMXMetricHolder jmxMetricHolder = m_service.getCachedJMXMetric(JMX_URL); + Assert.assertNull(jmxMetricHolder); + + Map<String, String> restMetrics = m_service.getCachedRESTMetric(REST_URL); + Assert.assertNull(restMetrics); + + m_service.submitRequest(MetricSourceType.JMX, streamProvider, JMX_URL); + jmxMetricHolder = m_service.getCachedJMXMetric(JMX_URL); + Assert.assertNotNull(jmxMetricHolder); + + m_service.submitRequest(MetricSourceType.REST, streamProvider, REST_URL); + restMetrics = 
m_service.getCachedRESTMetric(REST_URL); + Assert.assertNotNull(restMetrics); + + + jmxMetricHolder = m_service.getCachedJMXMetric(JMX_URL); + Assert.assertNotNull(jmxMetricHolder); + + restMetrics = m_service.getCachedRESTMetric(REST_URL); + Assert.assertNotNull(restMetrics); + + Thread.sleep(1000); + + m_service.submitRequest(MetricSourceType.JMX, streamProvider, JMX_URL); + jmxMetricHolder = m_service.getCachedJMXMetric(JMX_URL); + Assert.assertNull(jmxMetricHolder); + + m_service.submitRequest(MetricSourceType.REST, streamProvider, REST_URL); + restMetrics = m_service.getCachedRESTMetric(REST_URL); + Assert.assertNull(restMetrics); + + verifyAll(); + } + + /** * Tests that many requests to the same URL do not invoke the stream provider * more than once. */
