Repository: ambari Updated Branches: refs/heads/branch-2.2 9e1739342 -> 5078781b6
AMBARI-16143. Ambari metrics API call should allow for early failure. (swagle) Project: http://git-wip-us.apache.org/repos/asf/ambari/repo Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/5078781b Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/5078781b Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/5078781b Branch: refs/heads/branch-2.2 Commit: 5078781b6a802665159d3a59b76714a39850738a Parents: 9e17393 Author: Siddharth Wagle <[email protected]> Authored: Thu Apr 28 16:27:32 2016 -0700 Committer: Siddharth Wagle <[email protected]> Committed: Thu Apr 28 16:27:32 2016 -0700 ---------------------------------------------------------------------- .../metrics/timeline/AMSPropertyProvider.java | 44 +++++-------- .../timeline/AMSReportPropertyProvider.java | 19 ++++-- .../metrics/timeline/MetricsRequestHelper.java | 2 +- .../timeline/cache/TimelineMetricCache.java | 19 +++++- .../cache/TimelineMetricCacheEntryFactory.java | 2 + .../timeline/AMSPropertyProviderTest.java | 69 +++++++++++++++++++- 6 files changed, 116 insertions(+), 39 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ambari/blob/5078781b/ambari-server/src/main/java/org/apache/ambari/server/controller/metrics/timeline/AMSPropertyProvider.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/java/org/apache/ambari/server/controller/metrics/timeline/AMSPropertyProvider.java b/ambari-server/src/main/java/org/apache/ambari/server/controller/metrics/timeline/AMSPropertyProvider.java index 9e5c1b6..463a9fb 100644 --- a/ambari-server/src/main/java/org/apache/ambari/server/controller/metrics/timeline/AMSPropertyProvider.java +++ b/ambari-server/src/main/java/org/apache/ambari/server/controller/metrics/timeline/AMSPropertyProvider.java @@ -196,7 +196,7 @@ public abstract class AMSPropertyProvider extends MetricsPropertyProvider { * @throws SystemException if unable to populate the resources */ @SuppressWarnings("unchecked") - public Collection<Resource> populateResources() throws SystemException { + public Collection<Resource> populateResources() throws SystemException, IOException { // No open ended query support. if (temporalInfo != null && (temporalInfo.getStartTime() == null || temporalInfo.getEndTime() == null)) { @@ -218,20 +218,10 @@ public abstract class AMSPropertyProvider extends MetricsPropertyProvider { if (!hostComponentHostMetrics.isEmpty()) { String hostComponentHostMetricParams = getSetString(processRegexps(hostComponentHostMetrics), -1); setQueryParams(hostComponentHostMetricParams, hostnames, true, componentName); - TimelineMetrics metricsResponse = null; - try { - metricsResponse = getTimelineMetricsFromCache( + TimelineMetrics metricsResponse = getTimelineMetricsFromCache( getTimelineAppMetricCacheKey(hostComponentHostMetrics, componentName, hostnames, uriBuilder.toString()), componentName); - } catch (IOException e) { - if (LOG.isDebugEnabled()) { - LOG.debug("Caught exception fetching metric data.", e); - } - // Skip further queries to preempt long calls due to timeout - if (e instanceof SocketTimeoutException) { - return Collections.emptySet(); - } - } + if (metricsResponse != null) { timelineMetrics.getMetrics().addAll(metricsResponse.getMetrics()); } @@ -240,20 +230,10 @@ public abstract class AMSPropertyProvider extends MetricsPropertyProvider { if (!nonHostComponentMetrics.isEmpty()) { String nonHostComponentHostMetricParams = getSetString(processRegexps(nonHostComponentMetrics), -1); setQueryParams(nonHostComponentHostMetricParams, hostnames, false, componentName); - TimelineMetrics metricsResponse = null; - try { - metricsResponse = getTimelineMetricsFromCache( + TimelineMetrics metricsResponse = getTimelineMetricsFromCache( getTimelineAppMetricCacheKey(nonHostComponentMetrics, componentName, hostnames, uriBuilder.toString()), componentName); - } catch (IOException e) { - if (LOG.isDebugEnabled()) { - LOG.debug("Caught exception fetching metric data.", e); - } - // Skip further queries to preempt long calls due to timeout - if (e instanceof SocketTimeoutException) { - return Collections.emptySet(); - } - } + if (metricsResponse != null) { timelineMetrics.getMetrics().addAll(metricsResponse.getMetrics()); } @@ -485,8 +465,18 @@ public abstract class AMSPropertyProvider extends MetricsPropertyProvider { // For each cluster for (Map.Entry<String, Map<TemporalInfo, MetricsRequest>> clusterEntry : requestMap.entrySet()) { // For each request - for (MetricsRequest metricsRequest : clusterEntry.getValue().values() ) { - metricsRequest.populateResources(); + for (MetricsRequest metricsRequest : clusterEntry.getValue().values()) { + try { + metricsRequest.populateResources(); + } catch (IOException io) { + // Skip further queries to preempt long calls due to timeout + if (io instanceof SocketTimeoutException) { + if (LOG.isDebugEnabled()) { + LOG.debug("Skip populating resources on socket timeout."); + } + break; + } + } } } http://git-wip-us.apache.org/repos/asf/ambari/blob/5078781b/ambari-server/src/main/java/org/apache/ambari/server/controller/metrics/timeline/AMSReportPropertyProvider.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/java/org/apache/ambari/server/controller/metrics/timeline/AMSReportPropertyProvider.java b/ambari-server/src/main/java/org/apache/ambari/server/controller/metrics/timeline/AMSReportPropertyProvider.java index 306390c..3688742 100644 --- a/ambari-server/src/main/java/org/apache/ambari/server/controller/metrics/timeline/AMSReportPropertyProvider.java +++ b/ambari-server/src/main/java/org/apache/ambari/server/controller/metrics/timeline/AMSReportPropertyProvider.java @@ -38,6 +38,7 @@ import org.apache.hadoop.metrics2.sink.timeline.TimelineMetrics; import org.apache.http.client.utils.URIBuilder; import java.io.IOException; +import java.net.SocketTimeoutException; import java.util.HashMap; import java.util.HashSet; import java.util.Map; @@ -204,15 +205,21 @@ public class AMSReportPropertyProvider extends MetricsReportPropertyProvider { // Self populating cache updates itself on every get with latest results TimelineMetrics timelineMetrics; - if (metricCache != null && metricCacheKey.getTemporalInfo() != null) { - timelineMetrics = metricCache.getAppTimelineMetricsFromCache(metricCacheKey); - } else { - try { + try { + if (metricCache != null && metricCacheKey.getTemporalInfo() != null) { + timelineMetrics = metricCache.getAppTimelineMetricsFromCache(metricCacheKey); + } else { timelineMetrics = requestHelper.fetchTimelineMetrics(uriBuilder, temporalInfo.getStartTimeMillis(), temporalInfo.getEndTimeMillis()); - } catch (IOException e) { - timelineMetrics = null; + } + } catch (IOException io) { + timelineMetrics = null; + if (io instanceof SocketTimeoutException) { + if (LOG.isDebugEnabled()) { + LOG.debug("Skip populating metrics on socket timeout exception."); + } + break; } } http://git-wip-us.apache.org/repos/asf/ambari/blob/5078781b/ambari-server/src/main/java/org/apache/ambari/server/controller/metrics/timeline/MetricsRequestHelper.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/java/org/apache/ambari/server/controller/metrics/timeline/MetricsRequestHelper.java b/ambari-server/src/main/java/org/apache/ambari/server/controller/metrics/timeline/MetricsRequestHelper.java index 94014f8..388de15 100644 --- a/ambari-server/src/main/java/org/apache/ambari/server/controller/metrics/timeline/MetricsRequestHelper.java +++ b/ambari-server/src/main/java/org/apache/ambari/server/controller/metrics/timeline/MetricsRequestHelper.java @@ -117,7 +117,7 @@ public class MetricsRequestHelper { } if (io instanceof SocketTimeoutException) { - errorMsg += " Can not connect to collector, socket error."; + errorMsg += " Cannot connect to collector: SocketTimeoutException."; LOG.error(errorMsg); throw io; } http://git-wip-us.apache.org/repos/asf/ambari/blob/5078781b/ambari-server/src/main/java/org/apache/ambari/server/controller/metrics/timeline/cache/TimelineMetricCache.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/java/org/apache/ambari/server/controller/metrics/timeline/cache/TimelineMetricCache.java b/ambari-server/src/main/java/org/apache/ambari/server/controller/metrics/timeline/cache/TimelineMetricCache.java index 8b17a23..b5fe05e 100644 --- a/ambari-server/src/main/java/org/apache/ambari/server/controller/metrics/timeline/cache/TimelineMetricCache.java +++ b/ambari-server/src/main/java/org/apache/ambari/server/controller/metrics/timeline/cache/TimelineMetricCache.java @@ -30,6 +30,8 @@ import org.apache.hadoop.metrics2.sink.timeline.TimelineMetrics; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.io.IOException; +import java.net.SocketTimeoutException; import java.util.ArrayList; import java.util.concurrent.atomic.AtomicInteger; @@ -54,7 +56,7 @@ public class TimelineMetricCache extends UpdatingSelfPopulatingCache { * @param key @TimelineAppMetricCacheKey * @return @org.apache.hadoop.metrics2.sink.timeline.TimelineMetrics */ - public TimelineMetrics getAppTimelineMetricsFromCache(TimelineAppMetricCacheKey key) throws IllegalArgumentException { + public TimelineMetrics getAppTimelineMetricsFromCache(TimelineAppMetricCacheKey key) throws IllegalArgumentException, IOException { if (LOG.isDebugEnabled()) { LOG.debug("Fetching metrics with key: " + key); } @@ -62,7 +64,20 @@ public class TimelineMetricCache extends UpdatingSelfPopulatingCache { // Make sure key is valid validateKey(key); - Element element = get(key); + Element element = null; + try { + element = get(key); + } catch (LockTimeoutException le) { + // Ehcache masks the Socket Timeout to look as a LockTimeout + Throwable t = le.getCause(); + if (t instanceof CacheException) { + t = t.getCause(); + if (t instanceof SocketTimeoutException) { + throw new SocketTimeoutException(t.getMessage()); + } + } + } + TimelineMetrics timelineMetrics = new TimelineMetrics(); if (element != null && element.getObjectValue() != null) { TimelineMetricsCacheValue value = (TimelineMetricsCacheValue) element.getObjectValue(); http://git-wip-us.apache.org/repos/asf/ambari/blob/5078781b/ambari-server/src/main/java/org/apache/ambari/server/controller/metrics/timeline/cache/TimelineMetricCacheEntryFactory.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/java/org/apache/ambari/server/controller/metrics/timeline/cache/TimelineMetricCacheEntryFactory.java b/ambari-server/src/main/java/org/apache/ambari/server/controller/metrics/timeline/cache/TimelineMetricCacheEntryFactory.java index ed0f878..4880d98 100644 --- a/ambari-server/src/main/java/org/apache/ambari/server/controller/metrics/timeline/cache/TimelineMetricCacheEntryFactory.java +++ b/ambari-server/src/main/java/org/apache/ambari/server/controller/metrics/timeline/cache/TimelineMetricCacheEntryFactory.java @@ -91,6 +91,7 @@ public class TimelineMetricCacheEntryFactory implements UpdatingCacheEntryFactor metricCacheKey.getTemporalInfo().getEndTimeMillis()); } catch (IOException io) { LOG.debug("Caught IOException on fetching metrics. " + io.getMessage()); + throw io; } TimelineMetricsCacheValue value = null; @@ -191,6 +192,7 @@ public class TimelineMetricCacheEntryFactory implements UpdatingCacheEntryFactor if (LOG.isDebugEnabled()) { LOG.debug("Exception retrieving metrics.", io); } + throw io; } } else { LOG.debug("Skip updating cache with new startTime = " + http://git-wip-us.apache.org/repos/asf/ambari/blob/5078781b/ambari-server/src/test/java/org/apache/ambari/server/controller/metrics/timeline/AMSPropertyProviderTest.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/test/java/org/apache/ambari/server/controller/metrics/timeline/AMSPropertyProviderTest.java b/ambari-server/src/test/java/org/apache/ambari/server/controller/metrics/timeline/AMSPropertyProviderTest.java index b13465a..3a18e52 100644 --- a/ambari-server/src/test/java/org/apache/ambari/server/controller/metrics/timeline/AMSPropertyProviderTest.java +++ b/ambari-server/src/test/java/org/apache/ambari/server/controller/metrics/timeline/AMSPropertyProviderTest.java @@ -36,7 +36,6 @@ import org.apache.ambari.server.controller.spi.Resource; import org.apache.ambari.server.controller.spi.SystemException; import org.apache.ambari.server.controller.spi.TemporalInfo; import org.apache.ambari.server.controller.utilities.PropertyHelper; -import org.apache.ambari.server.controller.utilities.StreamProvider; import org.apache.ambari.server.state.Cluster; import org.apache.ambari.server.state.Clusters; import org.apache.ambari.server.state.ComponentInfo; @@ -55,7 +54,8 @@ import java.io.File; import java.io.IOException; import java.io.InputStream; import java.lang.reflect.Field; -import java.lang.reflect.Modifier; +import java.net.HttpURLConnection; +import java.net.SocketTimeoutException; import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; @@ -69,6 +69,7 @@ import static org.easymock.EasyMock.anyObject; import static org.easymock.EasyMock.createNiceMock; import static org.easymock.EasyMock.expect; import static org.easymock.EasyMock.replay; +import static org.easymock.EasyMock.verify; import static org.mockito.Mockito.mock; @RunWith(PowerMockRunner.class) @@ -817,6 +818,69 @@ public class AMSPropertyProviderTest { } } + @Test + public void testSocketTimeoutExceptionBehavior() throws Exception { + setUpCommonMocks(); + + URLStreamProvider streamProvider = createNiceMock(URLStreamProvider.class); + HttpURLConnection connection = createNiceMock(HttpURLConnection.class); + + expect(streamProvider.processURL((String) anyObject(), (String) anyObject(), + (String) anyObject(), (Map<String, List<String>>) anyObject())).andReturn(connection); + + expect(connection.getInputStream()).andThrow( + new SocketTimeoutException("Unit test raising Exception")).once(); + + replay(streamProvider, connection); + + injectCacheEntryFactoryWithStreamProvider(streamProvider); + TestMetricHostProvider metricHostProvider = new TestMetricHostProvider(); + ComponentSSLConfiguration sslConfiguration = mock(ComponentSSLConfiguration.class); + + Map<String, Map<String, PropertyInfo>> propertyIds = PropertyHelper.getMetricPropertyIds(Resource.Type.Host); + + AMSPropertyProvider propertyProvider = new AMSHostPropertyProvider( + propertyIds, + streamProvider, + sslConfiguration, + cacheProvider, + metricHostProvider, + CLUSTER_NAME_PROPERTY_ID, + HOST_NAME_PROPERTY_ID + ); + + final Resource resource1 = new ResourceImpl(Resource.Type.Host); + resource1.setProperty(CLUSTER_NAME_PROPERTY_ID, "c1"); + resource1.setProperty(HOST_NAME_PROPERTY_ID, "h1"); + final Resource resource2 = new ResourceImpl(Resource.Type.Host); + resource2.setProperty(CLUSTER_NAME_PROPERTY_ID, "c1"); + resource2.setProperty(HOST_NAME_PROPERTY_ID, "h2"); + + // Separating temporal info to ensure multiple requests made + Map<String, TemporalInfo> temporalInfoMap = new HashMap<String, TemporalInfo>(); + temporalInfoMap.put(PROPERTY_ID1, new TemporalInfoImpl(1416445244801L, 1416448936464L, 1L)); + temporalInfoMap.put(PROPERTY_ID2, new TemporalInfoImpl(1416445344901L, 1416448946564L, 1L)); + + Request request = PropertyHelper.getReadRequest( + new HashSet<String>() {{ + add(PROPERTY_ID1); + add(PROPERTY_ID2); + }}, temporalInfoMap); + + Set<Resource> resources = + propertyProvider.populateResources( + new HashSet<Resource>() {{ add(resource1); add(resource2); }}, request, null); + + verify(streamProvider, connection); + + Assert.assertEquals(2, resources.size()); + Resource res = resources.iterator().next(); + Map<String, Object> properties = PropertyHelper.getProperties(resources.iterator().next()); + Assert.assertNotNull(properties); + Assert.assertNull(res.getPropertyValue(PROPERTY_ID1)); + Assert.assertNull(res.getPropertyValue(PROPERTY_ID2)); + } + public static class TestMetricHostProvider implements MetricHostProvider { private String hostName; @@ -889,5 +953,4 @@ public class AMSPropertyProviderTest { field.setAccessible(true); field.set(cacheEntryFactory, new MetricsRequestHelper(streamProvider)); } - }
