http://git-wip-us.apache.org/repos/asf/nifi/blob/3a7ddc6a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestHandleHttpResponse.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestHandleHttpResponse.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestHandleHttpResponse.java index 2bceda6..9e31a4e 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestHandleHttpResponse.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestHandleHttpResponse.java @@ -1,180 +1,180 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.nifi.processors.standard; - -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertNull; -import static org.junit.Assert.assertTrue; - -import java.io.ByteArrayOutputStream; -import java.io.IOException; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentMap; -import java.util.concurrent.CopyOnWriteArrayList; -import java.util.concurrent.atomic.AtomicInteger; - -import javax.servlet.AsyncContext; -import javax.servlet.ServletOutputStream; -import javax.servlet.WriteListener; -import javax.servlet.http.HttpServletRequest; -import javax.servlet.http.HttpServletResponse; - -import org.apache.nifi.controller.AbstractControllerService; -import org.apache.nifi.http.HttpContextMap; -import org.apache.nifi.reporting.InitializationException; -import org.apache.nifi.util.TestRunner; -import org.apache.nifi.util.TestRunners; -import org.junit.Assert; -import org.junit.Test; -import org.mockito.Mockito; -import org.mockito.invocation.InvocationOnMock; -import org.mockito.stubbing.Answer; - -public class TestHandleHttpResponse { - - @Test - public void testEnsureCompleted() throws InitializationException { - final TestRunner runner = TestRunners.newTestRunner(HandleHttpResponse.class); - - final MockHttpContextMap contextMap = new MockHttpContextMap("my-id"); - runner.addControllerService("http-context-map", contextMap); - runner.enableControllerService(contextMap); - runner.setProperty(HandleHttpResponse.HTTP_CONTEXT_MAP, "http-context-map"); - runner.setProperty(HandleHttpResponse.STATUS_CODE, "${status.code}"); - runner.setProperty("my-attr", "${my-attr}"); - runner.setProperty("no-valid-attr", "${no-valid-attr}"); - - final Map<String, String> attributes = new HashMap<>(); - attributes.put(HandleHttpResponse.HTTP_CONTEXT_ID, "my-id"); - attributes.put("my-attr", "hello"); - attributes.put("status.code", "201"); - - runner.enqueue("hello".getBytes(), attributes); - - runner.run(); - - runner.assertAllFlowFilesTransferred(HandleHttpResponse.REL_SUCCESS, 1); - - assertEquals("hello", contextMap.baos.toString()); - assertEquals("hello", contextMap.headersSent.get("my-attr")); - assertNull(contextMap.headersSent.get("no-valid-attr")); - assertEquals(201, contextMap.statusCode); - assertEquals(1, contextMap.getCompletionCount()); - assertTrue(contextMap.headersWithNoValue.isEmpty()); - } - - private static class MockHttpContextMap extends AbstractControllerService implements HttpContextMap { - - private final String id; - private final AtomicInteger completedCount = new AtomicInteger(0); - private final ByteArrayOutputStream baos = new ByteArrayOutputStream(); - private final ConcurrentMap<String, String> headersSent = new ConcurrentHashMap<>(); - private volatile int statusCode = -1; - - private final List<String> headersWithNoValue = new CopyOnWriteArrayList<>(); - - public MockHttpContextMap(final String expectedIdentifier) { - this.id = expectedIdentifier; - } - - @Override - public boolean register(String identifier, HttpServletRequest request, HttpServletResponse response, AsyncContext context) { - return true; - } - - @Override - public HttpServletResponse getResponse(final String identifier) { - if (!id.equals(identifier)) { - Assert.fail("attempting to respond to wrong request; should have been " + id + " but was " + identifier); - } - - try { - final HttpServletResponse response = Mockito.mock(HttpServletResponse.class); - Mockito.when(response.getOutputStream()).thenReturn(new ServletOutputStream() { - @Override - public boolean isReady() { - return true; - } - - @Override - public void setWriteListener(WriteListener writeListener) { - } - - @Override - public void write(int b) throws IOException { - baos.write(b); - } - - @Override - public void write(byte[] b) throws IOException { - baos.write(b); - } - - @Override - public void write(byte[] b, int off, int len) throws IOException { - baos.write(b, off, len); - } - }); - - Mockito.doAnswer(new Answer<Object>() { - @Override - public Object answer(final InvocationOnMock invocation) throws Throwable { - final String key = invocation.getArgumentAt(0, String.class); - final String value = invocation.getArgumentAt(1, String.class); - if (value == null) { - headersWithNoValue.add(key); - } else { - headersSent.put(key, value); - } - - return null; - } - }).when(response).setHeader(Mockito.any(String.class), Mockito.any(String.class)); - - Mockito.doAnswer(new Answer<Object>() { - @Override - public Object answer(final InvocationOnMock invocation) throws Throwable { - statusCode = invocation.getArgumentAt(0, int.class); - return null; - } - }).when(response).setStatus(Mockito.anyInt()); - - return response; - } catch (final Exception e) { - e.printStackTrace(); - Assert.fail(e.toString()); - return null; - } - } - - @Override - public void complete(final String identifier) { - if (!id.equals(identifier)) { - Assert.fail("attempting to respond to wrong request; should have been " + id + " but was " + identifier); - } - - completedCount.incrementAndGet(); - } - - public int getCompletionCount() { - return completedCount.get(); - } - } -} +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.standard; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertTrue; + +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.CopyOnWriteArrayList; +import java.util.concurrent.atomic.AtomicInteger; + +import javax.servlet.AsyncContext; +import javax.servlet.ServletOutputStream; +import javax.servlet.WriteListener; +import javax.servlet.http.HttpServletRequest; +import javax.servlet.http.HttpServletResponse; + +import org.apache.nifi.controller.AbstractControllerService; +import org.apache.nifi.http.HttpContextMap; +import org.apache.nifi.reporting.InitializationException; +import org.apache.nifi.util.TestRunner; +import org.apache.nifi.util.TestRunners; +import org.junit.Assert; +import org.junit.Test; +import org.mockito.Mockito; +import org.mockito.invocation.InvocationOnMock; +import org.mockito.stubbing.Answer; + +public class TestHandleHttpResponse { + + @Test + public void testEnsureCompleted() throws InitializationException { + final TestRunner runner = TestRunners.newTestRunner(HandleHttpResponse.class); + + final MockHttpContextMap contextMap = new MockHttpContextMap("my-id"); + runner.addControllerService("http-context-map", contextMap); + runner.enableControllerService(contextMap); + runner.setProperty(HandleHttpResponse.HTTP_CONTEXT_MAP, "http-context-map"); + runner.setProperty(HandleHttpResponse.STATUS_CODE, "${status.code}"); + runner.setProperty("my-attr", "${my-attr}"); + runner.setProperty("no-valid-attr", "${no-valid-attr}"); + + final Map<String, String> attributes = new HashMap<>(); + attributes.put(HandleHttpResponse.HTTP_CONTEXT_ID, "my-id"); + attributes.put("my-attr", "hello"); + attributes.put("status.code", "201"); + + runner.enqueue("hello".getBytes(), attributes); + + runner.run(); + + runner.assertAllFlowFilesTransferred(HandleHttpResponse.REL_SUCCESS, 1); + + assertEquals("hello", contextMap.baos.toString()); + assertEquals("hello", contextMap.headersSent.get("my-attr")); + assertNull(contextMap.headersSent.get("no-valid-attr")); + assertEquals(201, contextMap.statusCode); + assertEquals(1, contextMap.getCompletionCount()); + assertTrue(contextMap.headersWithNoValue.isEmpty()); + } + + private static class MockHttpContextMap extends AbstractControllerService implements HttpContextMap { + + private final String id; + private final AtomicInteger completedCount = new AtomicInteger(0); + private final ByteArrayOutputStream baos = new ByteArrayOutputStream(); + private final ConcurrentMap<String, String> headersSent = new ConcurrentHashMap<>(); + private volatile int statusCode = -1; + + private final List<String> headersWithNoValue = new CopyOnWriteArrayList<>(); + + public MockHttpContextMap(final String expectedIdentifier) { + this.id = expectedIdentifier; + } + + @Override + public boolean register(String identifier, HttpServletRequest request, HttpServletResponse response, AsyncContext context) { + return true; + } + + @Override + public HttpServletResponse getResponse(final String identifier) { + if (!id.equals(identifier)) { + Assert.fail("attempting to respond to wrong request; should have been " + id + " but was " + identifier); + } + + try { + final HttpServletResponse response = Mockito.mock(HttpServletResponse.class); + Mockito.when(response.getOutputStream()).thenReturn(new ServletOutputStream() { + @Override + public boolean isReady() { + return true; + } + + @Override + public void setWriteListener(WriteListener writeListener) { + } + + @Override + public void write(int b) throws IOException { + baos.write(b); + } + + @Override + public void write(byte[] b) throws IOException { + baos.write(b); + } + + @Override + public void write(byte[] b, int off, int len) throws IOException { + baos.write(b, off, len); + } + }); + + Mockito.doAnswer(new Answer<Object>() { + @Override + public Object answer(final InvocationOnMock invocation) throws Throwable { + final String key = invocation.getArgumentAt(0, String.class); + final String value = invocation.getArgumentAt(1, String.class); + if (value == null) { + headersWithNoValue.add(key); + } else { + headersSent.put(key, value); + } + + return null; + } + }).when(response).setHeader(Mockito.any(String.class), Mockito.any(String.class)); + + Mockito.doAnswer(new Answer<Object>() { + @Override + public Object answer(final InvocationOnMock invocation) throws Throwable { + statusCode = invocation.getArgumentAt(0, int.class); + return null; + } + }).when(response).setStatus(Mockito.anyInt()); + + return response; + } catch (final Exception e) { + e.printStackTrace(); + Assert.fail(e.toString()); + return null; + } + } + + @Override + public void complete(final String identifier) { + if (!id.equals(identifier)) { + Assert.fail("attempting to respond to wrong request; should have been " + id + " but was " + identifier); + } + + completedCount.incrementAndGet(); + } + + public int getCompletionCount() { + return completedCount.get(); + } + } +}
http://git-wip-us.apache.org/repos/asf/nifi/blob/3a7ddc6a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestPostHTTP.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestPostHTTP.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestPostHTTP.java index bd35868..060148e 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestPostHTTP.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestPostHTTP.java @@ -1,248 +1,248 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.nifi.processors.standard; - -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertTrue; - -import java.io.ByteArrayInputStream; -import java.io.ByteArrayOutputStream; -import java.util.HashMap; -import java.util.Map; - -import org.apache.nifi.ssl.SSLContextService; -import org.apache.nifi.ssl.StandardSSLContextService; -import org.apache.nifi.util.FlowFileUnpackagerV3; -import org.apache.nifi.util.TestRunner; -import org.apache.nifi.util.TestRunners; -import org.eclipse.jetty.servlet.ServletHandler; -import org.junit.After; -import org.junit.Test; - -public class TestPostHTTP { - - private TestServer server; - private TestRunner runner; - private CaptureServlet servlet; - - private void setup(final Map<String, String> sslProperties) throws Exception { - // set up web service - ServletHandler handler = new ServletHandler(); - handler.addServletWithMapping(CaptureServlet.class, "/*"); - servlet = (CaptureServlet) handler.getServlets()[0].getServlet(); - - // create the service - server = new TestServer(sslProperties); - server.addHandler(handler); - server.startServer(); - - runner = TestRunners.newTestRunner(PostHTTP.class); - } - - @After - public void cleanup() throws Exception { - if (server != null) { - server.shutdownServer(); - server = null; - } - } - - @Test - public void testTruststoreSSLOnly() throws Exception { - final Map<String, String> sslProps = new HashMap<>(); - sslProps.put(TestServer.NEED_CLIENT_AUTH, "false"); - sslProps.put(StandardSSLContextService.KEYSTORE.getName(), "src/test/resources/localhost-ks.jks"); - sslProps.put(StandardSSLContextService.KEYSTORE_PASSWORD.getName(), "localtest"); - sslProps.put(StandardSSLContextService.KEYSTORE_TYPE.getName(), "JKS"); - setup(sslProps); - - final SSLContextService sslContextService = new StandardSSLContextService(); - runner.addControllerService("ssl-context", sslContextService); - runner.setProperty(sslContextService, StandardSSLContextService.TRUSTSTORE, "src/test/resources/localhost-ts.jks"); - runner.setProperty(sslContextService, StandardSSLContextService.TRUSTSTORE_PASSWORD, "localtest"); - runner.setProperty(sslContextService, StandardSSLContextService.TRUSTSTORE_TYPE, "JKS"); - runner.enableControllerService(sslContextService); - - runner.setProperty(PostHTTP.URL, server.getSecureUrl()); - runner.setProperty(PostHTTP.SSL_CONTEXT_SERVICE, "ssl-context"); - - runner.enqueue("Hello world".getBytes()); - runner.run(); - - runner.assertAllFlowFilesTransferred(PostHTTP.REL_SUCCESS, 1); - } - - @Test - public void testTwoWaySSL() throws Exception { - final Map<String, String> sslProps = new HashMap<>(); - sslProps.put(StandardSSLContextService.KEYSTORE.getName(), "src/test/resources/localhost-ks.jks"); - sslProps.put(StandardSSLContextService.KEYSTORE_PASSWORD.getName(), "localtest"); - sslProps.put(StandardSSLContextService.KEYSTORE_TYPE.getName(), "JKS"); - sslProps.put(StandardSSLContextService.TRUSTSTORE.getName(), "src/test/resources/localhost-ts.jks"); - sslProps.put(StandardSSLContextService.TRUSTSTORE_PASSWORD.getName(), "localtest"); - sslProps.put(StandardSSLContextService.TRUSTSTORE_TYPE.getName(), "JKS"); - sslProps.put(TestServer.NEED_CLIENT_AUTH, "true"); - setup(sslProps); - - final SSLContextService sslContextService = new StandardSSLContextService(); - runner.addControllerService("ssl-context", sslContextService); - runner.setProperty(sslContextService, StandardSSLContextService.TRUSTSTORE, "src/test/resources/localhost-ts.jks"); - runner.setProperty(sslContextService, StandardSSLContextService.TRUSTSTORE_PASSWORD, "localtest"); - runner.setProperty(sslContextService, StandardSSLContextService.TRUSTSTORE_TYPE, "JKS"); - runner.setProperty(sslContextService, StandardSSLContextService.KEYSTORE, "src/test/resources/localhost-ks.jks"); - runner.setProperty(sslContextService, StandardSSLContextService.KEYSTORE_PASSWORD, "localtest"); - runner.setProperty(sslContextService, StandardSSLContextService.KEYSTORE_TYPE, "JKS"); - runner.enableControllerService(sslContextService); - - runner.setProperty(PostHTTP.URL, server.getSecureUrl()); - runner.setProperty(PostHTTP.SSL_CONTEXT_SERVICE, "ssl-context"); - - runner.enqueue("Hello world".getBytes()); - runner.run(); - - runner.assertAllFlowFilesTransferred(PostHTTP.REL_SUCCESS, 1); - } - - @Test - public void testOneWaySSLWhenServerConfiguredForTwoWay() throws Exception { - final Map<String, String> sslProps = new HashMap<>(); - sslProps.put(StandardSSLContextService.KEYSTORE.getName(), "src/test/resources/localhost-ks.jks"); - sslProps.put(StandardSSLContextService.KEYSTORE_PASSWORD.getName(), "localtest"); - sslProps.put(StandardSSLContextService.KEYSTORE_TYPE.getName(), "JKS"); - sslProps.put(StandardSSLContextService.TRUSTSTORE.getName(), "src/test/resources/localhost-ts.jks"); - sslProps.put(StandardSSLContextService.TRUSTSTORE_PASSWORD.getName(), "localtest"); - sslProps.put(StandardSSLContextService.TRUSTSTORE_TYPE.getName(), "JKS"); - sslProps.put(TestServer.NEED_CLIENT_AUTH, "true"); - setup(sslProps); - - final SSLContextService sslContextService = new StandardSSLContextService(); - runner.addControllerService("ssl-context", sslContextService); - runner.setProperty(sslContextService, StandardSSLContextService.TRUSTSTORE, "src/test/resources/localhost-ts.jks"); - runner.setProperty(sslContextService, StandardSSLContextService.TRUSTSTORE_PASSWORD, "localtest"); - runner.setProperty(sslContextService, StandardSSLContextService.TRUSTSTORE_TYPE, "JKS"); - runner.enableControllerService(sslContextService); - - runner.setProperty(PostHTTP.URL, server.getSecureUrl()); - runner.setProperty(PostHTTP.SSL_CONTEXT_SERVICE, "ssl-context"); - - runner.enqueue("Hello world".getBytes()); - runner.run(); - - runner.assertAllFlowFilesTransferred(PostHTTP.REL_FAILURE, 1); - } - - @Test - public void testSendAsFlowFile() throws Exception { - setup(null); - runner.setProperty(PostHTTP.URL, server.getUrl()); - runner.setProperty(PostHTTP.SEND_AS_FLOWFILE, "true"); - - final Map<String, String> attrs = new HashMap<>(); - attrs.put("abc", "cba"); - - runner.enqueue("Hello".getBytes(), attrs); - attrs.put("abc", "abc"); - attrs.put("filename", "xyz.txt"); - runner.enqueue("World".getBytes(), attrs); - - runner.run(1); - runner.assertAllFlowFilesTransferred(PostHTTP.REL_SUCCESS); - - final byte[] lastPost = servlet.getLastPost(); - final ByteArrayOutputStream baos = new ByteArrayOutputStream(); - final ByteArrayInputStream bais = new ByteArrayInputStream(lastPost); - - FlowFileUnpackagerV3 unpacker = new FlowFileUnpackagerV3(); - - // unpack first flowfile received - Map<String, String> receivedAttrs = unpacker.unpackageFlowFile(bais, baos); - byte[] contentReceived = baos.toByteArray(); - assertEquals("Hello", new String(contentReceived)); - assertEquals("cba", receivedAttrs.get("abc")); - - assertTrue(unpacker.hasMoreData()); - - baos.reset(); - receivedAttrs = unpacker.unpackageFlowFile(bais, baos); - contentReceived = baos.toByteArray(); - - assertEquals("World", new String(contentReceived)); - assertEquals("abc", receivedAttrs.get("abc")); - assertEquals("xyz.txt", receivedAttrs.get("filename")); - } - - @Test - public void testSendAsFlowFileSecure() throws Exception { - final Map<String, String> sslProps = new HashMap<>(); - sslProps.put(StandardSSLContextService.KEYSTORE.getName(), "src/test/resources/localhost-ks.jks"); - sslProps.put(StandardSSLContextService.KEYSTORE_PASSWORD.getName(), "localtest"); - sslProps.put(StandardSSLContextService.KEYSTORE_TYPE.getName(), "JKS"); - sslProps.put(StandardSSLContextService.TRUSTSTORE.getName(), "src/test/resources/localhost-ts.jks"); - sslProps.put(StandardSSLContextService.TRUSTSTORE_PASSWORD.getName(), "localtest"); - sslProps.put(StandardSSLContextService.TRUSTSTORE_TYPE.getName(), "JKS"); - sslProps.put(TestServer.NEED_CLIENT_AUTH, "true"); - setup(sslProps); - - final SSLContextService sslContextService = new StandardSSLContextService(); - runner.addControllerService("ssl-context", sslContextService); - runner.setProperty(sslContextService, StandardSSLContextService.TRUSTSTORE, "src/test/resources/localhost-ts.jks"); - runner.setProperty(sslContextService, StandardSSLContextService.TRUSTSTORE_PASSWORD, "localtest"); - runner.setProperty(sslContextService, StandardSSLContextService.TRUSTSTORE_TYPE, "JKS"); - runner.setProperty(sslContextService, StandardSSLContextService.KEYSTORE, "src/test/resources/localhost-ks.jks"); - runner.setProperty(sslContextService, StandardSSLContextService.KEYSTORE_PASSWORD, "localtest"); - runner.setProperty(sslContextService, StandardSSLContextService.KEYSTORE_TYPE, "JKS"); - runner.enableControllerService(sslContextService); - - runner.setProperty(PostHTTP.URL, server.getSecureUrl()); - runner.setProperty(PostHTTP.SEND_AS_FLOWFILE, "true"); - runner.setProperty(PostHTTP.SSL_CONTEXT_SERVICE, "ssl-context"); - - final Map<String, String> attrs = new HashMap<>(); - attrs.put("abc", "cba"); - - runner.enqueue("Hello".getBytes(), attrs); - attrs.put("abc", "abc"); - attrs.put("filename", "xyz.txt"); - runner.enqueue("World".getBytes(), attrs); - - runner.run(1); - runner.assertAllFlowFilesTransferred(PostHTTP.REL_SUCCESS); - - final byte[] lastPost = servlet.getLastPost(); - final ByteArrayOutputStream baos = new ByteArrayOutputStream(); - final ByteArrayInputStream bais = new ByteArrayInputStream(lastPost); - - FlowFileUnpackagerV3 unpacker = new FlowFileUnpackagerV3(); - - // unpack first flowfile received - Map<String, String> receivedAttrs = unpacker.unpackageFlowFile(bais, baos); - byte[] contentReceived = baos.toByteArray(); - assertEquals("Hello", new String(contentReceived)); - assertEquals("cba", receivedAttrs.get("abc")); - - assertTrue(unpacker.hasMoreData()); - - baos.reset(); - receivedAttrs = unpacker.unpackageFlowFile(bais, baos); - contentReceived = baos.toByteArray(); - - assertEquals("World", new String(contentReceived)); - assertEquals("abc", receivedAttrs.get("abc")); - assertEquals("xyz.txt", receivedAttrs.get("filename")); - } - -} +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.standard; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.util.HashMap; +import java.util.Map; + +import org.apache.nifi.ssl.SSLContextService; +import org.apache.nifi.ssl.StandardSSLContextService; +import org.apache.nifi.util.FlowFileUnpackagerV3; +import org.apache.nifi.util.TestRunner; +import org.apache.nifi.util.TestRunners; +import org.eclipse.jetty.servlet.ServletHandler; +import org.junit.After; +import org.junit.Test; + +public class TestPostHTTP { + + private TestServer server; + private TestRunner runner; + private CaptureServlet servlet; + + private void setup(final Map<String, String> sslProperties) throws Exception { + // set up web service + ServletHandler handler = new ServletHandler(); + handler.addServletWithMapping(CaptureServlet.class, "/*"); + servlet = (CaptureServlet) handler.getServlets()[0].getServlet(); + + // create the service + server = new TestServer(sslProperties); + server.addHandler(handler); + server.startServer(); + + runner = TestRunners.newTestRunner(PostHTTP.class); + } + + @After + public void cleanup() throws Exception { + if (server != null) { + server.shutdownServer(); + server = null; + } + } + + @Test + public void testTruststoreSSLOnly() throws Exception { + final Map<String, String> sslProps = new HashMap<>(); + sslProps.put(TestServer.NEED_CLIENT_AUTH, "false"); + sslProps.put(StandardSSLContextService.KEYSTORE.getName(), "src/test/resources/localhost-ks.jks"); + sslProps.put(StandardSSLContextService.KEYSTORE_PASSWORD.getName(), "localtest"); + sslProps.put(StandardSSLContextService.KEYSTORE_TYPE.getName(), "JKS"); + setup(sslProps); + + final SSLContextService sslContextService = new StandardSSLContextService(); + runner.addControllerService("ssl-context", sslContextService); + runner.setProperty(sslContextService, StandardSSLContextService.TRUSTSTORE, "src/test/resources/localhost-ts.jks"); + runner.setProperty(sslContextService, StandardSSLContextService.TRUSTSTORE_PASSWORD, "localtest"); + runner.setProperty(sslContextService, StandardSSLContextService.TRUSTSTORE_TYPE, "JKS"); + runner.enableControllerService(sslContextService); + + runner.setProperty(PostHTTP.URL, server.getSecureUrl()); + runner.setProperty(PostHTTP.SSL_CONTEXT_SERVICE, "ssl-context"); + + runner.enqueue("Hello world".getBytes()); + runner.run(); + + runner.assertAllFlowFilesTransferred(PostHTTP.REL_SUCCESS, 1); + } + + @Test + public void testTwoWaySSL() throws Exception { + final Map<String, String> sslProps = new HashMap<>(); + sslProps.put(StandardSSLContextService.KEYSTORE.getName(), "src/test/resources/localhost-ks.jks"); + sslProps.put(StandardSSLContextService.KEYSTORE_PASSWORD.getName(), "localtest"); + sslProps.put(StandardSSLContextService.KEYSTORE_TYPE.getName(), "JKS"); + sslProps.put(StandardSSLContextService.TRUSTSTORE.getName(), "src/test/resources/localhost-ts.jks"); + sslProps.put(StandardSSLContextService.TRUSTSTORE_PASSWORD.getName(), "localtest"); + sslProps.put(StandardSSLContextService.TRUSTSTORE_TYPE.getName(), "JKS"); + sslProps.put(TestServer.NEED_CLIENT_AUTH, "true"); + setup(sslProps); + + final SSLContextService sslContextService = new StandardSSLContextService(); + runner.addControllerService("ssl-context", sslContextService); + runner.setProperty(sslContextService, StandardSSLContextService.TRUSTSTORE, "src/test/resources/localhost-ts.jks"); + runner.setProperty(sslContextService, StandardSSLContextService.TRUSTSTORE_PASSWORD, "localtest"); + runner.setProperty(sslContextService, StandardSSLContextService.TRUSTSTORE_TYPE, "JKS"); + runner.setProperty(sslContextService, StandardSSLContextService.KEYSTORE, "src/test/resources/localhost-ks.jks"); + runner.setProperty(sslContextService, StandardSSLContextService.KEYSTORE_PASSWORD, "localtest"); + runner.setProperty(sslContextService, StandardSSLContextService.KEYSTORE_TYPE, "JKS"); + runner.enableControllerService(sslContextService); + + runner.setProperty(PostHTTP.URL, server.getSecureUrl()); + runner.setProperty(PostHTTP.SSL_CONTEXT_SERVICE, "ssl-context"); + + runner.enqueue("Hello world".getBytes()); + runner.run(); + + runner.assertAllFlowFilesTransferred(PostHTTP.REL_SUCCESS, 1); + } + + @Test + public void testOneWaySSLWhenServerConfiguredForTwoWay() throws Exception { + final Map<String, String> sslProps = new HashMap<>(); + sslProps.put(StandardSSLContextService.KEYSTORE.getName(), "src/test/resources/localhost-ks.jks"); + sslProps.put(StandardSSLContextService.KEYSTORE_PASSWORD.getName(), "localtest"); + sslProps.put(StandardSSLContextService.KEYSTORE_TYPE.getName(), "JKS"); + sslProps.put(StandardSSLContextService.TRUSTSTORE.getName(), "src/test/resources/localhost-ts.jks"); + sslProps.put(StandardSSLContextService.TRUSTSTORE_PASSWORD.getName(), "localtest"); + sslProps.put(StandardSSLContextService.TRUSTSTORE_TYPE.getName(), "JKS"); + sslProps.put(TestServer.NEED_CLIENT_AUTH, "true"); + setup(sslProps); + + final SSLContextService sslContextService = new StandardSSLContextService(); + runner.addControllerService("ssl-context", sslContextService); + runner.setProperty(sslContextService, StandardSSLContextService.TRUSTSTORE, "src/test/resources/localhost-ts.jks"); + runner.setProperty(sslContextService, StandardSSLContextService.TRUSTSTORE_PASSWORD, "localtest"); + runner.setProperty(sslContextService, StandardSSLContextService.TRUSTSTORE_TYPE, "JKS"); + runner.enableControllerService(sslContextService); + + runner.setProperty(PostHTTP.URL, server.getSecureUrl()); + runner.setProperty(PostHTTP.SSL_CONTEXT_SERVICE, "ssl-context"); + + runner.enqueue("Hello world".getBytes()); + runner.run(); + + runner.assertAllFlowFilesTransferred(PostHTTP.REL_FAILURE, 1); + } + + @Test + public void testSendAsFlowFile() throws Exception { + setup(null); + runner.setProperty(PostHTTP.URL, server.getUrl()); + runner.setProperty(PostHTTP.SEND_AS_FLOWFILE, "true"); + + final Map<String, String> attrs = new HashMap<>(); + attrs.put("abc", "cba"); + + runner.enqueue("Hello".getBytes(), attrs); + attrs.put("abc", "abc"); + attrs.put("filename", "xyz.txt"); + runner.enqueue("World".getBytes(), attrs); + + runner.run(1); + runner.assertAllFlowFilesTransferred(PostHTTP.REL_SUCCESS); + + final byte[] lastPost = servlet.getLastPost(); + final ByteArrayOutputStream baos = new ByteArrayOutputStream(); + final ByteArrayInputStream bais = new ByteArrayInputStream(lastPost); + + FlowFileUnpackagerV3 unpacker = new FlowFileUnpackagerV3(); + + // unpack first flowfile received + Map<String, String> receivedAttrs = unpacker.unpackageFlowFile(bais, baos); + byte[] contentReceived = baos.toByteArray(); + assertEquals("Hello", new String(contentReceived)); + assertEquals("cba", receivedAttrs.get("abc")); + + assertTrue(unpacker.hasMoreData()); + + baos.reset(); + receivedAttrs = unpacker.unpackageFlowFile(bais, baos); + contentReceived = baos.toByteArray(); + + assertEquals("World", new String(contentReceived)); + assertEquals("abc", receivedAttrs.get("abc")); + assertEquals("xyz.txt", receivedAttrs.get("filename")); + } + + @Test + public void testSendAsFlowFileSecure() throws Exception { + final Map<String, String> sslProps = new HashMap<>(); + sslProps.put(StandardSSLContextService.KEYSTORE.getName(), "src/test/resources/localhost-ks.jks"); + sslProps.put(StandardSSLContextService.KEYSTORE_PASSWORD.getName(), "localtest"); + sslProps.put(StandardSSLContextService.KEYSTORE_TYPE.getName(), "JKS"); + sslProps.put(StandardSSLContextService.TRUSTSTORE.getName(), "src/test/resources/localhost-ts.jks"); + sslProps.put(StandardSSLContextService.TRUSTSTORE_PASSWORD.getName(), "localtest"); + sslProps.put(StandardSSLContextService.TRUSTSTORE_TYPE.getName(), "JKS"); + sslProps.put(TestServer.NEED_CLIENT_AUTH, "true"); + setup(sslProps); + + final SSLContextService sslContextService = new StandardSSLContextService(); + runner.addControllerService("ssl-context", sslContextService); + runner.setProperty(sslContextService, StandardSSLContextService.TRUSTSTORE, "src/test/resources/localhost-ts.jks"); + runner.setProperty(sslContextService, StandardSSLContextService.TRUSTSTORE_PASSWORD, "localtest"); + runner.setProperty(sslContextService, StandardSSLContextService.TRUSTSTORE_TYPE, "JKS"); + runner.setProperty(sslContextService, StandardSSLContextService.KEYSTORE, "src/test/resources/localhost-ks.jks"); + runner.setProperty(sslContextService, StandardSSLContextService.KEYSTORE_PASSWORD, "localtest"); + runner.setProperty(sslContextService, StandardSSLContextService.KEYSTORE_TYPE, "JKS"); + runner.enableControllerService(sslContextService); + + runner.setProperty(PostHTTP.URL, server.getSecureUrl()); + runner.setProperty(PostHTTP.SEND_AS_FLOWFILE, "true"); + runner.setProperty(PostHTTP.SSL_CONTEXT_SERVICE, "ssl-context"); + + final Map<String, String> attrs = new HashMap<>(); + attrs.put("abc", "cba"); + + runner.enqueue("Hello".getBytes(), attrs); + attrs.put("abc", "abc"); + attrs.put("filename", "xyz.txt"); + runner.enqueue("World".getBytes(), attrs); + + runner.run(1); + runner.assertAllFlowFilesTransferred(PostHTTP.REL_SUCCESS); + + final byte[] lastPost = servlet.getLastPost(); + final ByteArrayOutputStream baos = new ByteArrayOutputStream(); + final ByteArrayInputStream bais = new ByteArrayInputStream(lastPost); + + FlowFileUnpackagerV3 unpacker = new FlowFileUnpackagerV3(); + + // unpack first flowfile received + Map<String, String> receivedAttrs = unpacker.unpackageFlowFile(bais, baos); + byte[] contentReceived = baos.toByteArray(); + assertEquals("Hello", new String(contentReceived)); + assertEquals("cba", receivedAttrs.get("abc")); + + assertTrue(unpacker.hasMoreData()); + + baos.reset(); + receivedAttrs = unpacker.unpackageFlowFile(bais, baos); + contentReceived = baos.toByteArray(); + + assertEquals("World", new String(contentReceived)); + assertEquals("abc", receivedAttrs.get("abc")); + assertEquals("xyz.txt", receivedAttrs.get("filename")); + } + +} http://git-wip-us.apache.org/repos/asf/nifi/blob/3a7ddc6a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestPutEmail.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestPutEmail.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestPutEmail.java index af04cbc..727e8e9 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestPutEmail.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestPutEmail.java @@ -1,82 +1,82 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.nifi.processors.standard; - -import java.util.HashMap; -import java.util.Map; - -import org.apache.nifi.flowfile.FlowFile; -import org.apache.nifi.processor.ProcessContext; -import org.apache.nifi.processor.ProcessSession; -import org.apache.nifi.util.TestRunner; -import org.apache.nifi.util.TestRunners; -import static org.junit.Assert.assertEquals; -import org.junit.Test; - -public class TestPutEmail { - - @Test - public void testHostNotFound() { - // verifies that files are routed to failure when the SMTP host doesn't exist - final TestRunner runner = TestRunners.newTestRunner(new PutEmail()); - runner.setProperty(PutEmail.SMTP_HOSTNAME, "host-doesnt-exist123"); - runner.setProperty(PutEmail.FROM, "[email protected]"); - runner.setProperty(PutEmail.TO, "[email protected]"); - runner.setProperty(PutEmail.MESSAGE, "Message Body"); - - final Map<String, String> attributes = new HashMap<>(); - runner.enqueue("Some Text".getBytes(), attributes); - - runner.run(); - - runner.assertQueueEmpty(); - runner.assertAllFlowFilesTransferred(PutEmail.REL_FAILURE); - } - - @Test - public void testEmailPropertyFormatters() { - // verifies that files are routed to failure when the SMTP host doesn't exist - final TestRunner runner = TestRunners.newTestRunner(new PutEmail()); - runner.setProperty(PutEmail.HEADER_XMAILER, "TestingNiFi"); - runner.setProperty(PutEmail.SMTP_HOSTNAME, "smtp-host"); - runner.setProperty(PutEmail.SMTP_SOCKET_FACTORY, "${dynamicSocketFactory}"); - runner.setProperty(PutEmail.HEADER_XMAILER, "TestingNiFi"); - runner.setProperty(PutEmail.FROM, "[email protected]"); - runner.setProperty(PutEmail.MESSAGE, "Message Body"); - runner.setProperty(PutEmail.TO, "[email protected]"); - - ProcessSession session = runner.getProcessSessionFactory().createSession(); - FlowFile ff = session.create(); - ff = session.putAttribute(ff, "dynamicSocketFactory", "testingSocketFactory"); - ProcessContext context = runner.getProcessContext(); - - String xmailer = context.getProperty(PutEmail.HEADER_XMAILER).evaluateAttributeExpressions(ff).getValue(); - assertEquals("X-Mailer Header", "TestingNiFi", xmailer); - - String socketFactory = context.getProperty(PutEmail.SMTP_SOCKET_FACTORY).evaluateAttributeExpressions(ff).getValue(); - assertEquals("Socket Factory", "testingSocketFactory", socketFactory); - - final Map<String, String> attributes = new HashMap<>(); - runner.enqueue("Some Text".getBytes(), attributes); - - runner.run(); - - runner.assertQueueEmpty(); - runner.assertAllFlowFilesTransferred(PutEmail.REL_FAILURE); - } - -} +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.standard; + +import java.util.HashMap; +import java.util.Map; + +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.util.TestRunner; +import org.apache.nifi.util.TestRunners; +import static org.junit.Assert.assertEquals; +import org.junit.Test; + +public class TestPutEmail { + + @Test + public void testHostNotFound() { + // verifies that files are routed to failure when the SMTP host doesn't exist + final TestRunner runner = TestRunners.newTestRunner(new PutEmail()); + runner.setProperty(PutEmail.SMTP_HOSTNAME, "host-doesnt-exist123"); + runner.setProperty(PutEmail.FROM, "[email protected]"); + runner.setProperty(PutEmail.TO, "[email protected]"); + runner.setProperty(PutEmail.MESSAGE, "Message Body"); + + final Map<String, String> attributes = new HashMap<>(); + runner.enqueue("Some Text".getBytes(), attributes); + + runner.run(); + + runner.assertQueueEmpty(); + runner.assertAllFlowFilesTransferred(PutEmail.REL_FAILURE); + } + + @Test + public void testEmailPropertyFormatters() { + // verifies that files are routed to failure when the SMTP host doesn't exist + final TestRunner runner = TestRunners.newTestRunner(new PutEmail()); + runner.setProperty(PutEmail.HEADER_XMAILER, "TestingNiFi"); + runner.setProperty(PutEmail.SMTP_HOSTNAME, "smtp-host"); + runner.setProperty(PutEmail.SMTP_SOCKET_FACTORY, "${dynamicSocketFactory}"); + runner.setProperty(PutEmail.HEADER_XMAILER, "TestingNiFi"); + runner.setProperty(PutEmail.FROM, "[email protected]"); + runner.setProperty(PutEmail.MESSAGE, "Message Body"); + runner.setProperty(PutEmail.TO, "[email protected]"); + + ProcessSession session = runner.getProcessSessionFactory().createSession(); + FlowFile ff = session.create(); + ff = session.putAttribute(ff, "dynamicSocketFactory", "testingSocketFactory"); + ProcessContext context = runner.getProcessContext(); + + String xmailer = context.getProperty(PutEmail.HEADER_XMAILER).evaluateAttributeExpressions(ff).getValue(); + assertEquals("X-Mailer Header", "TestingNiFi", xmailer); + + String socketFactory = context.getProperty(PutEmail.SMTP_SOCKET_FACTORY).evaluateAttributeExpressions(ff).getValue(); + assertEquals("Socket Factory", "testingSocketFactory", socketFactory); + + final Map<String, String> attributes = new HashMap<>(); + runner.enqueue("Some Text".getBytes(), attributes); + + runner.run(); + + runner.assertQueueEmpty(); + runner.assertAllFlowFilesTransferred(PutEmail.REL_FAILURE); + } + +}
