Repository: knox Updated Branches: refs/heads/master 375b624c4 -> 11796c96b
KNOX-1297 - HDFSUI Requires its own HA Dispatch Provider Project: http://git-wip-us.apache.org/repos/asf/knox/repo Commit: http://git-wip-us.apache.org/repos/asf/knox/commit/11796c96 Tree: http://git-wip-us.apache.org/repos/asf/knox/tree/11796c96 Diff: http://git-wip-us.apache.org/repos/asf/knox/diff/11796c96 Branch: refs/heads/master Commit: 11796c96b7a8d257bd0376b646c413792093efb4 Parents: 375b624 Author: Larry McCay <[email protected]> Authored: Sat May 5 11:25:25 2018 -0400 Committer: Larry McCay <[email protected]> Committed: Sat May 5 11:26:01 2018 -0400 ---------------------------------------------------------------------- .../resources/services/hdfsui/2.7.0/service.xml | 4 +- .../hdfs/dispatch/AbstractHdfsHaDispatch.java | 175 +++++++++++++++++++ .../gateway/hdfs/dispatch/HdfsUIHaDispatch.java | 35 ++++ .../hdfs/dispatch/WebHdfsHaDispatch.java | 161 +---------------- .../hdfs/dispatch/WebHdfsHaDispatchTest.java | 2 +- 5 files changed, 218 insertions(+), 159 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/knox/blob/11796c96/gateway-service-definitions/src/main/resources/services/hdfsui/2.7.0/service.xml ---------------------------------------------------------------------- diff --git a/gateway-service-definitions/src/main/resources/services/hdfsui/2.7.0/service.xml b/gateway-service-definitions/src/main/resources/services/hdfsui/2.7.0/service.xml index ac075eb..82c7426 100644 --- a/gateway-service-definitions/src/main/resources/services/hdfsui/2.7.0/service.xml +++ b/gateway-service-definitions/src/main/resources/services/hdfsui/2.7.0/service.xml @@ -32,8 +32,6 @@ <route path="/hdfs/dfshealth.html#tab-datanode"> <rewrite apply="HDFSUI/hdfs/outbound/namenode/relative" to="response.body"/> </route> - - </routes> - <dispatch classname="org.apache.knox.gateway.hdfs.dispatch.HdfsHttpClientDispatch" ha-classname="org.apache.knox.gateway.hdfs.dispatch.WebHdfsHaDispatch"/> + <dispatch classname="org.apache.knox.gateway.hdfs.dispatch.HdfsHttpClientDispatch" ha-classname="org.apache.knox.gateway.hdfs.dispatch.HdfsUIHaDispatch"/> </service> http://git-wip-us.apache.org/repos/asf/knox/blob/11796c96/gateway-service-webhdfs/src/main/java/org/apache/knox/gateway/hdfs/dispatch/AbstractHdfsHaDispatch.java ---------------------------------------------------------------------- diff --git a/gateway-service-webhdfs/src/main/java/org/apache/knox/gateway/hdfs/dispatch/AbstractHdfsHaDispatch.java b/gateway-service-webhdfs/src/main/java/org/apache/knox/gateway/hdfs/dispatch/AbstractHdfsHaDispatch.java new file mode 100644 index 0000000..1bd4136 --- /dev/null +++ b/gateway-service-webhdfs/src/main/java/org/apache/knox/gateway/hdfs/dispatch/AbstractHdfsHaDispatch.java @@ -0,0 +1,175 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p/> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p/> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.knox.gateway.hdfs.dispatch; + +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.net.URI; +import java.util.concurrent.atomic.AtomicInteger; + +import javax.servlet.ServletException; +import javax.servlet.http.HttpServletRequest; +import javax.servlet.http.HttpServletResponse; + +import org.apache.http.HttpResponse; +import org.apache.http.client.methods.HttpRequestBase; +import org.apache.http.client.methods.HttpUriRequest; +import org.apache.http.entity.BufferedHttpEntity; +import org.apache.knox.gateway.config.Configure; +import org.apache.knox.gateway.filter.AbstractGatewayFilter; +import org.apache.knox.gateway.ha.provider.HaProvider; +import org.apache.knox.gateway.ha.provider.HaServiceConfig; +import org.apache.knox.gateway.ha.provider.impl.HaServiceConfigConstants; +import org.apache.knox.gateway.hdfs.i18n.WebHdfsMessages; +import org.apache.knox.gateway.i18n.messages.MessagesFactory; + +public abstract class AbstractHdfsHaDispatch extends HdfsHttpClientDispatch { + + private static final String FAILOVER_COUNTER_ATTRIBUTE = "dispatch.ha.failover.counter"; + private static final String RETRY_COUNTER_ATTRIBUTE = "dispatch.ha.retry.counter"; + private static final WebHdfsMessages LOG = MessagesFactory.get(WebHdfsMessages.class); + private int maxFailoverAttempts = HaServiceConfigConstants.DEFAULT_MAX_FAILOVER_ATTEMPTS; + private int failoverSleep = HaServiceConfigConstants.DEFAULT_FAILOVER_SLEEP; + private int maxRetryAttempts = HaServiceConfigConstants.DEFAULT_MAX_RETRY_ATTEMPTS; + private int retrySleep = HaServiceConfigConstants.DEFAULT_RETRY_SLEEP; + private HaProvider haProvider; + + public AbstractHdfsHaDispatch() throws ServletException { + super(); + } + + @Override + public void init() { + super.init(); + if (haProvider != null) { + HaServiceConfig serviceConfig = haProvider.getHaDescriptor().getServiceConfig(getResourceRole()); + maxFailoverAttempts = serviceConfig.getMaxFailoverAttempts(); + failoverSleep = serviceConfig.getFailoverSleep(); + maxRetryAttempts = serviceConfig.getMaxRetryAttempts(); + retrySleep = serviceConfig.getRetrySleep(); + } + } + + public HaProvider getHaProvider() { + return haProvider; + } + + abstract String getResourceRole(); + + @Configure + public void setHaProvider(HaProvider haProvider) { + this.haProvider = haProvider; + } + + @Override + protected void executeRequest(HttpUriRequest outboundRequest, HttpServletRequest inboundRequest, HttpServletResponse outboundResponse) throws IOException { + HttpResponse inboundResponse = null; + try { + inboundResponse = executeOutboundRequest(outboundRequest); + writeOutboundResponse(outboundRequest, inboundRequest, outboundResponse, inboundResponse); + } catch (StandbyException e) { + LOG.errorReceivedFromStandbyNode(e); + failoverRequest(outboundRequest, inboundRequest, outboundResponse, inboundResponse, e); + } catch (SafeModeException e) { + LOG.errorReceivedFromSafeModeNode(e); + retryRequest(outboundRequest, inboundRequest, outboundResponse, inboundResponse, e); + } catch (IOException e) { + LOG.errorConnectingToServer(outboundRequest.getURI().toString(), e); + failoverRequest(outboundRequest, inboundRequest, outboundResponse, inboundResponse, e); + } + } + + /** + * Checks for specific outbound response codes/content to trigger a retry or failover + */ + @Override + protected void writeOutboundResponse(HttpUriRequest outboundRequest, HttpServletRequest inboundRequest, HttpServletResponse outboundResponse, HttpResponse inboundResponse) throws IOException { + if (inboundResponse.getStatusLine().getStatusCode() == 403) { + BufferedHttpEntity entity = new BufferedHttpEntity(inboundResponse.getEntity()); + inboundResponse.setEntity(entity); + ByteArrayOutputStream outputStream = new ByteArrayOutputStream(); + inboundResponse.getEntity().writeTo(outputStream); + String body = new String(outputStream.toByteArray()); + if (body.contains("StandbyException")) { + throw new StandbyException(); + } + if (body.contains("SafeModeException") || body.contains("RetriableException")) { + throw new SafeModeException(); + } + } + super.writeOutboundResponse(outboundRequest, inboundRequest, outboundResponse, inboundResponse); + } + + private void failoverRequest(HttpUriRequest outboundRequest, HttpServletRequest inboundRequest, HttpServletResponse outboundResponse, HttpResponse inboundResponse, Exception exception) throws IOException { + LOG.failingOverRequest(outboundRequest.getURI().toString()); + AtomicInteger counter = (AtomicInteger) inboundRequest.getAttribute(FAILOVER_COUNTER_ATTRIBUTE); + if (counter == null) { + counter = new AtomicInteger(0); + } + inboundRequest.setAttribute(FAILOVER_COUNTER_ATTRIBUTE, counter); + if (counter.incrementAndGet() <= maxFailoverAttempts) { + haProvider.markFailedURL(getResourceRole(), outboundRequest.getURI().toString()); + //null out target url so that rewriters run again + inboundRequest.setAttribute(AbstractGatewayFilter.TARGET_REQUEST_URL_ATTRIBUTE_NAME, null); + URI uri = getDispatchUrl(inboundRequest); + ((HttpRequestBase) outboundRequest).setURI(uri); + if (failoverSleep > 0) { + try { + Thread.sleep(failoverSleep); + } catch (InterruptedException e) { + LOG.failoverSleepFailed(getResourceRole(), e); + } + } + executeRequest(outboundRequest, inboundRequest, outboundResponse); + } else { + LOG.maxFailoverAttemptsReached(maxFailoverAttempts, getResourceRole()); + if (inboundResponse != null) { + writeOutboundResponse(outboundRequest, inboundRequest, outboundResponse, inboundResponse); + } else { + throw new IOException(exception); + } + } + } + + private void retryRequest(HttpUriRequest outboundRequest, HttpServletRequest inboundRequest, HttpServletResponse outboundResponse, HttpResponse inboundResponse, Exception exception) throws IOException { + LOG.retryingRequest(outboundRequest.getURI().toString()); + AtomicInteger counter = (AtomicInteger) inboundRequest.getAttribute(RETRY_COUNTER_ATTRIBUTE); + if (counter == null) { + counter = new AtomicInteger(0); + } + inboundRequest.setAttribute(RETRY_COUNTER_ATTRIBUTE, counter); + if (counter.incrementAndGet() <= maxRetryAttempts) { + if (retrySleep > 0) { + try { + Thread.sleep(retrySleep); + } catch (InterruptedException e) { + LOG.retrySleepFailed(getResourceRole(), e); + } + } + executeRequest(outboundRequest, inboundRequest, outboundResponse); + } else { + LOG.maxRetryAttemptsReached(maxRetryAttempts, getResourceRole(), outboundRequest.getURI().toString()); + if (inboundResponse != null) { + writeOutboundResponse(outboundRequest, inboundRequest, outboundResponse, inboundResponse); + } else { + throw new IOException(exception); + } + } + } + +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/knox/blob/11796c96/gateway-service-webhdfs/src/main/java/org/apache/knox/gateway/hdfs/dispatch/HdfsUIHaDispatch.java ---------------------------------------------------------------------- diff --git a/gateway-service-webhdfs/src/main/java/org/apache/knox/gateway/hdfs/dispatch/HdfsUIHaDispatch.java b/gateway-service-webhdfs/src/main/java/org/apache/knox/gateway/hdfs/dispatch/HdfsUIHaDispatch.java new file mode 100644 index 0000000..7492402 --- /dev/null +++ b/gateway-service-webhdfs/src/main/java/org/apache/knox/gateway/hdfs/dispatch/HdfsUIHaDispatch.java @@ -0,0 +1,35 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p/> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p/> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.knox.gateway.hdfs.dispatch; + +import javax.servlet.ServletException; + +public class HdfsUIHaDispatch extends AbstractHdfsHaDispatch { + public static final String RESOURCE_ROLE = "HDFSUI"; + + /** + * @throws javax.servlet.ServletException + */ + public HdfsUIHaDispatch() throws ServletException { + super(); + } + + protected String getResourceRole() { + return RESOURCE_ROLE; + } +} http://git-wip-us.apache.org/repos/asf/knox/blob/11796c96/gateway-service-webhdfs/src/main/java/org/apache/knox/gateway/hdfs/dispatch/WebHdfsHaDispatch.java ---------------------------------------------------------------------- diff --git a/gateway-service-webhdfs/src/main/java/org/apache/knox/gateway/hdfs/dispatch/WebHdfsHaDispatch.java b/gateway-service-webhdfs/src/main/java/org/apache/knox/gateway/hdfs/dispatch/WebHdfsHaDispatch.java index c8198fa..1a1a52d 100644 --- a/gateway-service-webhdfs/src/main/java/org/apache/knox/gateway/hdfs/dispatch/WebHdfsHaDispatch.java +++ b/gateway-service-webhdfs/src/main/java/org/apache/knox/gateway/hdfs/dispatch/WebHdfsHaDispatch.java @@ -17,168 +17,19 @@ */ package org.apache.knox.gateway.hdfs.dispatch; -import org.apache.knox.gateway.config.Configure; -import org.apache.knox.gateway.filter.AbstractGatewayFilter; -import org.apache.knox.gateway.ha.provider.HaProvider; -import org.apache.knox.gateway.ha.provider.HaServiceConfig; -import org.apache.knox.gateway.ha.provider.impl.HaServiceConfigConstants; -import org.apache.knox.gateway.hdfs.i18n.WebHdfsMessages; -import org.apache.knox.gateway.i18n.messages.MessagesFactory; -import org.apache.http.HttpResponse; -import org.apache.http.client.methods.HttpRequestBase; -import org.apache.http.client.methods.HttpUriRequest; -import org.apache.http.entity.BufferedHttpEntity; - import javax.servlet.ServletException; -import javax.servlet.http.HttpServletRequest; -import javax.servlet.http.HttpServletResponse; -import java.io.ByteArrayOutputStream; -import java.io.IOException; -import java.net.URI; -import java.util.concurrent.atomic.AtomicInteger; - -public class WebHdfsHaDispatch extends HdfsHttpClientDispatch { - - private static final String FAILOVER_COUNTER_ATTRIBUTE = "dispatch.ha.failover.counter"; - - private static final String RETRY_COUNTER_ATTRIBUTE = "dispatch.ha.retry.counter"; - - public static final String RESOURCE_ROLE = "WEBHDFS"; - - private static final WebHdfsMessages LOG = MessagesFactory.get(WebHdfsMessages.class); - - private int maxFailoverAttempts = HaServiceConfigConstants.DEFAULT_MAX_FAILOVER_ATTEMPTS; - - private int failoverSleep = HaServiceConfigConstants.DEFAULT_FAILOVER_SLEEP; - - private int maxRetryAttempts = HaServiceConfigConstants.DEFAULT_MAX_RETRY_ATTEMPTS; - - private int retrySleep = HaServiceConfigConstants.DEFAULT_RETRY_SLEEP; - private HaProvider haProvider; +public class WebHdfsHaDispatch extends AbstractHdfsHaDispatch { + public static final String RESOURCE_ROLE = "WEBHDFS"; - /** + /** * @throws javax.servlet.ServletException */ public WebHdfsHaDispatch() throws ServletException { super(); } - - @Override - public void init() { - super.init(); - if (haProvider != null) { - HaServiceConfig serviceConfig = haProvider.getHaDescriptor().getServiceConfig(RESOURCE_ROLE); - maxFailoverAttempts = serviceConfig.getMaxFailoverAttempts(); - failoverSleep = serviceConfig.getFailoverSleep(); - maxRetryAttempts = serviceConfig.getMaxRetryAttempts(); - retrySleep = serviceConfig.getRetrySleep(); - } - } - - public HaProvider getHaProvider() { - return haProvider; + + protected String getResourceRole() { + return RESOURCE_ROLE; } - - @Configure - public void setHaProvider(HaProvider haProvider) { - this.haProvider = haProvider; - } - - @Override - protected void executeRequest(HttpUriRequest outboundRequest, HttpServletRequest inboundRequest, HttpServletResponse outboundResponse) throws IOException { - HttpResponse inboundResponse = null; - try { - inboundResponse = executeOutboundRequest(outboundRequest); - writeOutboundResponse(outboundRequest, inboundRequest, outboundResponse, inboundResponse); - } catch (StandbyException e) { - LOG.errorReceivedFromStandbyNode(e); - failoverRequest(outboundRequest, inboundRequest, outboundResponse, inboundResponse, e); - } catch (SafeModeException e) { - LOG.errorReceivedFromSafeModeNode(e); - retryRequest(outboundRequest, inboundRequest, outboundResponse, inboundResponse, e); - } catch (IOException e) { - LOG.errorConnectingToServer(outboundRequest.getURI().toString(), e); - failoverRequest(outboundRequest, inboundRequest, outboundResponse, inboundResponse, e); - } - } - - /** - * Checks for specific outbound response codes/content to trigger a retry or failover - */ - @Override - protected void writeOutboundResponse(HttpUriRequest outboundRequest, HttpServletRequest inboundRequest, HttpServletResponse outboundResponse, HttpResponse inboundResponse) throws IOException { - if (inboundResponse.getStatusLine().getStatusCode() == 403) { - BufferedHttpEntity entity = new BufferedHttpEntity(inboundResponse.getEntity()); - inboundResponse.setEntity(entity); - ByteArrayOutputStream outputStream = new ByteArrayOutputStream(); - inboundResponse.getEntity().writeTo(outputStream); - String body = new String(outputStream.toByteArray()); - if (body.contains("StandbyException")) { - throw new StandbyException(); - } - if (body.contains("SafeModeException") || body.contains("RetriableException")) { - throw new SafeModeException(); - } - } - super.writeOutboundResponse(outboundRequest, inboundRequest, outboundResponse, inboundResponse); - } - - private void failoverRequest(HttpUriRequest outboundRequest, HttpServletRequest inboundRequest, HttpServletResponse outboundResponse, HttpResponse inboundResponse, Exception exception) throws IOException { - LOG.failingOverRequest(outboundRequest.getURI().toString()); - AtomicInteger counter = (AtomicInteger) inboundRequest.getAttribute(FAILOVER_COUNTER_ATTRIBUTE); - if (counter == null) { - counter = new AtomicInteger(0); - } - inboundRequest.setAttribute(FAILOVER_COUNTER_ATTRIBUTE, counter); - if (counter.incrementAndGet() <= maxFailoverAttempts) { - haProvider.markFailedURL(RESOURCE_ROLE, outboundRequest.getURI().toString()); - //null out target url so that rewriters run again - inboundRequest.setAttribute(AbstractGatewayFilter.TARGET_REQUEST_URL_ATTRIBUTE_NAME, null); - URI uri = getDispatchUrl(inboundRequest); - ((HttpRequestBase) outboundRequest).setURI(uri); - if (failoverSleep > 0) { - try { - Thread.sleep(failoverSleep); - } catch (InterruptedException e) { - LOG.failoverSleepFailed(RESOURCE_ROLE, e); - } - } - executeRequest(outboundRequest, inboundRequest, outboundResponse); - } else { - LOG.maxFailoverAttemptsReached(maxFailoverAttempts, RESOURCE_ROLE); - if (inboundResponse != null) { - writeOutboundResponse(outboundRequest, inboundRequest, outboundResponse, inboundResponse); - } else { - throw new IOException(exception); - } - } - } - - private void retryRequest(HttpUriRequest outboundRequest, HttpServletRequest inboundRequest, HttpServletResponse outboundResponse, HttpResponse inboundResponse, Exception exception) throws IOException { - LOG.retryingRequest(outboundRequest.getURI().toString()); - AtomicInteger counter = (AtomicInteger) inboundRequest.getAttribute(RETRY_COUNTER_ATTRIBUTE); - if (counter == null) { - counter = new AtomicInteger(0); - } - inboundRequest.setAttribute(RETRY_COUNTER_ATTRIBUTE, counter); - if (counter.incrementAndGet() <= maxRetryAttempts) { - if (retrySleep > 0) { - try { - Thread.sleep(retrySleep); - } catch (InterruptedException e) { - LOG.retrySleepFailed(RESOURCE_ROLE, e); - } - } - executeRequest(outboundRequest, inboundRequest, outboundResponse); - } else { - LOG.maxRetryAttemptsReached(maxRetryAttempts, RESOURCE_ROLE, outboundRequest.getURI().toString()); - if (inboundResponse != null) { - writeOutboundResponse(outboundRequest, inboundRequest, outboundResponse, inboundResponse); - } else { - throw new IOException(exception); - } - } - } - } http://git-wip-us.apache.org/repos/asf/knox/blob/11796c96/gateway-service-webhdfs/src/test/java/org/apache/knox/gateway/hdfs/dispatch/WebHdfsHaDispatchTest.java ---------------------------------------------------------------------- diff --git a/gateway-service-webhdfs/src/test/java/org/apache/knox/gateway/hdfs/dispatch/WebHdfsHaDispatchTest.java b/gateway-service-webhdfs/src/test/java/org/apache/knox/gateway/hdfs/dispatch/WebHdfsHaDispatchTest.java index 6b7260b..40fd9ed 100644 --- a/gateway-service-webhdfs/src/test/java/org/apache/knox/gateway/hdfs/dispatch/WebHdfsHaDispatchTest.java +++ b/gateway-service-webhdfs/src/test/java/org/apache/knox/gateway/hdfs/dispatch/WebHdfsHaDispatchTest.java @@ -88,7 +88,7 @@ public class WebHdfsHaDispatchTest { }).once(); EasyMock.replay(filterConfig, servletContext, outboundRequest, inboundRequest, outboundResponse); Assert.assertEquals(uri1.toString(), provider.getActiveURL(serviceName)); - WebHdfsHaDispatch dispatch = new WebHdfsHaDispatch(); + AbstractHdfsHaDispatch dispatch = new WebHdfsHaDispatch(); HttpClientBuilder builder = HttpClientBuilder.create(); CloseableHttpClient client = builder.build(); dispatch.setHttpClient(client);
