Repository: ambari Updated Branches: refs/heads/trunk a4af94292 -> 35b4ef6fe
AMBARI-16143. Ambari metrics API call should allow for early failure. (swagle) Project: http://git-wip-us.apache.org/repos/asf/ambari/repo Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/35b4ef6f Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/35b4ef6f Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/35b4ef6f Branch: refs/heads/trunk Commit: 35b4ef6fe18b6a88abdef9998e41010c9035f293 Parents: a4af942 Author: Siddharth Wagle <[email protected]> Authored: Thu Apr 28 16:20:35 2016 -0700 Committer: Siddharth Wagle <[email protected]> Committed: Thu Apr 28 16:20:41 2016 -0700 ---------------------------------------------------------------------- .../metrics/MetricsPropertyProvider.java | 2 +- .../metrics/timeline/AMSPropertyProvider.java | 44 ++++------- .../timeline/AMSReportPropertyProvider.java | 19 +++-- .../metrics/timeline/MetricsRequestHelper.java | 2 +- .../timeline/cache/TimelineMetricCache.java | 19 ++++- .../cache/TimelineMetricCacheEntryFactory.java | 2 + .../timeline/AMSPropertyProviderTest.java | 81 +++++++++++++++++++- 7 files changed, 130 insertions(+), 39 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ambari/blob/35b4ef6f/ambari-server/src/main/java/org/apache/ambari/server/controller/metrics/MetricsPropertyProvider.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/java/org/apache/ambari/server/controller/metrics/MetricsPropertyProvider.java b/ambari-server/src/main/java/org/apache/ambari/server/controller/metrics/MetricsPropertyProvider.java index a346051..61d7a17 100644 --- a/ambari-server/src/main/java/org/apache/ambari/server/controller/metrics/MetricsPropertyProvider.java +++ b/ambari-server/src/main/java/org/apache/ambari/server/controller/metrics/MetricsPropertyProvider.java @@ -132,7 +132,7 @@ public abstract class MetricsPropertyProvider extends AbstractPropertyProvider { return resources; } - if(!checkAuthorizationForMetrics(resources, clusterNamePropertyId)) { + if (!checkAuthorizationForMetrics(resources, clusterNamePropertyId)) { return resources; } http://git-wip-us.apache.org/repos/asf/ambari/blob/35b4ef6f/ambari-server/src/main/java/org/apache/ambari/server/controller/metrics/timeline/AMSPropertyProvider.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/java/org/apache/ambari/server/controller/metrics/timeline/AMSPropertyProvider.java b/ambari-server/src/main/java/org/apache/ambari/server/controller/metrics/timeline/AMSPropertyProvider.java index 4bc9fd7..9a7454c 100644 --- a/ambari-server/src/main/java/org/apache/ambari/server/controller/metrics/timeline/AMSPropertyProvider.java +++ b/ambari-server/src/main/java/org/apache/ambari/server/controller/metrics/timeline/AMSPropertyProvider.java @@ -196,7 +196,7 @@ public abstract class AMSPropertyProvider extends MetricsPropertyProvider { * @throws SystemException if unable to populate the resources */ @SuppressWarnings("unchecked") - public Collection<Resource> populateResources() throws SystemException { + public Collection<Resource> populateResources() throws SystemException, IOException { // No open ended query support. if (temporalInfo != null && (temporalInfo.getStartTime() == null || temporalInfo.getEndTime() == null)) { @@ -218,20 +218,10 @@ public abstract class AMSPropertyProvider extends MetricsPropertyProvider { if (!hostComponentHostMetrics.isEmpty()) { String hostComponentHostMetricParams = getSetString(processRegexps(hostComponentHostMetrics), -1); setQueryParams(hostComponentHostMetricParams, hostnames, true, componentName); - TimelineMetrics metricsResponse = null; - try { - metricsResponse = getTimelineMetricsFromCache( + TimelineMetrics metricsResponse = getTimelineMetricsFromCache( getTimelineAppMetricCacheKey(hostComponentHostMetrics, componentName, hostnames, uriBuilder.toString()), componentName); - } catch (IOException e) { - if (LOG.isDebugEnabled()) { - LOG.debug("Caught exception fetching metric data.", e); - } - // Skip further queries to preempt long calls due to timeout - if (e instanceof SocketTimeoutException) { - return Collections.emptySet(); - } - } + if (metricsResponse != null) { timelineMetrics.getMetrics().addAll(metricsResponse.getMetrics()); } @@ -240,20 +230,10 @@ public abstract class AMSPropertyProvider extends MetricsPropertyProvider { if (!nonHostComponentMetrics.isEmpty()) { String nonHostComponentHostMetricParams = getSetString(processRegexps(nonHostComponentMetrics), -1); setQueryParams(nonHostComponentHostMetricParams, hostnames, false, componentName); - TimelineMetrics metricsResponse = null; - try { - metricsResponse = getTimelineMetricsFromCache( + TimelineMetrics metricsResponse = getTimelineMetricsFromCache( getTimelineAppMetricCacheKey(nonHostComponentMetrics, componentName, hostnames, uriBuilder.toString()), componentName); - } catch (IOException e) { - if (LOG.isDebugEnabled()) { - LOG.debug("Caught exception fetching metric data.", e); - } - // Skip further queries to preempt long calls due to timeout - if (e instanceof SocketTimeoutException) { - return Collections.emptySet(); - } - } + if (metricsResponse != null) { timelineMetrics.getMetrics().addAll(metricsResponse.getMetrics()); } @@ -485,8 +465,18 @@ public abstract class AMSPropertyProvider extends MetricsPropertyProvider { // For each cluster for (Map.Entry<String, Map<TemporalInfo, MetricsRequest>> clusterEntry : requestMap.entrySet()) { // For each request - for (MetricsRequest metricsRequest : clusterEntry.getValue().values() ) { - metricsRequest.populateResources(); + for (MetricsRequest metricsRequest : clusterEntry.getValue().values()) { + try { + metricsRequest.populateResources(); + } catch (IOException io) { + // Skip further queries to preempt long calls due to timeout + if (io instanceof SocketTimeoutException) { + if (LOG.isDebugEnabled()) { + LOG.debug("Skip populating resources on socket timeout."); + } + break; + } + } } } http://git-wip-us.apache.org/repos/asf/ambari/blob/35b4ef6f/ambari-server/src/main/java/org/apache/ambari/server/controller/metrics/timeline/AMSReportPropertyProvider.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/java/org/apache/ambari/server/controller/metrics/timeline/AMSReportPropertyProvider.java b/ambari-server/src/main/java/org/apache/ambari/server/controller/metrics/timeline/AMSReportPropertyProvider.java index 306390c..3688742 100644 --- a/ambari-server/src/main/java/org/apache/ambari/server/controller/metrics/timeline/AMSReportPropertyProvider.java +++ b/ambari-server/src/main/java/org/apache/ambari/server/controller/metrics/timeline/AMSReportPropertyProvider.java @@ -38,6 +38,7 @@ import org.apache.hadoop.metrics2.sink.timeline.TimelineMetrics; import org.apache.http.client.utils.URIBuilder; import java.io.IOException; +import java.net.SocketTimeoutException; import java.util.HashMap; import java.util.HashSet; import java.util.Map; @@ -204,15 +205,21 @@ public class AMSReportPropertyProvider extends MetricsReportPropertyProvider { // Self populating cache updates itself on every get with latest results TimelineMetrics timelineMetrics; - if (metricCache != null && metricCacheKey.getTemporalInfo() != null) { - timelineMetrics = metricCache.getAppTimelineMetricsFromCache(metricCacheKey); - } else { - try { + try { + if (metricCache != null && metricCacheKey.getTemporalInfo() != null) { + timelineMetrics = metricCache.getAppTimelineMetricsFromCache(metricCacheKey); + } else { timelineMetrics = requestHelper.fetchTimelineMetrics(uriBuilder, temporalInfo.getStartTimeMillis(), temporalInfo.getEndTimeMillis()); - } catch (IOException e) { - timelineMetrics = null; + } + } catch (IOException io) { + timelineMetrics = null; + if (io instanceof SocketTimeoutException) { + if (LOG.isDebugEnabled()) { + LOG.debug("Skip populating metrics on socket timeout exception."); + } + break; } } http://git-wip-us.apache.org/repos/asf/ambari/blob/35b4ef6f/ambari-server/src/main/java/org/apache/ambari/server/controller/metrics/timeline/MetricsRequestHelper.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/java/org/apache/ambari/server/controller/metrics/timeline/MetricsRequestHelper.java b/ambari-server/src/main/java/org/apache/ambari/server/controller/metrics/timeline/MetricsRequestHelper.java index 94014f8..388de15 100644 --- a/ambari-server/src/main/java/org/apache/ambari/server/controller/metrics/timeline/MetricsRequestHelper.java +++ b/ambari-server/src/main/java/org/apache/ambari/server/controller/metrics/timeline/MetricsRequestHelper.java @@ -117,7 +117,7 @@ public class MetricsRequestHelper { } if (io instanceof SocketTimeoutException) { - errorMsg += " Can not connect to collector, socket error."; + errorMsg += " Cannot connect to collector: SocketTimeoutException."; LOG.error(errorMsg); throw io; } http://git-wip-us.apache.org/repos/asf/ambari/blob/35b4ef6f/ambari-server/src/main/java/org/apache/ambari/server/controller/metrics/timeline/cache/TimelineMetricCache.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/java/org/apache/ambari/server/controller/metrics/timeline/cache/TimelineMetricCache.java b/ambari-server/src/main/java/org/apache/ambari/server/controller/metrics/timeline/cache/TimelineMetricCache.java index 8b17a23..b5fe05e 100644 --- a/ambari-server/src/main/java/org/apache/ambari/server/controller/metrics/timeline/cache/TimelineMetricCache.java +++ b/ambari-server/src/main/java/org/apache/ambari/server/controller/metrics/timeline/cache/TimelineMetricCache.java @@ -30,6 +30,8 @@ import org.apache.hadoop.metrics2.sink.timeline.TimelineMetrics; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.io.IOException; +import java.net.SocketTimeoutException; import java.util.ArrayList; import java.util.concurrent.atomic.AtomicInteger; @@ -54,7 +56,7 @@ public class TimelineMetricCache extends UpdatingSelfPopulatingCache { * @param key @TimelineAppMetricCacheKey * @return @org.apache.hadoop.metrics2.sink.timeline.TimelineMetrics */ - public TimelineMetrics getAppTimelineMetricsFromCache(TimelineAppMetricCacheKey key) throws IllegalArgumentException { + public TimelineMetrics getAppTimelineMetricsFromCache(TimelineAppMetricCacheKey key) throws IllegalArgumentException, IOException { if (LOG.isDebugEnabled()) { LOG.debug("Fetching metrics with key: " + key); } @@ -62,7 +64,20 @@ public class TimelineMetricCache extends UpdatingSelfPopulatingCache { // Make sure key is valid validateKey(key); - Element element = get(key); + Element element = null; + try { + element = get(key); + } catch (LockTimeoutException le) { + // Ehcache masks the Socket Timeout to look as a LockTimeout + Throwable t = le.getCause(); + if (t instanceof CacheException) { + t = t.getCause(); + if (t instanceof SocketTimeoutException) { + throw new SocketTimeoutException(t.getMessage()); + } + } + } + TimelineMetrics timelineMetrics = new TimelineMetrics(); if (element != null && element.getObjectValue() != null) { TimelineMetricsCacheValue value = (TimelineMetricsCacheValue) element.getObjectValue(); http://git-wip-us.apache.org/repos/asf/ambari/blob/35b4ef6f/ambari-server/src/main/java/org/apache/ambari/server/controller/metrics/timeline/cache/TimelineMetricCacheEntryFactory.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/java/org/apache/ambari/server/controller/metrics/timeline/cache/TimelineMetricCacheEntryFactory.java b/ambari-server/src/main/java/org/apache/ambari/server/controller/metrics/timeline/cache/TimelineMetricCacheEntryFactory.java index ed0f878..4880d98 100644 --- a/ambari-server/src/main/java/org/apache/ambari/server/controller/metrics/timeline/cache/TimelineMetricCacheEntryFactory.java +++ b/ambari-server/src/main/java/org/apache/ambari/server/controller/metrics/timeline/cache/TimelineMetricCacheEntryFactory.java @@ -91,6 +91,7 @@ public class TimelineMetricCacheEntryFactory implements UpdatingCacheEntryFactor metricCacheKey.getTemporalInfo().getEndTimeMillis()); } catch (IOException io) { LOG.debug("Caught IOException on fetching metrics. " + io.getMessage()); + throw io; } TimelineMetricsCacheValue value = null; @@ -191,6 +192,7 @@ public class TimelineMetricCacheEntryFactory implements UpdatingCacheEntryFactor if (LOG.isDebugEnabled()) { LOG.debug("Exception retrieving metrics.", io); } + throw io; } } else { LOG.debug("Skip updating cache with new startTime = " + http://git-wip-us.apache.org/repos/asf/ambari/blob/35b4ef6f/ambari-server/src/test/java/org/apache/ambari/server/controller/metrics/timeline/AMSPropertyProviderTest.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/test/java/org/apache/ambari/server/controller/metrics/timeline/AMSPropertyProviderTest.java b/ambari-server/src/test/java/org/apache/ambari/server/controller/metrics/timeline/AMSPropertyProviderTest.java index 3adf9f7..c7dabf1 100644 --- a/ambari-server/src/test/java/org/apache/ambari/server/controller/metrics/timeline/AMSPropertyProviderTest.java +++ b/ambari-server/src/test/java/org/apache/ambari/server/controller/metrics/timeline/AMSPropertyProviderTest.java @@ -23,6 +23,7 @@ import org.apache.ambari.server.configuration.ComponentSSLConfiguration; import org.apache.ambari.server.configuration.Configuration; import org.apache.ambari.server.controller.AmbariManagementController; import org.apache.ambari.server.controller.AmbariServer; +import org.apache.ambari.server.controller.internal.AbstractPropertyProvider; import org.apache.ambari.server.controller.internal.PropertyInfo; import org.apache.ambari.server.controller.internal.ResourceImpl; import org.apache.ambari.server.controller.internal.TemporalInfoImpl; @@ -43,6 +44,7 @@ import org.apache.ambari.server.state.Cluster; import org.apache.ambari.server.state.Clusters; import org.apache.ambari.server.state.ComponentInfo; import org.apache.ambari.server.state.StackId; +import org.apache.commons.httpclient.HttpConnection; import org.apache.http.client.utils.URIBuilder; import org.easymock.EasyMock; import org.junit.After; @@ -50,16 +52,21 @@ import org.junit.Assert; import org.junit.Before; import org.junit.Test; import org.junit.runner.RunWith; +import org.mockito.Matchers; import org.powermock.api.easymock.PowerMock; import org.powermock.core.classloader.annotations.PowerMockIgnore; import org.powermock.core.classloader.annotations.PrepareForTest; import org.powermock.modules.junit4.PowerMockRunner; import org.springframework.security.core.context.SecurityContextHolder; +import javax.ws.rs.HttpMethod; import java.io.File; import java.io.IOException; import java.io.InputStream; import java.lang.reflect.Field; +import java.lang.reflect.Method; +import java.net.HttpURLConnection; +import java.net.SocketTimeoutException; import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; @@ -70,9 +77,14 @@ import java.util.Set; import static org.apache.ambari.server.controller.metrics.MetricsServiceProvider.MetricsService; import static org.easymock.EasyMock.anyObject; +import static org.easymock.EasyMock.createMockBuilder; import static org.easymock.EasyMock.createNiceMock; import static org.easymock.EasyMock.expect; import static org.easymock.EasyMock.replay; +import static org.easymock.EasyMock.verify; +import static org.mockito.Matchers.anyCollection; +import static org.mockito.Matchers.anyString; +import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.mock; @RunWith(PowerMockRunner.class) @@ -934,6 +946,72 @@ public class AMSPropertyProviderTest { } } + @Test + public void testSocketTimeoutExceptionBehavior() throws Exception { + setUpCommonMocks(); + + SecurityContextHolder.getContext().setAuthentication( + TestAuthenticationFactory.createClusterAdministrator("ClusterAdmin", 2L)); + + URLStreamProvider streamProvider = createNiceMock(URLStreamProvider.class); + HttpURLConnection connection = createNiceMock(HttpURLConnection.class); + + expect(streamProvider.processURL((String) anyObject(), (String) anyObject(), + (String) anyObject(), (Map<String, List<String>>) anyObject())).andReturn(connection); + + expect(connection.getInputStream()).andThrow( + new SocketTimeoutException("Unit test raising Exception")).once(); + + replay(streamProvider, connection); + + injectCacheEntryFactoryWithStreamProvider(streamProvider); + TestMetricHostProvider metricHostProvider = new TestMetricHostProvider(); + ComponentSSLConfiguration sslConfiguration = mock(ComponentSSLConfiguration.class); + + Map<String, Map<String, PropertyInfo>> propertyIds = PropertyHelper.getMetricPropertyIds(Resource.Type.Host); + + AMSPropertyProvider propertyProvider = new AMSHostPropertyProvider( + propertyIds, + streamProvider, + sslConfiguration, + new TimelineMetricCacheProvider(new Configuration(), cacheEntryFactory), + metricHostProvider, + CLUSTER_NAME_PROPERTY_ID, + HOST_NAME_PROPERTY_ID + ); + + final Resource resource1 = new ResourceImpl(Resource.Type.Host); + resource1.setProperty(CLUSTER_NAME_PROPERTY_ID, "c1"); + resource1.setProperty(HOST_NAME_PROPERTY_ID, "h1"); + final Resource resource2 = new ResourceImpl(Resource.Type.Host); + resource2.setProperty(CLUSTER_NAME_PROPERTY_ID, "c1"); + resource2.setProperty(HOST_NAME_PROPERTY_ID, "h2"); + + // Separating temporal info to ensure multiple requests made + Map<String, TemporalInfo> temporalInfoMap = new HashMap<String, TemporalInfo>(); + temporalInfoMap.put(PROPERTY_ID1, new TemporalInfoImpl(1416445244801L, 1416448936464L, 1L)); + temporalInfoMap.put(PROPERTY_ID2, new TemporalInfoImpl(1416445344901L, 1416448946564L, 1L)); + + Request request = PropertyHelper.getReadRequest( + new HashSet<String>() {{ + add(PROPERTY_ID1); + add(PROPERTY_ID2); + }}, temporalInfoMap); + + Set<Resource> resources = + propertyProvider.populateResources( + new HashSet<Resource>() {{ add(resource1); add(resource2); }}, request, null); + + verify(streamProvider, connection); + + Assert.assertEquals(2, resources.size()); + Resource res = resources.iterator().next(); + Map<String, Object> properties = PropertyHelper.getProperties(resources.iterator().next()); + Assert.assertNotNull(properties); + Assert.assertNull(res.getPropertyValue(PROPERTY_ID1)); + Assert.assertNull(res.getPropertyValue(PROPERTY_ID2)); + } + public static class TestMetricHostProvider implements MetricHostProvider { private String hostName; @@ -992,7 +1070,7 @@ public class AMSPropertyProviderTest { expect(ams.getAmbariMetaInfo()).andReturn(ambariMetaInfo).anyTimes(); expect(ambariMetaInfo.getComponentToService(anyObject(String.class), anyObject(String.class), anyObject(String.class))).andReturn("HDFS").anyTimes(); - expect(ambariMetaInfo.getComponent(anyObject(String.class),anyObject(String.class), + expect(ambariMetaInfo.getComponent(anyObject(String.class), anyObject(String.class), anyObject(String.class), anyObject(String.class))) .andReturn(componentInfo).anyTimes(); expect(componentInfo.getTimelineAppid()).andReturn(null).anyTimes(); @@ -1007,5 +1085,4 @@ public class AMSPropertyProviderTest { field.setAccessible(true); field.set(cacheEntryFactory, new MetricsRequestHelper(streamProvider)); } - }
