This is an automated email from the ASF dual-hosted git repository. zregvart pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/camel.git
The following commit(s) were added to refs/heads/master by this push: new 78f0c8f CAMEL-13963: POST support in Netty-HTTP proxy mode 78f0c8f is described below commit 78f0c8fd84096436e337f5c4952f962c82a3df6f Author: Zoran Regvart <zregv...@apache.org> AuthorDate: Tue Sep 10 16:23:25 2019 +0200 CAMEL-13963: POST support in Netty-HTTP proxy mode --- .../netty/http/DefaultNettyHttpBinding.java | 13 +- .../component/netty/http/ProxyProtocolTest.java | 153 ++++++++++++++++++--- 2 files changed, 147 insertions(+), 19 deletions(-) diff --git a/components/camel-netty-http/src/main/java/org/apache/camel/component/netty/http/DefaultNettyHttpBinding.java b/components/camel-netty-http/src/main/java/org/apache/camel/component/netty/http/DefaultNettyHttpBinding.java index c20f6b3..2ef3db5 100644 --- a/components/camel-netty-http/src/main/java/org/apache/camel/component/netty/http/DefaultNettyHttpBinding.java +++ b/components/camel-netty-http/src/main/java/org/apache/camel/component/netty/http/DefaultNettyHttpBinding.java @@ -195,8 +195,14 @@ public class DefaultNettyHttpBinding implements NettyHttpBinding, Cloneable { } } - // add uri parameters as headers to the Camel message - if (request.uri().contains("?")) { + // add uri parameters as headers to the Camel message; + // when acting as a HTTP proxy we don't want to place query + // parameters in Camel message headers as the query parameters + // will be passed via Exchange.HTTP_QUERY, otherwise we could have + // both the Exchange.HTTP_QUERY and the values from the message + // headers, so we end up with two values for the same query + // parameter + if (!configuration.isHttpProxy() && request.uri().contains("?")) { String query = StringHelper.after(request.uri(), "?"); Map<String, Object> uriParameters = URISupport.parseQuery(query, false, true); @@ -218,9 +224,10 @@ public class DefaultNettyHttpBinding implements NettyHttpBinding, Cloneable { // if body is application/x-www-form-urlencoded then extract the body as query string and append as headers // if it is a bridgeEndpoint we need to skip this part of work + // if we're proxying the body is a buffer that we do not want to consume directly if (request.method().name().equals("POST") && request.headers().get(Exchange.CONTENT_TYPE) != null && request.headers().get(Exchange.CONTENT_TYPE).startsWith(NettyHttpConstants.CONTENT_TYPE_WWW_FORM_URLENCODED) - && !configuration.isBridgeEndpoint()) { + && !configuration.isBridgeEndpoint() && !configuration.isHttpProxy()) { String charset = "UTF-8"; diff --git a/components/camel-netty-http/src/test/java/org/apache/camel/component/netty/http/ProxyProtocolTest.java b/components/camel-netty-http/src/test/java/org/apache/camel/component/netty/http/ProxyProtocolTest.java index bd9841c..759e6f3 100644 --- a/components/camel-netty-http/src/test/java/org/apache/camel/component/netty/http/ProxyProtocolTest.java +++ b/components/camel-netty-http/src/test/java/org/apache/camel/component/netty/http/ProxyProtocolTest.java @@ -16,45 +16,65 @@ */ package org.apache.camel.component.netty.http; +import java.io.IOException; import java.io.InputStream; +import java.io.OutputStreamWriter; +import java.io.Writer; import java.net.HttpURLConnection; import java.net.InetSocketAddress; +import java.net.MalformedURLException; import java.net.Proxy; import java.net.URL; import java.nio.charset.StandardCharsets; +import java.util.Arrays; import java.util.Locale; +import java.util.concurrent.TimeUnit; +import java.util.function.Function; +import org.apache.camel.Exchange; +import org.apache.camel.Message; import org.apache.camel.builder.RouteBuilder; import org.apache.camel.impl.DefaultCamelContext; +import org.apache.camel.model.RouteDefinition; +import org.apache.camel.spi.ShutdownStrategy; import org.apache.camel.test.AvailablePortFinder; +import org.apache.camel.util.ObjectHelper; import org.apache.commons.io.IOUtils; import org.junit.After; import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; +import org.junit.runners.Parameterized.Parameters; import static org.assertj.core.api.Assertions.assertThat; +@RunWith(Parameterized.class) public class ProxyProtocolTest { - private final DefaultCamelContext context = new DefaultCamelContext(); + private static final int ORIGIN_PORT = AvailablePortFinder.getNextAvailable(); - private final int port = AvailablePortFinder.getNextAvailable(); + private static final int PROXY_PORT = AvailablePortFinder.getNextAvailable(); + + private final DefaultCamelContext context; + + private final String url; + + public ProxyProtocolTest(final Function<RouteBuilder, RouteDefinition> variant, final String url) throws Exception { + this.url = url; + context = new DefaultCamelContext(); - public ProxyProtocolTest() throws Exception { context.addRoutes(new RouteBuilder() { @Override public void configure() throws Exception { - final int originPort = AvailablePortFinder.getNextAvailable(); - // proxy from http://localhost:port to + // route variation that proxies from http://localhost:port to // http://localhost:originPort/path - from("netty-http:proxy://localhost:" + port) - .to("netty-http:http://localhost:" + originPort) - .process(e -> e.getMessage().setBody(e.getMessage().getBody(String.class).toUpperCase(Locale.US))); + variant.apply(this); // origin service that serves `"origin server"` on // http://localhost:originPort/path - from("netty-http:http://localhost:" + originPort + "/path").setBody() - .constant("origin server"); + from("netty-http:http://localhost:" + ORIGIN_PORT + "/path") + .process(ProxyProtocolTest::origin); } }); context.start(); @@ -69,19 +89,120 @@ public class ProxyProtocolTest { @Test public void shouldServeAsHttpProxy() throws Exception { - final Proxy proxy = new Proxy(Proxy.Type.HTTP, new InetSocketAddress("localhost", port)); - // request for http://test/path will be proxied by http://localhost:port // and diverted to http://localhost:originPort/path - final HttpURLConnection connection = (HttpURLConnection) new URL("http://test/path").openConnection(proxy); - - try (InputStream stream = connection.getInputStream()) { + try (InputStream stream = request(url)) { assertThat(IOUtils.readLines(stream, StandardCharsets.UTF_8)).containsOnly("ORIGIN SERVER"); } } + @Test + public void shouldSupportPostingFormEncodedPayloads() throws Exception { + try (InputStream stream = request(url, "hello=world", NettyHttpConstants.CONTENT_TYPE_WWW_FORM_URLENCODED)) { + assertThat(IOUtils.readLines(stream, StandardCharsets.UTF_8)).containsOnly("ORIGIN SERVER: HELLO=WORLD"); + } + } + + @Test + public void shouldSupportPostingPlaintextPayloads() throws Exception { + try (InputStream stream = request(url, "hello", "text/plain")) { + assertThat(IOUtils.readLines(stream, StandardCharsets.UTF_8)).containsOnly("ORIGIN SERVER: HELLO"); + } + } + + @Test + public void shouldSupportQueryParameters() throws Exception { + // request for http://test/path?q=... will be proxied by + // http://localhost:port + // and diverted to http://localhost:originPort/path?q=... + try (InputStream stream = request(url + "?q=hello")) { + assertThat(IOUtils.readLines(stream, StandardCharsets.UTF_8)).containsOnly("ORIGIN SERVER: HELLO"); + } + } + @After public void shutdownCamel() throws Exception { - context.stop(); + final ShutdownStrategy shutdownStrategy = context.getShutdownStrategy(); + shutdownStrategy.setTimeout(100); + shutdownStrategy.setTimeUnit(TimeUnit.MILLISECONDS); + shutdownStrategy.shutdownForced(context, context.getRouteStartupOrder()); + } + + @Parameters + public static Iterable<Object[]> routeOptions() { + final Function<RouteBuilder, RouteDefinition> single = r -> r.from("netty-http:proxy://localhost:" + PROXY_PORT) + .to("netty-http:http://localhost:" + ORIGIN_PORT) + .process(ProxyProtocolTest::uppercase); + + final Function<RouteBuilder, RouteDefinition> dynamicPath = r -> r.from("netty-http:proxy://localhost:" + PROXY_PORT) + .toD("netty-http:http://localhost:" + ORIGIN_PORT + "/${headers." + Exchange.HTTP_PATH + "}") + .process(ProxyProtocolTest::uppercase); + + final Function<RouteBuilder, RouteDefinition> dynamicUrl = r -> r.from("netty-http:proxy://localhost:" + PROXY_PORT) + .toD("netty-http:" + + "${headers." + Exchange.HTTP_SCHEME + "}://" + + "${headers." + Exchange.HTTP_HOST + "}:" + + "${headers." + Exchange.HTTP_PORT + "}/" + + "${headers." + Exchange.HTTP_PATH + "}") + .process(ProxyProtocolTest::uppercase); + + return Arrays.asList( + new Object[] {single, "http://test/path"}, + new Object[] {dynamicPath, "http://test/path"}, + new Object[] {dynamicUrl, "http://localhost:" + ORIGIN_PORT + "/path"}); + } + + private static void origin(final Exchange exchange) { + final Message message = exchange.getMessage(); + + final String q = message.getHeader("q", String.class); + final String body = message.getBody(String.class); + + if (ObjectHelper.isEmpty(q) && ObjectHelper.isEmpty(body)) { + message.setBody("origin server"); + } else if (ObjectHelper.isEmpty(body)) { + message.setBody("origin server: " + q); + } else { + message.setBody("origin server: " + body); + } + } + + private static InputStream request(final String url) throws IOException, MalformedURLException { + final Proxy proxy = new Proxy(Proxy.Type.HTTP, new InetSocketAddress("localhost", PROXY_PORT)); + + final HttpURLConnection connection = (HttpURLConnection) new URL(url).openConnection(proxy); + // when debugging comment out the following two lines otherwise + // the test will terminate regardless of the execution being + // paused at the breakpoint + connection.setConnectTimeout(1000); + connection.setReadTimeout(1000); + + return connection.getInputStream(); + } + + private static InputStream request(final String url, final String payload, final String contentType) throws IOException, MalformedURLException { + final Proxy proxy = new Proxy(Proxy.Type.HTTP, new InetSocketAddress("localhost", PROXY_PORT)); + + final HttpURLConnection connection = (HttpURLConnection) new URL(url).openConnection(proxy); + connection.addRequestProperty("Content-Type", contentType); + connection.setDoOutput(true); + // when debugging comment out the following two lines otherwise + // the test will terminate regardless of the execution being + // paused at the breakpoint + connection.setConnectTimeout(1000); + connection.setReadTimeout(1000); + + try (Writer writer = new OutputStreamWriter(connection.getOutputStream(), StandardCharsets.US_ASCII)) { + writer.write(payload); + } + + return connection.getInputStream(); + } + + private static void uppercase(final Exchange exchange) { + final Message message = exchange.getMessage(); + final String body = message.getBody(String.class); + + message.setBody(body.toUpperCase(Locale.US)); } }