http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/54818893/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestGetJMSQueue.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestGetJMSQueue.java b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestGetJMSQueue.java index 9c833f5..b6c79d5 100644 --- a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestGetJMSQueue.java +++ b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestGetJMSQueue.java @@ -36,12 +36,16 @@ public class TestGetJMSQueue { @org.junit.Ignore public void testSendTextToQueue() throws Exception { final TestRunner runner = TestRunners.newTestRunner(GetJMSQueue.class); - runner.setProperty(JmsProperties.JMS_PROVIDER, JmsProperties.ACTIVEMQ_PROVIDER); + runner. + setProperty(JmsProperties.JMS_PROVIDER, JmsProperties.ACTIVEMQ_PROVIDER); runner.setProperty(JmsProperties.URL, "tcp://localhost:61616"); - runner.setProperty(JmsProperties.DESTINATION_TYPE, JmsProperties.DESTINATION_TYPE_QUEUE); + runner. + setProperty(JmsProperties.DESTINATION_TYPE, JmsProperties.DESTINATION_TYPE_QUEUE); runner.setProperty(JmsProperties.DESTINATION_NAME, "queue.testing"); - runner.setProperty(JmsProperties.ACKNOWLEDGEMENT_MODE, JmsProperties.ACK_MODE_AUTO); - WrappedMessageProducer wrappedProducer = JmsFactory.createMessageProducer(runner.getProcessContext(), true); + runner. + setProperty(JmsProperties.ACKNOWLEDGEMENT_MODE, JmsProperties.ACK_MODE_AUTO); + WrappedMessageProducer wrappedProducer = JmsFactory. + createMessageProducer(runner.getProcessContext(), true); final Session jmsSession = wrappedProducer.getSession(); final MessageProducer producer = wrappedProducer.getProducer(); @@ -56,12 +60,16 @@ public class TestGetJMSQueue { @org.junit.Ignore public void testSendBytesToQueue() throws Exception { final TestRunner runner = TestRunners.newTestRunner(GetJMSQueue.class); - runner.setProperty(JmsProperties.JMS_PROVIDER, JmsProperties.ACTIVEMQ_PROVIDER); + runner. + setProperty(JmsProperties.JMS_PROVIDER, JmsProperties.ACTIVEMQ_PROVIDER); runner.setProperty(JmsProperties.URL, "tcp://localhost:61616"); - runner.setProperty(JmsProperties.DESTINATION_TYPE, JmsProperties.DESTINATION_TYPE_QUEUE); + runner. + setProperty(JmsProperties.DESTINATION_TYPE, JmsProperties.DESTINATION_TYPE_QUEUE); runner.setProperty(JmsProperties.DESTINATION_NAME, "queue.testing"); - runner.setProperty(JmsProperties.ACKNOWLEDGEMENT_MODE, JmsProperties.ACK_MODE_AUTO); - WrappedMessageProducer wrappedProducer = JmsFactory.createMessageProducer(runner.getProcessContext(), true); + runner. + setProperty(JmsProperties.ACKNOWLEDGEMENT_MODE, JmsProperties.ACK_MODE_AUTO); + WrappedMessageProducer wrappedProducer = JmsFactory. + createMessageProducer(runner.getProcessContext(), true); final Session jmsSession = wrappedProducer.getSession(); final MessageProducer producer = wrappedProducer.getProducer(); @@ -77,12 +85,16 @@ public class TestGetJMSQueue { @org.junit.Ignore public void testSendStreamToQueue() throws Exception { final TestRunner runner = TestRunners.newTestRunner(GetJMSQueue.class); - runner.setProperty(JmsProperties.JMS_PROVIDER, JmsProperties.ACTIVEMQ_PROVIDER); + runner. + setProperty(JmsProperties.JMS_PROVIDER, JmsProperties.ACTIVEMQ_PROVIDER); runner.setProperty(JmsProperties.URL, "tcp://localhost:61616"); - runner.setProperty(JmsProperties.DESTINATION_TYPE, JmsProperties.DESTINATION_TYPE_QUEUE); + runner. + setProperty(JmsProperties.DESTINATION_TYPE, JmsProperties.DESTINATION_TYPE_QUEUE); runner.setProperty(JmsProperties.DESTINATION_NAME, "queue.testing"); - runner.setProperty(JmsProperties.ACKNOWLEDGEMENT_MODE, JmsProperties.ACK_MODE_AUTO); - WrappedMessageProducer wrappedProducer = JmsFactory.createMessageProducer(runner.getProcessContext(), true); + runner. + setProperty(JmsProperties.ACKNOWLEDGEMENT_MODE, JmsProperties.ACK_MODE_AUTO); + WrappedMessageProducer wrappedProducer = JmsFactory. + createMessageProducer(runner.getProcessContext(), true); final Session jmsSession = wrappedProducer.getSession(); final MessageProducer producer = wrappedProducer.getProducer(); @@ -98,12 +110,16 @@ public class TestGetJMSQueue { @org.junit.Ignore public void testSendMapToQueue() throws Exception { final TestRunner runner = TestRunners.newTestRunner(GetJMSQueue.class); - runner.setProperty(JmsProperties.JMS_PROVIDER, JmsProperties.ACTIVEMQ_PROVIDER); + runner. + setProperty(JmsProperties.JMS_PROVIDER, JmsProperties.ACTIVEMQ_PROVIDER); runner.setProperty(JmsProperties.URL, "tcp://localhost:61616"); - runner.setProperty(JmsProperties.DESTINATION_TYPE, JmsProperties.DESTINATION_TYPE_QUEUE); + runner. + setProperty(JmsProperties.DESTINATION_TYPE, JmsProperties.DESTINATION_TYPE_QUEUE); runner.setProperty(JmsProperties.DESTINATION_NAME, "queue.testing"); - runner.setProperty(JmsProperties.ACKNOWLEDGEMENT_MODE, JmsProperties.ACK_MODE_AUTO); - WrappedMessageProducer wrappedProducer = JmsFactory.createMessageProducer(runner.getProcessContext(), true); + runner. + setProperty(JmsProperties.ACKNOWLEDGEMENT_MODE, JmsProperties.ACK_MODE_AUTO); + WrappedMessageProducer wrappedProducer = JmsFactory. + createMessageProducer(runner.getProcessContext(), true); final Session jmsSession = wrappedProducer.getSession(); final MessageProducer producer = wrappedProducer.getProducer(); @@ -120,17 +136,22 @@ public class TestGetJMSQueue { @org.junit.Ignore public void testSendObjectToQueue() throws Exception { final TestRunner runner = TestRunners.newTestRunner(GetJMSQueue.class); - runner.setProperty(JmsProperties.JMS_PROVIDER, JmsProperties.ACTIVEMQ_PROVIDER); + runner. + setProperty(JmsProperties.JMS_PROVIDER, JmsProperties.ACTIVEMQ_PROVIDER); runner.setProperty(JmsProperties.URL, "tcp://localhost:61616"); - runner.setProperty(JmsProperties.DESTINATION_TYPE, JmsProperties.DESTINATION_TYPE_QUEUE); + runner. + setProperty(JmsProperties.DESTINATION_TYPE, JmsProperties.DESTINATION_TYPE_QUEUE); runner.setProperty(JmsProperties.DESTINATION_NAME, "queue.testing"); - runner.setProperty(JmsProperties.ACKNOWLEDGEMENT_MODE, JmsProperties.ACK_MODE_AUTO); - WrappedMessageProducer wrappedProducer = JmsFactory.createMessageProducer(runner.getProcessContext(), true); + runner. + setProperty(JmsProperties.ACKNOWLEDGEMENT_MODE, JmsProperties.ACK_MODE_AUTO); + WrappedMessageProducer wrappedProducer = JmsFactory. + createMessageProducer(runner.getProcessContext(), true); final Session jmsSession = wrappedProducer.getSession(); final MessageProducer producer = wrappedProducer.getProducer(); // Revision class is used because test just needs any Serializable class in core NiFi - final ObjectMessage message = jmsSession.createObjectMessage(new Revision(1L, "ID")); + final ObjectMessage message = jmsSession. + createObjectMessage(new Revision(1L, "ID")); producer.send(message); jmsSession.commit();
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/54818893/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestHandleHttpRequest.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestHandleHttpRequest.java b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestHandleHttpRequest.java index 363fc0f..8b9b2ac 100644 --- a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestHandleHttpRequest.java +++ b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestHandleHttpRequest.java @@ -44,14 +44,16 @@ public class TestHandleHttpRequest { @Test public void testRequestAddedToService() throws InitializationException, MalformedURLException, IOException { - final TestRunner runner = TestRunners.newTestRunner(HandleHttpRequest.class); + final TestRunner runner = TestRunners. + newTestRunner(HandleHttpRequest.class); runner.setProperty(HandleHttpRequest.PORT, "0"); - + final MockHttpContextMap contextMap = new MockHttpContextMap(); runner.addControllerService("http-context-map", contextMap); runner.enableControllerService(contextMap); - runner.setProperty(HandleHttpRequest.HTTP_CONTEXT_MAP, "http-context-map"); - + runner. + setProperty(HandleHttpRequest.HTTP_CONTEXT_MAP, "http-context-map"); + // trigger processor to stop but not shutdown. runner.run(1, false); try { @@ -59,8 +61,10 @@ public class TestHandleHttpRequest { @Override public void run() { try { - final int port = ((HandleHttpRequest) runner.getProcessor()).getPort(); - final HttpURLConnection connection = (HttpURLConnection) new URL("http://localhost:" + port + "/my/path?query=true&value1=value1&value2=&value3&value4=apple=orange").openConnection(); + final int port = ((HandleHttpRequest) runner. + getProcessor()).getPort(); + final HttpURLConnection connection = (HttpURLConnection) new URL("http://localhost:" + port + "/my/path?query=true&value1=value1&value2=&value3&value4=apple=orange"). + openConnection(); connection.setDoOutput(false); connection.setRequestMethod("GET"); connection.setRequestProperty("header1", "value1"); @@ -68,8 +72,9 @@ public class TestHandleHttpRequest { connection.setRequestProperty("header3", "apple=orange"); connection.setConnectTimeout(3000); connection.setReadTimeout(3000); - - StreamUtils.copy(connection.getInputStream(), new NullOutputStream()); + + StreamUtils. + copy(connection.getInputStream(), new NullOutputStream()); } catch (final Throwable t) { t.printStackTrace(); Assert.fail(t.toString()); @@ -77,17 +82,23 @@ public class TestHandleHttpRequest { } }); httpThread.start(); - + // give processor a bit to handle the http request - try { Thread.sleep(1000L); } catch (final InterruptedException ie) {} - + try { + Thread.sleep(1000L); + } catch (final InterruptedException ie) { + } + // process the request. runner.run(1, false); - - runner.assertAllFlowFilesTransferred(HandleHttpRequest.REL_SUCCESS, 1); + + runner. + assertAllFlowFilesTransferred(HandleHttpRequest.REL_SUCCESS, 1); assertEquals(1, contextMap.size()); - - final MockFlowFile mff = runner.getFlowFilesForRelationship(HandleHttpRequest.REL_SUCCESS).get(0); + + final MockFlowFile mff = runner. + getFlowFilesForRelationship(HandleHttpRequest.REL_SUCCESS). + get(0); mff.assertAttributeEquals("http.query.param.query", "true"); mff.assertAttributeEquals("http.query.param.value1", "value1"); mff.assertAttributeEquals("http.query.param.value2", ""); @@ -100,11 +111,11 @@ public class TestHandleHttpRequest { runner.run(1, true); } } - - + private static class MockHttpContextMap extends AbstractControllerService implements HttpContextMap { + private final ConcurrentMap<String, HttpServletResponse> responseMap = new ConcurrentHashMap<>(); - + @Override public boolean register(String identifier, HttpServletRequest request, HttpServletResponse response, AsyncContext context) { responseMap.put(identifier, response); @@ -120,7 +131,7 @@ public class TestHandleHttpRequest { public void complete(String identifier) { responseMap.remove(identifier); } - + public int size() { return responseMap.size(); } http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/54818893/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestHandleHttpResponse.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestHandleHttpResponse.java b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestHandleHttpResponse.java index 7b41809..40683ae 100644 --- a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestHandleHttpResponse.java +++ b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestHandleHttpResponse.java @@ -51,27 +51,29 @@ public class TestHandleHttpResponse { @Test public void testEnsureCompleted() throws InitializationException { - final TestRunner runner = TestRunners.newTestRunner(HandleHttpResponse.class); - + final TestRunner runner = TestRunners. + newTestRunner(HandleHttpResponse.class); + final MockHttpContextMap contextMap = new MockHttpContextMap("my-id"); runner.addControllerService("http-context-map", contextMap); runner.enableControllerService(contextMap); - runner.setProperty(HandleHttpResponse.HTTP_CONTEXT_MAP, "http-context-map"); + runner. + setProperty(HandleHttpResponse.HTTP_CONTEXT_MAP, "http-context-map"); runner.setProperty(HandleHttpResponse.STATUS_CODE, "${status.code}"); runner.setProperty("my-attr", "${my-attr}"); runner.setProperty("no-valid-attr", "${no-valid-attr}"); - + final Map<String, String> attributes = new HashMap<>(); attributes.put(HandleHttpResponse.HTTP_CONTEXT_ID, "my-id"); attributes.put("my-attr", "hello"); attributes.put("status.code", "201"); - + runner.enqueue("hello".getBytes(), attributes); - + runner.run(); runner.assertAllFlowFilesTransferred(HandleHttpResponse.REL_SUCCESS, 1); - + assertEquals("hello", contextMap.baos.toString()); assertEquals("hello", contextMap.headersSent.get("my-attr")); assertNull(contextMap.headersSent.get("no-valid-attr")); @@ -79,21 +81,21 @@ public class TestHandleHttpResponse { assertEquals(1, contextMap.getCompletionCount()); assertTrue(contextMap.headersWithNoValue.isEmpty()); } - - + private static class MockHttpContextMap extends AbstractControllerService implements HttpContextMap { + private final String id; private final AtomicInteger completedCount = new AtomicInteger(0); private final ByteArrayOutputStream baos = new ByteArrayOutputStream(); private final ConcurrentMap<String, String> headersSent = new ConcurrentHashMap<>(); private volatile int statusCode = -1; - + private final List<String> headersWithNoValue = new CopyOnWriteArrayList<>(); - + public MockHttpContextMap(final String expectedIdentifier) { this.id = expectedIdentifier; } - + @Override public boolean register(String identifier, HttpServletRequest request, HttpServletResponse response, AsyncContext context) { return true; @@ -101,53 +103,71 @@ public class TestHandleHttpResponse { @Override public HttpServletResponse getResponse(final String identifier) { - if ( !id.equals(identifier) ) { - Assert.fail("attempting to respond to wrong request; should have been " + id + " but was " + identifier); + if (!id.equals(identifier)) { + Assert. + fail("attempting to respond to wrong request; should have been " + id + " but was " + identifier); } - + try { - final HttpServletResponse response = Mockito.mock(HttpServletResponse.class); - Mockito.when(response.getOutputStream()).thenReturn(new ServletOutputStream() { - @Override - public boolean isReady() { return true; } + final HttpServletResponse response = Mockito. + mock(HttpServletResponse.class); + Mockito.when(response.getOutputStream()). + thenReturn(new ServletOutputStream() { + @Override + public boolean isReady() { + return true; + } - @Override - public void setWriteListener(WriteListener writeListener) {} + @Override + public void setWriteListener(WriteListener writeListener) { + } + + @Override + public void write(int b) throws IOException { + baos.write(b); + } + + @Override + public void write(byte[] b) throws IOException { + baos.write(b); + } + + @Override + public void write(byte[] b, int off, int len) throws IOException { + baos.write(b, off, len); + } + }); - @Override - public void write(int b) throws IOException { baos.write(b); } - - @Override - public void write(byte[] b) throws IOException { baos.write(b); } - - @Override - public void write(byte[] b, int off, int len) throws IOException { baos.write(b, off, len); } - }); - - Mockito.doAnswer(new Answer<Object>() { @Override public Object answer(final InvocationOnMock invocation) throws Throwable { - final String key = invocation.getArgumentAt(0, String.class); - final String value = invocation.getArgumentAt(1, String.class); - if ( value == null ) { + final String key = invocation. + getArgumentAt(0, String.class); + final String value = invocation. + getArgumentAt(1, String.class); + if (value == null) { headersWithNoValue.add(key); } else { headersSent.put(key, value); } - + return null; } - }).when(response).setHeader(Mockito.any(String.class), Mockito.any(String.class)); - + }). + when(response). + setHeader(Mockito.any(String.class), Mockito. + any(String.class)); + Mockito.doAnswer(new Answer<Object>() { @Override public Object answer(final InvocationOnMock invocation) throws Throwable { statusCode = invocation.getArgumentAt(0, int.class); return null; } - }).when(response).setStatus(Mockito.anyInt()); - + }). + when(response). + setStatus(Mockito.anyInt()); + return response; } catch (final Exception e) { e.printStackTrace(); @@ -158,13 +178,14 @@ public class TestHandleHttpResponse { @Override public void complete(final String identifier) { - if ( !id.equals(identifier) ) { - Assert.fail("attempting to respond to wrong request; should have been " + id + " but was " + identifier); + if (!id.equals(identifier)) { + Assert. + fail("attempting to respond to wrong request; should have been " + id + " but was " + identifier); } - + completedCount.incrementAndGet(); } - + public int getCompletionCount() { return completedCount.get(); } http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/54818893/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestHashAttribute.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestHashAttribute.java b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestHashAttribute.java index 778a78c..a57f6cf 100644 --- a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestHashAttribute.java +++ b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestHashAttribute.java @@ -36,7 +36,8 @@ public class TestHashAttribute { @Test public void test() { final TestRunner runner = TestRunners.newTestRunner(new HashAttribute()); - runner.setProperty(HashAttribute.HASH_VALUE_ATTRIBUTE.getName(), "hashValue"); + runner. + setProperty(HashAttribute.HASH_VALUE_ATTRIBUTE.getName(), "hashValue"); runner.setProperty("MDKey1", ".*"); runner.setProperty("MDKey2", "(.).*"); @@ -66,7 +67,8 @@ public class TestHashAttribute { runner.assertTransferCount(HashAttribute.REL_FAILURE, 1); runner.assertTransferCount(HashAttribute.REL_SUCCESS, 4); - final List<MockFlowFile> success = runner.getFlowFilesForRelationship(HashAttribute.REL_SUCCESS); + final List<MockFlowFile> success = runner. + getFlowFilesForRelationship(HashAttribute.REL_SUCCESS); final Map<String, Integer> correlationCount = new HashMap<String, Integer>(); for (final MockFlowFile flowFile : success) { final String correlationId = flowFile.getAttribute("hashValue"); http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/54818893/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestHashContent.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestHashContent.java b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestHashContent.java index db3bcad..8f6f5f4 100644 --- a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestHashContent.java +++ b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestHashContent.java @@ -59,7 +59,9 @@ public class TestHashContent { runner.assertQueueEmpty(); runner.assertAllFlowFilesTransferred(HashContent.REL_SUCCESS, 1); - final MockFlowFile outFile = runner.getFlowFilesForRelationship(HashContent.REL_SUCCESS).get(0); + final MockFlowFile outFile = runner. + getFlowFilesForRelationship(HashContent.REL_SUCCESS). + get(0); final String hashValue = outFile.getAttribute("hash"); assertEquals(expectedHash, hashValue); http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/54818893/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestIdentifyMimeType.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestIdentifyMimeType.java b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestIdentifyMimeType.java index 1bf4585..9f49476 100644 --- a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestIdentifyMimeType.java +++ b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestIdentifyMimeType.java @@ -36,7 +36,8 @@ public class TestIdentifyMimeType { @Test public void testFiles() throws IOException { - final TestRunner runner = TestRunners.newTestRunner(new IdentifyMimeType()); + final TestRunner runner = TestRunners. + newTestRunner(new IdentifyMimeType()); final File dir = new File("src/test/resources/TestIdentifyMimeType"); final File[] files = dir.listFiles(); @@ -53,7 +54,8 @@ public class TestIdentifyMimeType { runner.setThreadCount(1); runner.run(fileCount); - runner.assertAllFlowFilesTransferred(IdentifyMimeType.REL_SUCCESS, fileCount); + runner. + assertAllFlowFilesTransferred(IdentifyMimeType.REL_SUCCESS, fileCount); final Map<String, String> expectedMimeTypes = new HashMap<>(); expectedMimeTypes.put("1.7z", "application/x-7z-compressed"); @@ -91,10 +93,13 @@ public class TestIdentifyMimeType { expectedExtensions.put("flowfilev3", ""); expectedExtensions.put("flowfilev1.tar", ""); - final List<MockFlowFile> filesOut = runner.getFlowFilesForRelationship(IdentifyMimeType.REL_SUCCESS); + final List<MockFlowFile> filesOut = runner. + getFlowFilesForRelationship(IdentifyMimeType.REL_SUCCESS); for (final MockFlowFile file : filesOut) { - final String filename = file.getAttribute(CoreAttributes.FILENAME.key()); - final String mimeType = file.getAttribute(CoreAttributes.MIME_TYPE.key()); + final String filename = file.getAttribute(CoreAttributes.FILENAME. + key()); + final String mimeType = file.getAttribute(CoreAttributes.MIME_TYPE. + key()); final String expected = expectedMimeTypes.get(filename); final String extension = file.getAttribute("mime.extension"); http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/54818893/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestInvokeHTTP.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestInvokeHTTP.java b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestInvokeHTTP.java index e5950cd..03fd14b 100644 --- a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestInvokeHTTP.java +++ b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestInvokeHTTP.java @@ -16,9 +16,6 @@ */ package org.apache.nifi.processors.standard; - -import static org.junit.Assert.*; - import java.io.IOException; import java.io.PrintWriter; import java.io.UnsupportedEncodingException; @@ -45,6 +42,8 @@ import org.eclipse.jetty.server.handler.AbstractHandler; import org.junit.After; import org.junit.AfterClass; import org.junit.Assert; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.fail; import org.junit.Before; import org.junit.BeforeClass; import org.junit.Test; @@ -61,7 +60,7 @@ public class TestInvokeHTTP { public static void beforeClass() throws Exception { // useful for verbose logging output // don't commit this with this property enabled, or any 'mvn test' will be really verbose - // System.setProperty("org.slf4j.simpleLogger.log.nifi.processors.standard", "debug"); + // System.setProperty("org.slf4j.simpleLogger.log.nifi.processors.standard", "debug"); // create the SSL properties, which basically store keystore / trustore information // this is used by the StandardSSLContextService and the Jetty Server @@ -146,7 +145,9 @@ public class TestInvokeHTTP { //expected in request status.code and status.message //original flow file (+attributes)?????????? - final MockFlowFile bundle = runner.getFlowFilesForRelationship(Config.REL_SUCCESS_REQ).get(0); + final MockFlowFile bundle = runner. + getFlowFilesForRelationship(Config.REL_SUCCESS_REQ). + get(0); bundle.assertAttributeEquals(Config.STATUS_CODE, "200"); bundle.assertAttributeEquals(Config.STATUS_MESSAGE, "OK"); final String actual = new String(bundle.toByteArray(), StandardCharsets.UTF_8); @@ -158,12 +159,15 @@ public class TestInvokeHTTP { //status code, status message, all headers from server response --> ff attributes //server response message body into payload of ff //should not contain any original ff attributes - final MockFlowFile bundle1 = runner.getFlowFilesForRelationship(Config.REL_SUCCESS_RESP).get(0); + final MockFlowFile bundle1 = runner. + getFlowFilesForRelationship(Config.REL_SUCCESS_RESP). + get(0); bundle1.assertContentEquals("/status/200".getBytes("UTF-8")); bundle1.assertAttributeEquals(Config.STATUS_CODE, "200"); bundle1.assertAttributeEquals(Config.STATUS_MESSAGE, "OK"); bundle1.assertAttributeEquals("Foo", "Bar"); - bundle1.assertAttributeEquals("Content-Type", "text/plain; charset=ISO-8859-1"); + bundle1. + assertAttributeEquals("Content-Type", "text/plain; charset=ISO-8859-1"); final String actual1 = new String(bundle1.toByteArray(), StandardCharsets.UTF_8); final String expected1 = "/status/200"; Assert.assertEquals(expected1, actual1); @@ -186,7 +190,9 @@ public class TestInvokeHTTP { runner.assertTransferCount(Config.REL_FAILURE, 0); //expected in response - final MockFlowFile bundle = runner.getFlowFilesForRelationship(Config.REL_RETRY).get(0); + final MockFlowFile bundle = runner. + getFlowFilesForRelationship(Config.REL_RETRY). + get(0); final String actual = new String(bundle.toByteArray(), StandardCharsets.UTF_8); bundle.assertAttributeEquals(Config.STATUS_CODE, "500"); bundle.assertAttributeEquals(Config.STATUS_MESSAGE, "Server Error"); @@ -214,7 +220,9 @@ public class TestInvokeHTTP { runner.assertTransferCount(Config.REL_FAILURE, 0); //getMyFlowFiles(); //expected in response - final MockFlowFile bundle = runner.getFlowFilesForRelationship(Config.REL_NO_RETRY).get(0); + final MockFlowFile bundle = runner. + getFlowFilesForRelationship(Config.REL_NO_RETRY). + get(0); final String actual = new String(bundle.toByteArray(), StandardCharsets.UTF_8); bundle.assertAttributeEquals(Config.STATUS_CODE, "302"); @@ -241,7 +249,9 @@ public class TestInvokeHTTP { runner.assertTransferCount(Config.REL_FAILURE, 0); //getMyFlowFiles(); //expected in response - final MockFlowFile bundle = runner.getFlowFilesForRelationship(Config.REL_NO_RETRY).get(0); + final MockFlowFile bundle = runner. + getFlowFilesForRelationship(Config.REL_NO_RETRY). + get(0); final String actual = new String(bundle.toByteArray(), StandardCharsets.UTF_8); bundle.assertAttributeEquals(Config.STATUS_CODE, "304"); @@ -268,7 +278,9 @@ public class TestInvokeHTTP { runner.assertTransferCount(Config.REL_FAILURE, 0); //getMyFlowFiles(); //expected in response - final MockFlowFile bundle = runner.getFlowFilesForRelationship(Config.REL_NO_RETRY).get(0); + final MockFlowFile bundle = runner. + getFlowFilesForRelationship(Config.REL_NO_RETRY). + get(0); final String actual = new String(bundle.toByteArray(), StandardCharsets.UTF_8); bundle.assertAttributeEquals(Config.STATUS_CODE, "400"); @@ -297,11 +309,14 @@ public class TestInvokeHTTP { runner.assertTransferCount(Config.REL_FAILURE, 0); //expected in response - final MockFlowFile bundle = runner.getFlowFilesForRelationship(Config.REL_NO_RETRY).get(0); + final MockFlowFile bundle = runner. + getFlowFilesForRelationship(Config.REL_NO_RETRY). + get(0); final String actual = new String(bundle.toByteArray(), StandardCharsets.UTF_8); bundle.assertAttributeEquals(Config.STATUS_CODE, "412"); - bundle.assertAttributeEquals(Config.STATUS_MESSAGE, "Precondition Failed"); + bundle. + assertAttributeEquals(Config.STATUS_MESSAGE, "Precondition Failed"); bundle.assertAttributeEquals(Config.RESPONSE_BODY, "/status/412"); final String expected = "Hello"; Assert.assertEquals(expected, actual); @@ -325,7 +340,9 @@ public class TestInvokeHTTP { runner.assertTransferCount(Config.REL_NO_RETRY, 0); runner.assertTransferCount(Config.REL_FAILURE, 0); - final MockFlowFile bundle = runner.getFlowFilesForRelationship(Config.REL_SUCCESS_REQ).get(0); + final MockFlowFile bundle = runner. + getFlowFilesForRelationship(Config.REL_SUCCESS_REQ). + get(0); bundle.assertAttributeEquals(Config.STATUS_CODE, "200"); bundle.assertAttributeEquals(Config.STATUS_MESSAGE, "OK"); final String actual = new String(bundle.toByteArray(), StandardCharsets.UTF_8); @@ -333,7 +350,9 @@ public class TestInvokeHTTP { Assert.assertEquals(expected, actual); bundle.assertAttributeEquals("Foo", "Bar"); - final MockFlowFile bundle1 = runner.getFlowFilesForRelationship(Config.REL_SUCCESS_RESP).get(0); + final MockFlowFile bundle1 = runner. + getFlowFilesForRelationship(Config.REL_SUCCESS_RESP). + get(0); bundle1.assertContentEquals("".getBytes("UTF-8")); bundle1.assertAttributeEquals(Config.STATUS_CODE, "200"); bundle1.assertAttributeEquals(Config.STATUS_MESSAGE, "OK"); @@ -360,7 +379,9 @@ public class TestInvokeHTTP { runner.assertTransferCount(Config.REL_NO_RETRY, 0); runner.assertTransferCount(Config.REL_FAILURE, 0); - final MockFlowFile bundle = runner.getFlowFilesForRelationship(Config.REL_SUCCESS_REQ).get(0); + final MockFlowFile bundle = runner. + getFlowFilesForRelationship(Config.REL_SUCCESS_REQ). + get(0); bundle.assertAttributeEquals(Config.STATUS_CODE, "200"); bundle.assertAttributeEquals(Config.STATUS_MESSAGE, "OK"); final String actual = new String(bundle.toByteArray(), StandardCharsets.UTF_8); @@ -368,7 +389,9 @@ public class TestInvokeHTTP { Assert.assertEquals(expected, actual); bundle.assertAttributeEquals("Foo", "Bar"); - final MockFlowFile bundle1 = runner.getFlowFilesForRelationship(Config.REL_SUCCESS_RESP).get(0); + final MockFlowFile bundle1 = runner. + getFlowFilesForRelationship(Config.REL_SUCCESS_RESP). + get(0); bundle1.assertContentEquals("".getBytes("UTF-8")); bundle1.assertAttributeEquals(Config.STATUS_CODE, "200"); bundle1.assertAttributeEquals(Config.STATUS_MESSAGE, "OK"); @@ -396,7 +419,9 @@ public class TestInvokeHTTP { runner.assertTransferCount(Config.REL_NO_RETRY, 0); runner.assertTransferCount(Config.REL_FAILURE, 0); - final MockFlowFile bundle = runner.getFlowFilesForRelationship(Config.REL_SUCCESS_REQ).get(0); + final MockFlowFile bundle = runner. + getFlowFilesForRelationship(Config.REL_SUCCESS_REQ). + get(0); bundle.assertAttributeEquals(Config.STATUS_CODE, "200"); bundle.assertAttributeEquals(Config.STATUS_MESSAGE, "OK"); final String actual = new String(bundle.toByteArray(), StandardCharsets.UTF_8); @@ -404,7 +429,9 @@ public class TestInvokeHTTP { Assert.assertEquals(expected, actual); bundle.assertAttributeEquals("Foo", "Bar"); - final MockFlowFile bundle1 = runner.getFlowFilesForRelationship(Config.REL_SUCCESS_RESP).get(0); + final MockFlowFile bundle1 = runner. + getFlowFilesForRelationship(Config.REL_SUCCESS_RESP). + get(0); bundle1.assertContentEquals("".getBytes("UTF-8")); bundle1.assertAttributeEquals(Config.STATUS_CODE, "200"); bundle1.assertAttributeEquals(Config.STATUS_MESSAGE, "OK"); @@ -433,7 +460,9 @@ public class TestInvokeHTTP { runner.assertTransferCount(Config.REL_NO_RETRY, 0); runner.assertTransferCount(Config.REL_FAILURE, 1); - final MockFlowFile bundle = runner.getFlowFilesForRelationship(Config.REL_FAILURE).get(0); + final MockFlowFile bundle = runner. + getFlowFilesForRelationship(Config.REL_FAILURE). + get(0); final String actual = new String(bundle.toByteArray(), StandardCharsets.UTF_8); final String expected = "Hello"; @@ -505,7 +534,9 @@ public class TestInvokeHTTP { runner.assertTransferCount(Config.REL_NO_RETRY, 0); runner.assertTransferCount(Config.REL_FAILURE, 1); - final MockFlowFile bundle = runner.getFlowFilesForRelationship(Config.REL_FAILURE).get(0); + final MockFlowFile bundle = runner. + getFlowFilesForRelationship(Config.REL_FAILURE). + get(0); final String actual = new String(bundle.toByteArray(), StandardCharsets.UTF_8); final String expected = "Hello"; @@ -515,11 +546,15 @@ public class TestInvokeHTTP { private static Map<String, String> createSslProperties() { Map<String, String> map = new HashMap<String, String>(); - map.put(StandardSSLContextService.KEYSTORE.getName(), "src/test/resources/localhost-ks.jks"); - map.put(StandardSSLContextService.KEYSTORE_PASSWORD.getName(), "localtest"); + map. + put(StandardSSLContextService.KEYSTORE.getName(), "src/test/resources/localhost-ks.jks"); + map. + put(StandardSSLContextService.KEYSTORE_PASSWORD.getName(), "localtest"); map.put(StandardSSLContextService.KEYSTORE_TYPE.getName(), "JKS"); - map.put(StandardSSLContextService.TRUSTSTORE.getName(), "src/test/resources/localhost-ts.jks"); - map.put(StandardSSLContextService.TRUSTSTORE_PASSWORD.getName(), "localtest"); + map. + put(StandardSSLContextService.TRUSTSTORE.getName(), "src/test/resources/localhost-ts.jks"); + map. + put(StandardSSLContextService.TRUSTSTORE_PASSWORD.getName(), "localtest"); map.put(StandardSSLContextService.TRUSTSTORE_TYPE.getName(), "JKS"); return map; } @@ -547,7 +582,8 @@ public class TestInvokeHTTP { assertEquals("/post", target); - String body = request.getReader().readLine(); + String body = request.getReader(). + readLine(); assertEquals("Hello", body); } @@ -559,7 +595,8 @@ public class TestInvokeHTTP { public void handle(String target, Request baseRequest, HttpServletRequest request, HttpServletResponse response) throws IOException, ServletException { baseRequest.setHandled(true); - int status = Integer.valueOf(target.substring("/status".length() + 1)); + int status = Integer.valueOf(target. + substring("/status".length() + 1)); response.setStatus(status); response.setContentType("text/plain"); @@ -588,7 +625,8 @@ public class TestInvokeHTTP { response.setStatus(200); response.setContentType("text/plain"); - response.getWriter().println("Way to go!"); + response.getWriter(). + println("Way to go!"); } } http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/54818893/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestJmsConsumer.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestJmsConsumer.java b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestJmsConsumer.java index 1777a89..8511b50 100644 --- a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestJmsConsumer.java +++ b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestJmsConsumer.java @@ -16,8 +16,6 @@ */ package org.apache.nifi.processors.standard; -import static org.junit.Assert.*; - import java.io.IOException; import java.io.InputStream; import java.util.Map; @@ -40,88 +38,110 @@ import org.apache.nifi.util.MockProcessContext; import org.apache.nifi.util.MockProcessorInitializationContext; import org.apache.nifi.util.TestRunner; import org.apache.nifi.util.TestRunners; +import static org.junit.Assert.assertEquals; import org.junit.Test; /** - * + * */ public class TestJmsConsumer { - static protected MapMessage createMapMessage() throws JMSException { - MapMessage mapMessage = new ActiveMQMapMessage(); - mapMessage.setString("name", "Arnold"); - mapMessage.setInt ("age", 97); - mapMessage.setDouble("xyz", 89686.564); - mapMessage.setBoolean("good", true); - return mapMessage; - } - - /** - * Test method for {@link org.apache.nifi.processors.standard.JmsConsumer#createMapMessageAttrs(javax.jms.MapMessage)}. - * @throws JMSException - */ - @Test - public void testCreateMapMessageValues() throws JMSException { - - MapMessage mapMessage = createMapMessage(); - - Map<String, String> mapMessageValues = JmsConsumer.createMapMessageValues(mapMessage); - assertEquals("", 4, mapMessageValues.size()); - assertEquals("", "Arnold", mapMessageValues.get(JmsConsumer.MAP_MESSAGE_PREFIX+"name")); - assertEquals("", "97", mapMessageValues.get(JmsConsumer.MAP_MESSAGE_PREFIX+"age")); - assertEquals("", "89686.564", mapMessageValues.get(JmsConsumer.MAP_MESSAGE_PREFIX+"xyz")); - assertEquals("", "true", mapMessageValues.get(JmsConsumer.MAP_MESSAGE_PREFIX+"good")); - } - - /** - * Test MapMessage to FlowFile conversion - */ - @Test - public void testMap2FlowFileMapMessage() throws Exception { - - TestRunner runner = TestRunners.newTestRunner(GetJMSQueue.class); - MapMessage mapMessage = createMapMessage(); - - ProcessContext context = runner.getProcessContext(); - ProcessSession session = runner.getProcessSessionFactory().createSession(); - ProcessorInitializationContext pic = new MockProcessorInitializationContext(runner.getProcessor(), + static protected MapMessage createMapMessage() throws JMSException { + MapMessage mapMessage = new ActiveMQMapMessage(); + mapMessage.setString("name", "Arnold"); + mapMessage.setInt("age", 97); + mapMessage.setDouble("xyz", 89686.564); + mapMessage.setBoolean("good", true); + return mapMessage; + } + + /** + * Test method for + * {@link org.apache.nifi.processors.standard.JmsConsumer#createMapMessageAttrs(javax.jms.MapMessage)}. + * + * @throws JMSException jms + */ + @Test + public void testCreateMapMessageValues() throws JMSException { + + MapMessage mapMessage = createMapMessage(); + + Map<String, String> mapMessageValues = JmsConsumer. + createMapMessageValues(mapMessage); + assertEquals("", 4, mapMessageValues.size()); + assertEquals("", "Arnold", mapMessageValues. + get(JmsConsumer.MAP_MESSAGE_PREFIX + "name")); + assertEquals("", "97", mapMessageValues. + get(JmsConsumer.MAP_MESSAGE_PREFIX + "age")); + assertEquals("", "89686.564", mapMessageValues. + get(JmsConsumer.MAP_MESSAGE_PREFIX + "xyz")); + assertEquals("", "true", mapMessageValues. + get(JmsConsumer.MAP_MESSAGE_PREFIX + "good")); + } + + /** + * Test MapMessage to FlowFile conversion + * + * @throws java.lang.Exception ex + */ + @Test + public void testMap2FlowFileMapMessage() throws Exception { + + TestRunner runner = TestRunners.newTestRunner(GetJMSQueue.class); + MapMessage mapMessage = createMapMessage(); + + ProcessContext context = runner.getProcessContext(); + ProcessSession session = runner.getProcessSessionFactory(). + createSession(); + ProcessorInitializationContext pic = new MockProcessorInitializationContext(runner. + getProcessor(), (MockProcessContext) runner.getProcessContext()); - - JmsProcessingSummary summary = JmsConsumer.map2FlowFile(context, session, mapMessage, true, pic.getLogger()); - - assertEquals("MapMessage should not create FlowFile content", 0, summary.getBytesReceived()); - - Map<String, String> attributes = summary.getLastFlowFile().getAttributes(); - assertEquals("", "Arnold", attributes.get(JmsConsumer.MAP_MESSAGE_PREFIX+"name")); - assertEquals("", "97", attributes.get(JmsConsumer.MAP_MESSAGE_PREFIX+"age")); - assertEquals("", "89686.564", attributes.get(JmsConsumer.MAP_MESSAGE_PREFIX+"xyz")); - assertEquals("", "true", attributes.get(JmsConsumer.MAP_MESSAGE_PREFIX+"good")); - } - - /** - * Test TextMessage to FlowFile conversion - */ - @Test - public void testMap2FlowFileTextMessage() throws Exception { - - TestRunner runner = TestRunners.newTestRunner(GetJMSQueue.class); - TextMessage textMessage = new ActiveMQTextMessage(); - - String payload = "Hello world!"; - textMessage.setText(payload); - - ProcessContext context = runner.getProcessContext(); - ProcessSession session = runner.getProcessSessionFactory().createSession(); - ProcessorInitializationContext pic = new MockProcessorInitializationContext(runner.getProcessor(), + + JmsProcessingSummary summary = JmsConsumer. + map2FlowFile(context, session, mapMessage, true, pic.getLogger()); + + assertEquals("MapMessage should not create FlowFile content", 0, summary. + getBytesReceived()); + + Map<String, String> attributes = summary.getLastFlowFile(). + getAttributes(); + assertEquals("", "Arnold", attributes. + get(JmsConsumer.MAP_MESSAGE_PREFIX + "name")); + assertEquals("", "97", attributes. + get(JmsConsumer.MAP_MESSAGE_PREFIX + "age")); + assertEquals("", "89686.564", attributes. + get(JmsConsumer.MAP_MESSAGE_PREFIX + "xyz")); + assertEquals("", "true", attributes. + get(JmsConsumer.MAP_MESSAGE_PREFIX + "good")); + } + + @Test + public void testMap2FlowFileTextMessage() throws Exception { + + TestRunner runner = TestRunners.newTestRunner(GetJMSQueue.class); + TextMessage textMessage = new ActiveMQTextMessage(); + + String payload = "Hello world!"; + textMessage.setText(payload); + + ProcessContext context = runner.getProcessContext(); + ProcessSession session = runner.getProcessSessionFactory(). + createSession(); + ProcessorInitializationContext pic = new MockProcessorInitializationContext(runner. + getProcessor(), (MockProcessContext) runner.getProcessContext()); - - JmsProcessingSummary summary = JmsConsumer.map2FlowFile(context, session, textMessage, true, pic.getLogger()); - - assertEquals("TextMessage content length should equal to FlowFile content size", payload.length(), summary.getLastFlowFile().getSize()); + + JmsProcessingSummary summary = JmsConsumer. + map2FlowFile(context, session, textMessage, true, pic. + getLogger()); + + assertEquals("TextMessage content length should equal to FlowFile content size", payload. + length(), summary.getLastFlowFile(). + getSize()); final byte[] buffer = new byte[payload.length()]; runner.clearTransferState(); - + session.read(summary.getLastFlowFile(), new InputStreamCallback() { @Override public void process(InputStream in) throws IOException { @@ -129,36 +149,42 @@ public class TestJmsConsumer { } }); - String contentString = new String(buffer,"UTF-8"); + String contentString = new String(buffer, "UTF-8"); assertEquals("", payload, contentString); - } - - /** - * Test BytesMessage to FlowFile conversion - */ - @Test - public void testMap2FlowFileBytesMessage() throws Exception { - - TestRunner runner = TestRunners.newTestRunner(GetJMSQueue.class); - BytesMessage bytesMessage = new ActiveMQBytesMessage(); - - String sourceString = "Apache NiFi is an easy to use, powerful, and reliable system to process and distribute data.!"; - byte[] payload = sourceString.getBytes("UTF-8"); - bytesMessage.writeBytes(payload); - bytesMessage.reset(); - - ProcessContext context = runner.getProcessContext(); - ProcessSession session = runner.getProcessSessionFactory().createSession(); - ProcessorInitializationContext pic = new MockProcessorInitializationContext(runner.getProcessor(), + } + + /** + * Test BytesMessage to FlowFile conversion + */ + @Test + public void testMap2FlowFileBytesMessage() throws Exception { + + TestRunner runner = TestRunners.newTestRunner(GetJMSQueue.class); + BytesMessage bytesMessage = new ActiveMQBytesMessage(); + + String sourceString = "Apache NiFi is an easy to use, powerful, and reliable system to process and distribute data.!"; + byte[] payload = sourceString.getBytes("UTF-8"); + bytesMessage.writeBytes(payload); + bytesMessage.reset(); + + ProcessContext context = runner.getProcessContext(); + ProcessSession session = runner.getProcessSessionFactory(). + createSession(); + ProcessorInitializationContext pic = new MockProcessorInitializationContext(runner. + getProcessor(), (MockProcessContext) runner.getProcessContext()); - - JmsProcessingSummary summary = JmsConsumer.map2FlowFile(context, session, bytesMessage, true, pic.getLogger()); - assertEquals("BytesMessage content length should equal to FlowFile content size", payload.length, summary.getLastFlowFile().getSize()); + JmsProcessingSummary summary = JmsConsumer. + map2FlowFile(context, session, bytesMessage, true, pic. + getLogger()); + + assertEquals("BytesMessage content length should equal to FlowFile content size", payload.length, summary. + getLastFlowFile(). + getSize()); final byte[] buffer = new byte[payload.length]; runner.clearTransferState(); - + session.read(summary.getLastFlowFile(), new InputStreamCallback() { @Override public void process(InputStream in) throws IOException { @@ -166,8 +192,8 @@ public class TestJmsConsumer { } }); - String contentString = new String(buffer,"UTF-8"); + String contentString = new String(buffer, "UTF-8"); assertEquals("", sourceString, contentString); - } + } } http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/54818893/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestListenUDP.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestListenUDP.java b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestListenUDP.java index 0ee3539..d4d5524 100644 --- a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestListenUDP.java +++ b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestListenUDP.java @@ -51,8 +51,10 @@ public class TestListenUDP { System.setProperty("org.slf4j.simpleLogger.defaultLogLevel", "info"); System.setProperty("org.slf4j.simpleLogger.showDateTime", "true"); System.setProperty("org.slf4j.simpleLogger.log.nifi.io.nio", "debug"); - System.setProperty("org.slf4j.simpleLogger.log.nifi.processors.standard.ListenUDP", "debug"); - System.setProperty("org.slf4j.simpleLogger.log.nifi.processors.standard.TestListenUDP", "debug"); + System. + setProperty("org.slf4j.simpleLogger.log.nifi.processors.standard.ListenUDP", "debug"); + System. + setProperty("org.slf4j.simpleLogger.log.nifi.processors.standard.TestListenUDP", "debug"); LOGGER = LoggerFactory.getLogger(TestListenUDP.class); } @@ -86,7 +88,8 @@ public class TestListenUDP { ProcessContext context = runner.getProcessContext(); ListenUDP processor = (ListenUDP) runner.getProcessor(); - ProcessSessionFactory processSessionFactory = runner.getProcessSessionFactory(); + ProcessSessionFactory processSessionFactory = runner. + getProcessSessionFactory(); processor.initializeChannelListenerAndConsumerProcessing(context); udpSender.start(); boolean transferred = false; @@ -94,14 +97,18 @@ public class TestListenUDP { while (!transferred && System.currentTimeMillis() < timeOut) { Thread.sleep(200); processor.onTrigger(context, processSessionFactory); - transferred = runner.getFlowFilesForRelationship(ListenUDP.RELATIONSHIP_SUCCESS).size() > 0; + transferred = runner. + getFlowFilesForRelationship(ListenUDP.RELATIONSHIP_SUCCESS). + size() > 0; } assertTrue("Didn't process the datagrams", transferred); Thread.sleep(7000); processor.stopping(); processor.stopped(); socket.close(); - assertTrue(runner.getFlowFilesForRelationship(ListenUDP.RELATIONSHIP_SUCCESS).size() >= 60); + assertTrue(runner. + getFlowFilesForRelationship(ListenUDP.RELATIONSHIP_SUCCESS). + size() >= 60); } @Test @@ -122,7 +129,8 @@ public class TestListenUDP { ProcessContext context = runner.getProcessContext(); ListenUDP processor = (ListenUDP) runner.getProcessor(); - ProcessSessionFactory processSessionFactory = runner.getProcessSessionFactory(); + ProcessSessionFactory processSessionFactory = runner. + getProcessSessionFactory(); processor.initializeChannelListenerAndConsumerProcessing(context); udpSender.start(); boolean transferred = false; @@ -130,14 +138,18 @@ public class TestListenUDP { while (!transferred && System.currentTimeMillis() < timeOut) { Thread.sleep(1000); processor.onTrigger(context, processSessionFactory); - transferred = runner.getFlowFilesForRelationship(ListenUDP.RELATIONSHIP_SUCCESS).size() > 0; + transferred = runner. + getFlowFilesForRelationship(ListenUDP.RELATIONSHIP_SUCCESS). + size() > 0; } assertTrue("Didn't process the datagrams", transferred); Thread.sleep(7000); processor.stopping(); processor.stopped(); socket.close(); - assertTrue(runner.getFlowFilesForRelationship(ListenUDP.RELATIONSHIP_SUCCESS).size() >= 2); + assertTrue(runner. + getFlowFilesForRelationship(ListenUDP.RELATIONSHIP_SUCCESS). + size() >= 2); } @Test @@ -154,7 +166,8 @@ public class TestListenUDP { ProcessContext context = runner.getProcessContext(); ListenUDP processor = (ListenUDP) runner.getProcessor(); - ProcessSessionFactory processSessionFactory = runner.getProcessSessionFactory(); + ProcessSessionFactory processSessionFactory = runner. + getProcessSessionFactory(); processor.initializeChannelListenerAndConsumerProcessing(context); udpSender.start(); int numTransfered = 0; @@ -162,7 +175,9 @@ public class TestListenUDP { while (numTransfered <= 80 && System.currentTimeMillis() < timeout) { Thread.sleep(200); processor.onTrigger(context, processSessionFactory); - numTransfered = runner.getFlowFilesForRelationship(ListenUDP.RELATIONSHIP_SUCCESS).size(); + numTransfered = runner. + getFlowFilesForRelationship(ListenUDP.RELATIONSHIP_SUCCESS). + size(); } assertFalse("Did not process all the datagrams", numTransfered < 80); processor.stopping(); @@ -200,7 +215,8 @@ public class TestListenUDP { } final long endTime = System.nanoTime(); final long durationMillis = (endTime - startTime) / 1000000; - LOGGER.info("Sent all UDP packets without any obvious errors | duration ms= " + durationMillis); + LOGGER. + info("Sent all UDP packets without any obvious errors | duration ms= " + durationMillis); } catch (IOException e) { LOGGER.error("", e); } finally { http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/54818893/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestMergeContent.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestMergeContent.java b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestMergeContent.java index d19e4ea..48ed8c6 100644 --- a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestMergeContent.java +++ b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestMergeContent.java @@ -47,14 +47,16 @@ public class TestMergeContent { @BeforeClass public static void setup() { - System.setProperty("org.slf4j.simpleLogger.log.org.apache.nifi.processors.standard", "DEBUG"); + System. + setProperty("org.slf4j.simpleLogger.log.org.apache.nifi.processors.standard", "DEBUG"); } - + @Test public void testSimpleBinaryConcat() throws IOException, InterruptedException { final TestRunner runner = TestRunners.newTestRunner(new MergeContent()); runner.setProperty(MergeContent.MAX_BIN_AGE, "1 sec"); - runner.setProperty(MergeContent.MERGE_FORMAT, MergeContent.MERGE_FORMAT_CONCAT); + runner. + setProperty(MergeContent.MERGE_FORMAT, MergeContent.MERGE_FORMAT_CONCAT); createFlowFiles(runner); runner.run(); @@ -64,16 +66,20 @@ public class TestMergeContent { runner.assertTransferCount(MergeContent.REL_FAILURE, 0); runner.assertTransferCount(MergeContent.REL_ORIGINAL, 3); - final MockFlowFile bundle = runner.getFlowFilesForRelationship(MergeContent.REL_MERGED).get(0); + final MockFlowFile bundle = runner. + getFlowFilesForRelationship(MergeContent.REL_MERGED). + get(0); bundle.assertContentEquals("Hello, World!".getBytes("UTF-8")); - bundle.assertAttributeEquals(CoreAttributes.MIME_TYPE.key(), "application/plain-text"); + bundle. + assertAttributeEquals(CoreAttributes.MIME_TYPE.key(), "application/plain-text"); } @Test public void testMimeTypeIsOctetStreamIfConflictingWithBinaryConcat() throws IOException, InterruptedException { final TestRunner runner = TestRunners.newTestRunner(new MergeContent()); runner.setProperty(MergeContent.MAX_BIN_AGE, "1 sec"); - runner.setProperty(MergeContent.MERGE_FORMAT, MergeContent.MERGE_FORMAT_CONCAT); + runner. + setProperty(MergeContent.MERGE_FORMAT, MergeContent.MERGE_FORMAT_CONCAT); createFlowFiles(runner); @@ -87,9 +93,12 @@ public class TestMergeContent { runner.assertTransferCount(MergeContent.REL_FAILURE, 0); runner.assertTransferCount(MergeContent.REL_ORIGINAL, 4); - final MockFlowFile bundle = runner.getFlowFilesForRelationship(MergeContent.REL_MERGED).get(0); + final MockFlowFile bundle = runner. + getFlowFilesForRelationship(MergeContent.REL_MERGED). + get(0); bundle.assertContentEquals("Hello, World!".getBytes("UTF-8")); - bundle.assertAttributeEquals(CoreAttributes.MIME_TYPE.key(), "application/octet-stream"); + bundle. + assertAttributeEquals(CoreAttributes.MIME_TYPE.key(), "application/octet-stream"); } @Test @@ -99,8 +108,10 @@ public class TestMergeContent { runner.setProperty(MergeContent.MAX_BIN_COUNT, "50"); runner.setProperty(MergeContent.MIN_ENTRIES, "10"); runner.setProperty(MergeContent.MAX_ENTRIES, "10"); - runner.setProperty(MergeContent.MERGE_FORMAT, MergeContent.MERGE_FORMAT_CONCAT); - runner.setProperty(MergeContent.CORRELATION_ATTRIBUTE_NAME, "correlationId"); + runner. + setProperty(MergeContent.MERGE_FORMAT, MergeContent.MERGE_FORMAT_CONCAT); + runner. + setProperty(MergeContent.CORRELATION_ATTRIBUTE_NAME, "correlationId"); final Map<String, String> attrs = new HashMap<>(); for (int i = 0; i < 49; i++) { @@ -132,7 +143,8 @@ public class TestMergeContent { @Test public void testSimpleBinaryConcatWaitsForMin() throws IOException, InterruptedException { final TestRunner runner = TestRunners.newTestRunner(new MergeContent()); - runner.setProperty(MergeContent.MERGE_FORMAT, MergeContent.MERGE_FORMAT_CONCAT); + runner. + setProperty(MergeContent.MERGE_FORMAT, MergeContent.MERGE_FORMAT_CONCAT); runner.setProperty(MergeContent.MIN_SIZE, "20 KB"); createFlowFiles(runner); @@ -147,7 +159,8 @@ public class TestMergeContent { public void testZip() throws IOException { final TestRunner runner = TestRunners.newTestRunner(new MergeContent()); runner.setProperty(MergeContent.MAX_BIN_AGE, "1 sec"); - runner.setProperty(MergeContent.MERGE_FORMAT, MergeContent.MERGE_FORMAT_ZIP); + runner. + setProperty(MergeContent.MERGE_FORMAT, MergeContent.MERGE_FORMAT_ZIP); createFlowFiles(runner); runner.run(); @@ -157,8 +170,11 @@ public class TestMergeContent { runner.assertTransferCount(MergeContent.REL_FAILURE, 0); runner.assertTransferCount(MergeContent.REL_ORIGINAL, 3); - final MockFlowFile bundle = runner.getFlowFilesForRelationship(MergeContent.REL_MERGED).get(0); - try (final InputStream rawIn = new ByteArrayInputStream(runner.getContentAsByteArray(bundle)); + final MockFlowFile bundle = runner. + getFlowFilesForRelationship(MergeContent.REL_MERGED). + get(0); + try (final InputStream rawIn = new ByteArrayInputStream(runner. + getContentAsByteArray(bundle)); final ZipInputStream in = new ZipInputStream(rawIn)) { Assert.assertNotNull(in.getNextEntry()); final byte[] part1 = IOUtils.toByteArray(in); @@ -172,14 +188,16 @@ public class TestMergeContent { final byte[] part3 = IOUtils.toByteArray(in); Assert.assertTrue(Arrays.equals("World!".getBytes("UTF-8"), part3)); } - bundle.assertAttributeEquals(CoreAttributes.MIME_TYPE.key(), "application/zip"); + bundle. + assertAttributeEquals(CoreAttributes.MIME_TYPE.key(), "application/zip"); } @Test public void testTar() throws IOException { final TestRunner runner = TestRunners.newTestRunner(new MergeContent()); runner.setProperty(MergeContent.MAX_BIN_AGE, "1 sec"); - runner.setProperty(MergeContent.MERGE_FORMAT, MergeContent.MERGE_FORMAT_TAR); + runner. + setProperty(MergeContent.MERGE_FORMAT, MergeContent.MERGE_FORMAT_TAR); final Map<String, String> attributes = new HashMap<>(); attributes.put(CoreAttributes.MIME_TYPE.key(), "application/plain-text"); @@ -199,8 +217,11 @@ public class TestMergeContent { runner.assertTransferCount(MergeContent.REL_FAILURE, 0); runner.assertTransferCount(MergeContent.REL_ORIGINAL, 3); - final MockFlowFile bundle = runner.getFlowFilesForRelationship(MergeContent.REL_MERGED).get(0); - try (final InputStream rawIn = new ByteArrayInputStream(runner.getContentAsByteArray(bundle)); + final MockFlowFile bundle = runner. + getFlowFilesForRelationship(MergeContent.REL_MERGED). + get(0); + try (final InputStream rawIn = new ByteArrayInputStream(runner. + getContentAsByteArray(bundle)); final TarArchiveInputStream in = new TarArchiveInputStream(rawIn)) { ArchiveEntry entry = in.getNextEntry(); Assert.assertNotNull(entry); @@ -214,11 +235,13 @@ public class TestMergeContent { Assert.assertTrue(Arrays.equals(", ".getBytes("UTF-8"), part2)); entry = in.getNextEntry(); - assertEquals("AReallyLongggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggFileName", entry.getName()); + assertEquals("AReallyLongggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggFileName", entry. + getName()); final byte[] part3 = IOUtils.toByteArray(in); Assert.assertTrue(Arrays.equals("World!".getBytes("UTF-8"), part3)); } - bundle.assertAttributeEquals(CoreAttributes.MIME_TYPE.key(), "application/tar"); + bundle. + assertAttributeEquals(CoreAttributes.MIME_TYPE.key(), "application/tar"); } @Test @@ -227,26 +250,33 @@ public class TestMergeContent { runner.setProperty(MergeContent.MAX_BIN_AGE, "1 sec"); runner.setProperty(MergeContent.MIN_ENTRIES, "2"); runner.setProperty(MergeContent.MAX_ENTRIES, "2"); - runner.setProperty(MergeContent.MERGE_FORMAT, MergeContent.MERGE_FORMAT_FLOWFILE_STREAM_V3); + runner. + setProperty(MergeContent.MERGE_FORMAT, MergeContent.MERGE_FORMAT_FLOWFILE_STREAM_V3); final Map<String, String> attributes = new HashMap<>(); attributes.put("path", "folder"); - runner.enqueue(Paths.get("src/test/resources/TestUnpackContent/folder/cal.txt"), attributes); - runner.enqueue(Paths.get("src/test/resources/TestUnpackContent/folder/date.txt"), attributes); + runner.enqueue(Paths. + get("src/test/resources/TestUnpackContent/folder/cal.txt"), attributes); + runner.enqueue(Paths. + get("src/test/resources/TestUnpackContent/folder/date.txt"), attributes); runner.run(); runner.assertTransferCount(MergeContent.REL_MERGED, 1); runner.assertTransferCount(MergeContent.REL_FAILURE, 0); runner.assertTransferCount(MergeContent.REL_ORIGINAL, 2); - final MockFlowFile merged = runner.getFlowFilesForRelationship(MergeContent.REL_MERGED).get(0); - merged.assertAttributeEquals(CoreAttributes.MIME_TYPE.key(), "application/flowfile-v3"); + final MockFlowFile merged = runner. + getFlowFilesForRelationship(MergeContent.REL_MERGED). + get(0); + merged. + assertAttributeEquals(CoreAttributes.MIME_TYPE.key(), "application/flowfile-v3"); } @Test public void testDefragment() throws IOException { final TestRunner runner = TestRunners.newTestRunner(new MergeContent()); - runner.setProperty(MergeContent.MERGE_STRATEGY, MergeContent.MERGE_STRATEGY_DEFRAGMENT); + runner. + setProperty(MergeContent.MERGE_STRATEGY, MergeContent.MERGE_STRATEGY_DEFRAGMENT); runner.setProperty(MergeContent.MAX_BIN_AGE, "1 min"); final Map<String, String> attributes = new HashMap<>(); @@ -265,14 +295,18 @@ public class TestMergeContent { runner.run(); runner.assertTransferCount(MergeContent.REL_MERGED, 1); - final MockFlowFile assembled = runner.getFlowFilesForRelationship(MergeContent.REL_MERGED).get(0); - assembled.assertContentEquals("A Man A Plan A Canal Panama".getBytes("UTF-8")); + final MockFlowFile assembled = runner. + getFlowFilesForRelationship(MergeContent.REL_MERGED). + get(0); + assembled.assertContentEquals("A Man A Plan A Canal Panama". + getBytes("UTF-8")); } - + @Test public void testDefragmentWithTooFewFragments() throws IOException { final TestRunner runner = TestRunners.newTestRunner(new MergeContent()); - runner.setProperty(MergeContent.MERGE_STRATEGY, MergeContent.MERGE_STRATEGY_DEFRAGMENT); + runner. + setProperty(MergeContent.MERGE_STRATEGY, MergeContent.MERGE_STRATEGY_DEFRAGMENT); runner.setProperty(MergeContent.MAX_BIN_AGE, "2 secs"); final Map<String, String> attributes = new HashMap<>(); @@ -294,18 +328,19 @@ public class TestMergeContent { try { Thread.sleep(3000L); break; - } catch (final InterruptedException ie) {} + } catch (final InterruptedException ie) { + } } runner.run(1); - + runner.assertTransferCount(MergeContent.REL_FAILURE, 4); } - @Test public void testDefragmentOutOfOrder() throws IOException { final TestRunner runner = TestRunners.newTestRunner(new MergeContent()); - runner.setProperty(MergeContent.MERGE_STRATEGY, MergeContent.MERGE_STRATEGY_DEFRAGMENT); + runner. + setProperty(MergeContent.MERGE_STRATEGY, MergeContent.MERGE_STRATEGY_DEFRAGMENT); runner.setProperty(MergeContent.MAX_BIN_AGE, "1 min"); final Map<String, String> attributes = new HashMap<>(); @@ -324,15 +359,19 @@ public class TestMergeContent { runner.run(); runner.assertTransferCount(MergeContent.REL_MERGED, 1); - final MockFlowFile assembled = runner.getFlowFilesForRelationship(MergeContent.REL_MERGED).get(0); - assembled.assertContentEquals("A Man A Plan A Canal Panama".getBytes("UTF-8")); + final MockFlowFile assembled = runner. + getFlowFilesForRelationship(MergeContent.REL_MERGED). + get(0); + assembled.assertContentEquals("A Man A Plan A Canal Panama". + getBytes("UTF-8")); } @Ignore("this test appears to be faulty") @Test public void testDefragmentMultipleMingledSegments() throws IOException { final TestRunner runner = TestRunners.newTestRunner(new MergeContent()); - runner.setProperty(MergeContent.MERGE_STRATEGY, MergeContent.MERGE_STRATEGY_DEFRAGMENT); + runner. + setProperty(MergeContent.MERGE_STRATEGY, MergeContent.MERGE_STRATEGY_DEFRAGMENT); runner.setProperty(MergeContent.MAX_BIN_AGE, "1 min"); final Map<String, String> attributes = new HashMap<>(); @@ -361,16 +400,22 @@ public class TestMergeContent { runner.run(2); runner.assertTransferCount(MergeContent.REL_MERGED, 2); - final MockFlowFile assembled = runner.getFlowFilesForRelationship(MergeContent.REL_MERGED).get(0); - assembled.assertContentEquals("A Man A Plan A Canal Panama".getBytes("UTF-8")); - final MockFlowFile assembledTwo = runner.getFlowFilesForRelationship(MergeContent.REL_MERGED).get(1); + final MockFlowFile assembled = runner. + getFlowFilesForRelationship(MergeContent.REL_MERGED). + get(0); + assembled.assertContentEquals("A Man A Plan A Canal Panama". + getBytes("UTF-8")); + final MockFlowFile assembledTwo = runner. + getFlowFilesForRelationship(MergeContent.REL_MERGED). + get(1); assembledTwo.assertContentEquals("No x in Nixon".getBytes("UTF-8")); } @Test public void testDefragmentOldStyleAttributes() throws IOException { final TestRunner runner = TestRunners.newTestRunner(new MergeContent()); - runner.setProperty(MergeContent.MERGE_STRATEGY, MergeContent.MERGE_STRATEGY_DEFRAGMENT); + runner. + setProperty(MergeContent.MERGE_STRATEGY, MergeContent.MERGE_STRATEGY_DEFRAGMENT); runner.setProperty(MergeContent.MAX_BIN_AGE, "1 min"); final Map<String, String> attributes = new HashMap<>(); @@ -390,15 +435,20 @@ public class TestMergeContent { runner.run(); runner.assertTransferCount(MergeContent.REL_MERGED, 1); - final MockFlowFile assembled = runner.getFlowFilesForRelationship(MergeContent.REL_MERGED).get(0); - assembled.assertContentEquals("A Man A Plan A Canal Panama".getBytes("UTF-8")); - assembled.assertAttributeEquals(CoreAttributes.FILENAME.key(), "originalfilename"); + final MockFlowFile assembled = runner. + getFlowFilesForRelationship(MergeContent.REL_MERGED). + get(0); + assembled.assertContentEquals("A Man A Plan A Canal Panama". + getBytes("UTF-8")); + assembled. + assertAttributeEquals(CoreAttributes.FILENAME.key(), "originalfilename"); } @Test public void testDefragmentMultipleOnTriggers() throws IOException { final TestRunner runner = TestRunners.newTestRunner(new MergeContent()); - runner.setProperty(MergeContent.MERGE_STRATEGY, MergeContent.MERGE_STRATEGY_DEFRAGMENT); + runner. + setProperty(MergeContent.MERGE_STRATEGY, MergeContent.MERGE_STRATEGY_DEFRAGMENT); final Map<String, String> attributes = new HashMap<>(); attributes.put(MergeContent.FRAGMENT_ID_ATTRIBUTE, "1"); @@ -420,15 +470,19 @@ public class TestMergeContent { runner.run(); runner.assertTransferCount(MergeContent.REL_MERGED, 1); - final MockFlowFile assembled = runner.getFlowFilesForRelationship(MergeContent.REL_MERGED).get(0); - assembled.assertContentEquals("A Man A Plan A Canal Panama".getBytes("UTF-8")); + final MockFlowFile assembled = runner. + getFlowFilesForRelationship(MergeContent.REL_MERGED). + get(0); + assembled.assertContentEquals("A Man A Plan A Canal Panama". + getBytes("UTF-8")); } @Ignore("This test appears to be a fail...is retuning 1 instead of 2...needs work") @Test public void testMergeBasedOnCorrelation() throws IOException, InterruptedException { final TestRunner runner = TestRunners.newTestRunner(new MergeContent()); - runner.setProperty(MergeContent.MERGE_STRATEGY, MergeContent.MERGE_STRATEGY_BIN_PACK); + runner. + setProperty(MergeContent.MERGE_STRATEGY, MergeContent.MERGE_STRATEGY_BIN_PACK); runner.setProperty(MergeContent.MAX_BIN_AGE, "1 min"); runner.setProperty(MergeContent.CORRELATION_ATTRIBUTE_NAME, "attr"); runner.setProperty(MergeContent.MAX_ENTRIES, "3"); @@ -449,7 +503,8 @@ public class TestMergeContent { runner.assertTransferCount(MergeContent.REL_MERGED, 2); - final List<MockFlowFile> mergedFiles = runner.getFlowFilesForRelationship(MergeContent.REL_MERGED); + final List<MockFlowFile> mergedFiles = runner. + getFlowFilesForRelationship(MergeContent.REL_MERGED); final MockFlowFile merged1 = mergedFiles.get(0); final MockFlowFile merged2 = mergedFiles.get(1); @@ -471,7 +526,8 @@ public class TestMergeContent { @Test public void testMaxBinAge() throws InterruptedException { final TestRunner runner = TestRunners.newTestRunner(new MergeContent()); - runner.setProperty(MergeContent.MERGE_STRATEGY, MergeContent.MERGE_STRATEGY_BIN_PACK); + runner. + setProperty(MergeContent.MERGE_STRATEGY, MergeContent.MERGE_STRATEGY_BIN_PACK); runner.setProperty(MergeContent.MAX_BIN_AGE, "2 sec"); runner.setProperty(MergeContent.CORRELATION_ATTRIBUTE_NAME, "attr"); runner.setProperty(MergeContent.MAX_ENTRIES, "500"); @@ -496,7 +552,8 @@ public class TestMergeContent { @Test public void testUniqueAttributes() { final TestRunner runner = TestRunners.newTestRunner(new MergeContent()); - runner.setProperty(MergeContent.ATTRIBUTE_STRATEGY, MergeContent.ATTRIBUTE_STRATEGY_ALL_UNIQUE); + runner. + setProperty(MergeContent.ATTRIBUTE_STRATEGY, MergeContent.ATTRIBUTE_STRATEGY_ALL_UNIQUE); runner.setProperty(MergeContent.MAX_SIZE, "2 B"); runner.setProperty(MergeContent.MIN_SIZE, "2 B"); @@ -515,7 +572,9 @@ public class TestMergeContent { runner.run(); runner.assertTransferCount(MergeContent.REL_MERGED, 1); - final MockFlowFile outFile = runner.getFlowFilesForRelationship(MergeContent.REL_MERGED).get(0); + final MockFlowFile outFile = runner. + getFlowFilesForRelationship(MergeContent.REL_MERGED). + get(0); outFile.assertAttributeEquals("abc", "xyz"); outFile.assertAttributeEquals("hello", "good-bye"); @@ -526,7 +585,8 @@ public class TestMergeContent { @Test public void testCommonAttributesOnly() { final TestRunner runner = TestRunners.newTestRunner(new MergeContent()); - runner.setProperty(MergeContent.ATTRIBUTE_STRATEGY, MergeContent.ATTRIBUTE_STRATEGY_ALL_COMMON); + runner. + setProperty(MergeContent.ATTRIBUTE_STRATEGY, MergeContent.ATTRIBUTE_STRATEGY_ALL_COMMON); runner.setProperty(MergeContent.MAX_SIZE, "2 B"); runner.setProperty(MergeContent.MIN_SIZE, "2 B"); @@ -545,7 +605,9 @@ public class TestMergeContent { runner.run(); runner.assertTransferCount(MergeContent.REL_MERGED, 1); - final MockFlowFile outFile = runner.getFlowFilesForRelationship(MergeContent.REL_MERGED).get(0); + final MockFlowFile outFile = runner. + getFlowFilesForRelationship(MergeContent.REL_MERGED). + get(0); outFile.assertAttributeEquals("abc", "xyz"); outFile.assertAttributeNotExists("hello"); @@ -553,7 +615,8 @@ public class TestMergeContent { outFile.assertAttributeNotExists("xyz"); final Set<String> uuids = new HashSet<>(); - for (final MockFlowFile mff : runner.getFlowFilesForRelationship(MergeContent.REL_ORIGINAL)) { + for (final MockFlowFile mff : runner. + getFlowFilesForRelationship(MergeContent.REL_ORIGINAL)) { uuids.add(mff.getAttribute(CoreAttributes.UUID.key())); } uuids.add(outFile.getAttribute(CoreAttributes.UUID.key())); @@ -565,7 +628,8 @@ public class TestMergeContent { public void testCountAttribute() throws IOException, InterruptedException { final TestRunner runner = TestRunners.newTestRunner(new MergeContent()); runner.setProperty(MergeContent.MAX_BIN_AGE, "1 sec"); - runner.setProperty(MergeContent.MERGE_FORMAT, MergeContent.MERGE_FORMAT_CONCAT); + runner. + setProperty(MergeContent.MERGE_FORMAT, MergeContent.MERGE_FORMAT_CONCAT); createFlowFiles(runner); runner.run(); @@ -575,7 +639,9 @@ public class TestMergeContent { runner.assertTransferCount(MergeContent.REL_FAILURE, 0); runner.assertTransferCount(MergeContent.REL_ORIGINAL, 3); - final MockFlowFile bundle = runner.getFlowFilesForRelationship(MergeContent.REL_MERGED).get(0); + final MockFlowFile bundle = runner. + getFlowFilesForRelationship(MergeContent.REL_MERGED). + get(0); bundle.assertContentEquals("Hello, World!".getBytes("UTF-8")); bundle.assertAttributeEquals(MergeContent.MERGE_COUNT_ATTRIBUTE, "3"); bundle.assertAttributeExists(MergeContent.MERGE_BIN_AGE_ATTRIBUTE);
