Repository: knox Updated Branches: refs/heads/master b618ff3e3 -> 0c1ff50fe
KNOX-580 Initial refactoring out of default HA dispatch Project: http://git-wip-us.apache.org/repos/asf/knox/repo Commit: http://git-wip-us.apache.org/repos/asf/knox/commit/0c1ff50f Tree: http://git-wip-us.apache.org/repos/asf/knox/tree/0c1ff50f Diff: http://git-wip-us.apache.org/repos/asf/knox/diff/0c1ff50f Branch: refs/heads/master Commit: 0c1ff50fe9e1d82ab12be8d8246a416f02106e8e Parents: b618ff3 Author: Sumit Gupta <[email protected]> Authored: Wed Aug 5 11:00:29 2015 -0400 Committer: Sumit Gupta <[email protected]> Committed: Wed Aug 5 11:00:29 2015 -0400 ---------------------------------------------------------------------- .../gateway/ha/dispatch/DefaultHaDispatch.java | 137 +++++++++++++++++++ .../ha/dispatch/i18n/HaDispatchMessages.java | 41 ++++++ .../ha/dispatch/DefaultHaDispatchTest.java | 106 ++++++++++++++ .../ServiceDefinitionDeploymentContributor.java | 25 +++- .../gateway/hdfs/i18n/WebHdfsMessages.java | 28 +--- 5 files changed, 312 insertions(+), 25 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/knox/blob/0c1ff50f/gateway-provider-ha/src/main/java/org/apache/hadoop/gateway/ha/dispatch/DefaultHaDispatch.java ---------------------------------------------------------------------- diff --git a/gateway-provider-ha/src/main/java/org/apache/hadoop/gateway/ha/dispatch/DefaultHaDispatch.java b/gateway-provider-ha/src/main/java/org/apache/hadoop/gateway/ha/dispatch/DefaultHaDispatch.java new file mode 100644 index 0000000..03aa369 --- /dev/null +++ b/gateway-provider-ha/src/main/java/org/apache/hadoop/gateway/ha/dispatch/DefaultHaDispatch.java @@ -0,0 +1,137 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.gateway.ha.dispatch; + +import org.apache.hadoop.gateway.config.Configure; +import org.apache.hadoop.gateway.dispatch.DefaultDispatch; +import org.apache.hadoop.gateway.filter.AbstractGatewayFilter; +import org.apache.hadoop.gateway.ha.dispatch.i18n.HaDispatchMessages; +import org.apache.hadoop.gateway.ha.provider.HaProvider; +import org.apache.hadoop.gateway.ha.provider.HaServiceConfig; +import org.apache.hadoop.gateway.ha.provider.impl.HaServiceConfigConstants; +import org.apache.hadoop.gateway.i18n.messages.MessagesFactory; +import org.apache.http.HttpResponse; +import org.apache.http.client.methods.HttpRequestBase; +import org.apache.http.client.methods.HttpUriRequest; + +import javax.servlet.http.HttpServletRequest; +import javax.servlet.http.HttpServletResponse; +import java.io.IOException; +import java.net.URI; +import java.util.concurrent.atomic.AtomicInteger; + +/** + * Default HA dispatch class that has a very basic failover mechanism + */ +public class DefaultHaDispatch extends DefaultDispatch { + + protected static final String FAILOVER_COUNTER_ATTRIBUTE = "dispatch.ha.failover.counter"; + + private static final HaDispatchMessages LOG = MessagesFactory.get(HaDispatchMessages.class); + + private int maxFailoverAttempts = HaServiceConfigConstants.DEFAULT_MAX_FAILOVER_ATTEMPTS; + + private int failoverSleep = HaServiceConfigConstants.DEFAULT_FAILOVER_SLEEP; + + private HaProvider haProvider; + + private String serviceRole; + + @Override + public void init() { + super.init(); + LOG.initializingForResourceRole(getServiceRole()); + if ( haProvider != null ) { + HaServiceConfig serviceConfig = haProvider.getHaDescriptor().getServiceConfig(getServiceRole()); + maxFailoverAttempts = serviceConfig.getMaxFailoverAttempts(); + failoverSleep = serviceConfig.getFailoverSleep(); + } + } + + public String getServiceRole() { + return serviceRole; + } + + @Configure + public void setServiceRole(String serviceRole) { + this.serviceRole = serviceRole; + } + + public HaProvider getHaProvider() { + return haProvider; + } + + @Configure + public void setHaProvider(HaProvider haProvider) { + this.haProvider = haProvider; + } + + @Override + protected void executeRequest(HttpUriRequest outboundRequest, HttpServletRequest inboundRequest, HttpServletResponse outboundResponse) throws IOException { + HttpResponse inboundResponse = null; + try { + inboundResponse = executeOutboundRequest(outboundRequest); + writeOutboundResponse(outboundRequest, inboundRequest, outboundResponse, inboundResponse); + } catch ( IOException e ) { + LOG.errorConnectingToServer(outboundRequest.getURI().toString(), e); + failoverRequest(outboundRequest, inboundRequest, outboundResponse, inboundResponse, e); + } + } + + + private void failoverRequest(HttpUriRequest outboundRequest, HttpServletRequest inboundRequest, HttpServletResponse outboundResponse, HttpResponse inboundResponse, Exception exception) throws IOException { + LOG.failingOverRequest(outboundRequest.getURI().toString()); + AtomicInteger counter = (AtomicInteger) inboundRequest.getAttribute(FAILOVER_COUNTER_ATTRIBUTE); + if ( counter == null ) { + counter = new AtomicInteger(0); + } + inboundRequest.setAttribute(FAILOVER_COUNTER_ATTRIBUTE, counter); + if ( counter.incrementAndGet() <= maxFailoverAttempts ) { + haProvider.markFailedURL(getServiceRole(), outboundRequest.getURI().toString()); + //null out target url so that rewriters run again + inboundRequest.setAttribute(AbstractGatewayFilter.TARGET_REQUEST_URL_ATTRIBUTE_NAME, null); + URI uri = getDispatchUrl(inboundRequest); + ((HttpRequestBase) outboundRequest).setURI(uri); + if ( failoverSleep > 0 ) { + try { + Thread.sleep(failoverSleep); + } catch ( InterruptedException e ) { + LOG.failoverSleepFailed(getServiceRole(), e); + } + } + executeRequest(outboundRequest, inboundRequest, outboundResponse); + } else { + LOG.maxFailoverAttemptsReached(maxFailoverAttempts, getServiceRole()); + if ( inboundResponse != null ) { + writeOutboundResponse(outboundRequest, inboundRequest, outboundResponse, inboundResponse); + } else { + throw new IOException(exception); + } + } + } + + private static URI getDispatchUrl(HttpServletRequest request) { + StringBuffer str = request.getRequestURL(); + String query = request.getQueryString(); + if ( query != null ) { + str.append('?'); + str.append(query); + } + return URI.create(str.toString()); + } +} http://git-wip-us.apache.org/repos/asf/knox/blob/0c1ff50f/gateway-provider-ha/src/main/java/org/apache/hadoop/gateway/ha/dispatch/i18n/HaDispatchMessages.java ---------------------------------------------------------------------- diff --git a/gateway-provider-ha/src/main/java/org/apache/hadoop/gateway/ha/dispatch/i18n/HaDispatchMessages.java b/gateway-provider-ha/src/main/java/org/apache/hadoop/gateway/ha/dispatch/i18n/HaDispatchMessages.java new file mode 100644 index 0000000..928f8b0 --- /dev/null +++ b/gateway-provider-ha/src/main/java/org/apache/hadoop/gateway/ha/dispatch/i18n/HaDispatchMessages.java @@ -0,0 +1,41 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.gateway.ha.dispatch.i18n; + +import org.apache.hadoop.gateway.i18n.messages.Message; +import org.apache.hadoop.gateway.i18n.messages.MessageLevel; +import org.apache.hadoop.gateway.i18n.messages.Messages; +import org.apache.hadoop.gateway.i18n.messages.StackTrace; + +@Messages(logger = "org.apache.hadoop.gateway") +public interface HaDispatchMessages { + @Message(level = MessageLevel.INFO, text = "Initializing Ha Dispatch for: {0}") + void initializingForResourceRole(String resourceRole); + + @Message(level = MessageLevel.INFO, text = "Could not connect to server: {0} {1}") + void errorConnectingToServer(String uri, @StackTrace(level = MessageLevel.DEBUG) Exception e); + + @Message(level = MessageLevel.INFO, text = "Failing over request to a different server: {0}") + void failingOverRequest(String uri); + + @Message(level = MessageLevel.INFO, text = "Maximum attempts {0} to failover reached for service: {1}") + void maxFailoverAttemptsReached(int attempts, String service); + + @Message(level = MessageLevel.INFO, text = "Error occurred while trying to sleep for failover : {0} {1}") + void failoverSleepFailed(String service, @StackTrace(level = MessageLevel.DEBUG) Exception e); +} http://git-wip-us.apache.org/repos/asf/knox/blob/0c1ff50f/gateway-provider-ha/src/test/java/org/apache/hadoop/gateway/ha/dispatch/DefaultHaDispatchTest.java ---------------------------------------------------------------------- diff --git a/gateway-provider-ha/src/test/java/org/apache/hadoop/gateway/ha/dispatch/DefaultHaDispatchTest.java b/gateway-provider-ha/src/test/java/org/apache/hadoop/gateway/ha/dispatch/DefaultHaDispatchTest.java new file mode 100644 index 0000000..5fd8a5f --- /dev/null +++ b/gateway-provider-ha/src/test/java/org/apache/hadoop/gateway/ha/dispatch/DefaultHaDispatchTest.java @@ -0,0 +1,106 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.gateway.ha.dispatch; + +import org.apache.hadoop.gateway.ha.provider.HaDescriptor; +import org.apache.hadoop.gateway.ha.provider.HaProvider; +import org.apache.hadoop.gateway.ha.provider.HaServletContextListener; +import org.apache.hadoop.gateway.ha.provider.impl.DefaultHaProvider; +import org.apache.hadoop.gateway.ha.provider.impl.HaDescriptorFactory; +import org.apache.http.client.methods.HttpRequestBase; +import org.apache.http.client.methods.HttpUriRequest; +import org.apache.http.impl.client.DefaultHttpClient; +import org.apache.http.params.BasicHttpParams; +import org.easymock.EasyMock; +import org.easymock.IAnswer; +import org.junit.Assert; +import org.junit.Test; + +import javax.servlet.FilterConfig; +import javax.servlet.ServletContext; +import javax.servlet.ServletOutputStream; +import javax.servlet.http.HttpServletRequest; +import javax.servlet.http.HttpServletResponse; +import java.io.IOException; +import java.net.URI; +import java.util.ArrayList; +import java.util.concurrent.atomic.AtomicInteger; + +public class DefaultHaDispatchTest { + + @Test + public void testConnectivityFailover() throws Exception { + String serviceName = "OOZIE"; + HaDescriptor descriptor = HaDescriptorFactory.createDescriptor(); + descriptor.addServiceConfig(HaDescriptorFactory.createServiceConfig(serviceName, "true", "1", "1000", "2", "1000")); + HaProvider provider = new DefaultHaProvider(descriptor); + URI uri1 = new URI( "http://unreachable-host" ); + URI uri2 = new URI( "http://reachable-host" ); + ArrayList<String> urlList = new ArrayList<String>(); + urlList.add(uri1.toString()); + urlList.add(uri2.toString()); + provider.addHaService(serviceName, urlList); + FilterConfig filterConfig = EasyMock.createNiceMock(FilterConfig.class); + ServletContext servletContext = EasyMock.createNiceMock(ServletContext.class); + + EasyMock.expect(filterConfig.getServletContext()).andReturn(servletContext).anyTimes(); + EasyMock.expect(servletContext.getAttribute(HaServletContextListener.PROVIDER_ATTRIBUTE_NAME)).andReturn(provider).anyTimes(); + + BasicHttpParams params = new BasicHttpParams(); + + HttpUriRequest outboundRequest = EasyMock.createNiceMock(HttpRequestBase.class); + EasyMock.expect(outboundRequest.getMethod()).andReturn( "GET" ).anyTimes(); + EasyMock.expect(outboundRequest.getURI()).andReturn( uri1 ).anyTimes(); + EasyMock.expect(outboundRequest.getParams()).andReturn( params ).anyTimes(); + + HttpServletRequest inboundRequest = EasyMock.createNiceMock(HttpServletRequest.class); + EasyMock.expect(inboundRequest.getRequestURL()).andReturn( new StringBuffer(uri2.toString()) ).once(); + EasyMock.expect(inboundRequest.getAttribute("dispatch.ha.failover.counter")).andReturn(new AtomicInteger(0)).once(); + EasyMock.expect(inboundRequest.getAttribute("dispatch.ha.failover.counter")).andReturn(new AtomicInteger(1)).once(); + + HttpServletResponse outboundResponse = EasyMock.createNiceMock(HttpServletResponse.class); + EasyMock.expect(outboundResponse.getOutputStream()).andAnswer( new IAnswer<ServletOutputStream>() { + @Override + public ServletOutputStream answer() throws Throwable { + return new ServletOutputStream() { + @Override + public void write( int b ) throws IOException { + throw new IOException( "unreachable-host" ); + } + }; + } + }).once(); + EasyMock.replay(filterConfig, servletContext, outboundRequest, inboundRequest, outboundResponse); + Assert.assertEquals(uri1.toString(), provider.getActiveURL(serviceName)); + DefaultHaDispatch dispatch = new DefaultHaDispatch(); + dispatch.setHttpClient(new DefaultHttpClient()); + dispatch.setHaProvider(provider); + dispatch.setServiceRole(serviceName); + dispatch.init(); + long startTime = System.currentTimeMillis(); + try { + dispatch.executeRequest(outboundRequest, inboundRequest, outboundResponse); + } catch (IOException e) { + //this is expected after the failover limit is reached + } + long elapsedTime = System.currentTimeMillis() - startTime; + Assert.assertEquals(uri2.toString(), provider.getActiveURL(serviceName)); + //test to make sure the sleep took place + Assert.assertTrue(elapsedTime > 1000); + } +} http://git-wip-us.apache.org/repos/asf/knox/blob/0c1ff50f/gateway-server/src/main/java/org/apache/hadoop/gateway/deploy/impl/ServiceDefinitionDeploymentContributor.java ---------------------------------------------------------------------- diff --git a/gateway-server/src/main/java/org/apache/hadoop/gateway/deploy/impl/ServiceDefinitionDeploymentContributor.java b/gateway-server/src/main/java/org/apache/hadoop/gateway/deploy/impl/ServiceDefinitionDeploymentContributor.java index 55d646f..8adbb43 100644 --- a/gateway-server/src/main/java/org/apache/hadoop/gateway/deploy/impl/ServiceDefinitionDeploymentContributor.java +++ b/gateway-server/src/main/java/org/apache/hadoop/gateway/deploy/impl/ServiceDefinitionDeploymentContributor.java @@ -46,6 +46,8 @@ public class ServiceDefinitionDeploymentContributor extends ServiceDeploymentCon private static final String DISPATCH_IMPL_PARAM = "dispatch-impl"; + private static final String SERVICE_ROLE_PARAM = "serviceRole"; + private static final String REPLAY_BUFFER_SIZE_PARAM = "replayBufferSize"; private static final String DEFAULT_REPLAY_BUFFER_SIZE = "8"; @@ -54,6 +56,8 @@ public class ServiceDefinitionDeploymentContributor extends ServiceDeploymentCon private static final String XFORWARDED_FILTER_ROLE = "xforwardedheaders"; + private static final String DEFAULT_HA_DISPATCH_CLASS = "org.apache.hadoop.gateway.ha.dispatch.DefaultHaDispatch"; + private ServiceDefinition serviceDefinition; private UrlRewriteRulesDescriptor serviceRules; @@ -170,15 +174,17 @@ public class ServiceDefinitionDeploymentContributor extends ServiceDeploymentCon if ( customDispatch == null ) { customDispatch = serviceDefinition.getDispatch(); } + boolean isHaEnabled = isHaEnabled(context); if ( customDispatch != null ) { - boolean isHaEnabled = isHaEnabled(context); String haContributorName = customDispatch.getHaContributorName(); String haClassName = customDispatch.getHaClassName(); - if ( isHaEnabled && (haContributorName != null || haClassName != null)) { + if ( isHaEnabled) { if (haContributorName != null) { addDispatchFilter(context, service, resource, DISPATCH_ROLE, haContributorName); - } else { + } else if (haClassName != null) { addDispatchFilterForClass(context, service, resource, haClassName); + } else { + addDefaultHaDispatchFilter(context, service, resource); } } else { String contributorName = customDispatch.getContributorName(); @@ -188,15 +194,25 @@ public class ServiceDefinitionDeploymentContributor extends ServiceDeploymentCon String className = customDispatch.getClassName(); if ( className != null ) { addDispatchFilterForClass(context, service, resource, className); + } else { + //final fallback to the default dispatch + addDispatchFilter(context, service, resource, DISPATCH_ROLE, "http-client"); } } } + } else if (isHaEnabled) { + addDefaultHaDispatchFilter(context, service, resource); } else { addDispatchFilter(context, service, resource, DISPATCH_ROLE, "http-client"); } } - private void addDispatchFilterForClass(DeploymentContext context, Service service, ResourceDescriptor resource, String className) { + private void addDefaultHaDispatchFilter(DeploymentContext context, Service service, ResourceDescriptor resource) { + FilterDescriptor filter = addDispatchFilterForClass(context, service, resource, DEFAULT_HA_DISPATCH_CLASS); + filter.param().name(SERVICE_ROLE_PARAM).value(service.getRole()); + } + + private FilterDescriptor addDispatchFilterForClass(DeploymentContext context, Service service, ResourceDescriptor resource, String className) { FilterDescriptor filter = resource.addFilter().name(getName()).role(DISPATCH_ROLE).impl(GatewayDispatchFilter.class); filter.param().name(DISPATCH_IMPL_PARAM).value(className); FilterParamDescriptor filterParam = filter.param().name(REPLAY_BUFFER_SIZE_PARAM).value(DEFAULT_REPLAY_BUFFER_SIZE); @@ -212,6 +228,7 @@ public class ServiceDefinitionDeploymentContributor extends ServiceDeploymentCon //special case for hive filter.param().name("basicAuthPreemptive").value("true"); } + return filter; } private boolean isHaEnabled(DeploymentContext context) { http://git-wip-us.apache.org/repos/asf/knox/blob/0c1ff50f/gateway-service-webhdfs/src/main/java/org/apache/hadoop/gateway/hdfs/i18n/WebHdfsMessages.java ---------------------------------------------------------------------- diff --git a/gateway-service-webhdfs/src/main/java/org/apache/hadoop/gateway/hdfs/i18n/WebHdfsMessages.java b/gateway-service-webhdfs/src/main/java/org/apache/hadoop/gateway/hdfs/i18n/WebHdfsMessages.java index b3671cb..e4479b5 100644 --- a/gateway-service-webhdfs/src/main/java/org/apache/hadoop/gateway/hdfs/i18n/WebHdfsMessages.java +++ b/gateway-service-webhdfs/src/main/java/org/apache/hadoop/gateway/hdfs/i18n/WebHdfsMessages.java @@ -17,41 +17,27 @@ */ package org.apache.hadoop.gateway.hdfs.i18n; +import org.apache.hadoop.gateway.ha.dispatch.i18n.HaDispatchMessages; import org.apache.hadoop.gateway.i18n.messages.Message; import org.apache.hadoop.gateway.i18n.messages.MessageLevel; import org.apache.hadoop.gateway.i18n.messages.Messages; import org.apache.hadoop.gateway.i18n.messages.StackTrace; @Messages(logger = "org.apache.hadoop.gateway") -public interface WebHdfsMessages { +public interface WebHdfsMessages extends HaDispatchMessages { - @Message(level = MessageLevel.INFO, text = "Initializing Ha Dispatch for: {0}") - void initializingForResourceRole(String resourceRole); - - @Message(level = MessageLevel.INFO, text = "Received an error from a node in Standby: {0}") + @Message(level = MessageLevel.INFO, text = "Received an error from a node in Standby: {0}") void errorReceivedFromStandbyNode(@StackTrace(level = MessageLevel.DEBUG) Exception e); - @Message(level = MessageLevel.INFO, text = "Could not connect to server: {0} {1}") - void errorConnectingToServer(String uri, @StackTrace(level = MessageLevel.DEBUG) Exception e); - - @Message(level = MessageLevel.INFO, text = "Received an error from a node in SafeMode: {0}") + @Message(level = MessageLevel.INFO, text = "Received an error from a node in SafeMode: {0}") void errorReceivedFromSafeModeNode(@StackTrace(level = MessageLevel.DEBUG) Exception e); - @Message(level = MessageLevel.INFO, text = "Failing over request to a different server: {0}") - void failingOverRequest(String uri); - - @Message(level = MessageLevel.INFO, text = "Retrying request to a server: {0}") + @Message(level = MessageLevel.INFO, text = "Retrying request to a server: {0}") void retryingRequest(String uri); - @Message(level = MessageLevel.INFO, text = "Maximum attempts {0} to failover reached for service: {1}") - void maxFailoverAttemptsReached(int attempts, String service); - - @Message(level = MessageLevel.INFO, text = "Maximum attempts {0} to retry reached for service: {1} at url : {2}") + @Message(level = MessageLevel.INFO, text = "Maximum attempts {0} to retry reached for service: {1} at url : {2}") void maxRetryAttemptsReached(int attempts, String service, String url); - @Message(level = MessageLevel.INFO, text = "Error occurred while trying to sleep for failover : {0} {1}") - void failoverSleepFailed(String service, @StackTrace(level = MessageLevel.DEBUG) Exception e); - - @Message(level = MessageLevel.INFO, text = "Error occurred while trying to sleep for retry : {0} {1}") + @Message(level = MessageLevel.INFO, text = "Error occurred while trying to sleep for retry : {0} {1}") void retrySleepFailed(String service, @StackTrace(level = MessageLevel.DEBUG) Exception e); }
