Repository: nifi Updated Branches: refs/heads/master c730f802b -> cdc1facf3
http://git-wip-us.apache.org/repos/asf/nifi/blob/cdc1facf/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/ITConsumeKafka.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/ITConsumeKafka.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/ITConsumeKafka.java new file mode 100644 index 0000000..69956e7 --- /dev/null +++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/ITConsumeKafka.java @@ -0,0 +1,135 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.nifi.processors.kafka.pubsub; + +import static org.mockito.Matchers.anyObject; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.verifyNoMoreInteractions; +import static org.mockito.Mockito.when; + +import org.apache.nifi.logging.ComponentLog; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.util.TestRunner; +import org.apache.nifi.util.TestRunners; +import org.junit.Before; +import org.junit.Test; + +public class ITConsumeKafka { + + ConsumerLease mockLease = null; + ConsumerPool mockConsumerPool = null; + + @Before + public void setup() { + mockLease = mock(ConsumerLease.class); + mockConsumerPool = mock(ConsumerPool.class); + } + + @Test + public void validateGetAllMessages() throws Exception { + String groupName = "validateGetAllMessages"; + + when(mockConsumerPool.obtainConsumer(anyObject(), anyObject())).thenReturn(mockLease); + when(mockLease.continuePolling()).thenReturn(Boolean.TRUE, Boolean.TRUE, Boolean.FALSE); + when(mockLease.commit()).thenReturn(Boolean.TRUE); + + ConsumeKafka_1_0 proc = new ConsumeKafka_1_0() { + @Override + protected ConsumerPool createConsumerPool(final ProcessContext context, final ComponentLog log) { + return mockConsumerPool; + } + }; + final TestRunner runner = TestRunners.newTestRunner(proc); + runner.setProperty(KafkaProcessorUtils.BOOTSTRAP_SERVERS, "0.0.0.0:1234"); + runner.setProperty(ConsumeKafka_1_0.TOPICS, "foo,bar"); + runner.setProperty(ConsumeKafka_1_0.GROUP_ID, groupName); + runner.setProperty(ConsumeKafka_1_0.AUTO_OFFSET_RESET, ConsumeKafka_1_0.OFFSET_EARLIEST); + runner.run(1, false); + + verify(mockConsumerPool, times(1)).obtainConsumer(anyObject(), anyObject()); + verify(mockLease, times(3)).continuePolling(); + verify(mockLease, times(2)).poll(); + verify(mockLease, times(1)).commit(); + verify(mockLease, times(1)).close(); + 
verifyNoMoreInteractions(mockConsumerPool); + verifyNoMoreInteractions(mockLease); + } + + @Test + public void validateGetAllMessagesPattern() throws Exception { + String groupName = "validateGetAllMessagesPattern"; + + when(mockConsumerPool.obtainConsumer(anyObject(), anyObject())).thenReturn(mockLease); + when(mockLease.continuePolling()).thenReturn(Boolean.TRUE, Boolean.TRUE, Boolean.FALSE); + when(mockLease.commit()).thenReturn(Boolean.TRUE); + + ConsumeKafka_1_0 proc = new ConsumeKafka_1_0() { + @Override + protected ConsumerPool createConsumerPool(final ProcessContext context, final ComponentLog log) { + return mockConsumerPool; + } + }; + final TestRunner runner = TestRunners.newTestRunner(proc); + runner.setProperty(KafkaProcessorUtils.BOOTSTRAP_SERVERS, "0.0.0.0:1234"); + runner.setProperty(ConsumeKafka_1_0.TOPICS, "(fo.*)|(ba)"); + runner.setProperty(ConsumeKafka_1_0.TOPIC_TYPE, "pattern"); + runner.setProperty(ConsumeKafka_1_0.GROUP_ID, groupName); + runner.setProperty(ConsumeKafka_1_0.AUTO_OFFSET_RESET, ConsumeKafka_1_0.OFFSET_EARLIEST); + runner.run(1, false); + + verify(mockConsumerPool, times(1)).obtainConsumer(anyObject(), anyObject()); + verify(mockLease, times(3)).continuePolling(); + verify(mockLease, times(2)).poll(); + verify(mockLease, times(1)).commit(); + verify(mockLease, times(1)).close(); + verifyNoMoreInteractions(mockConsumerPool); + verifyNoMoreInteractions(mockLease); + } + + @Test + public void validateGetErrorMessages() throws Exception { + String groupName = "validateGetErrorMessages"; + + when(mockConsumerPool.obtainConsumer(anyObject(), anyObject())).thenReturn(mockLease); + when(mockLease.continuePolling()).thenReturn(true, false); + when(mockLease.commit()).thenReturn(Boolean.FALSE); + + ConsumeKafka_1_0 proc = new ConsumeKafka_1_0() { + @Override + protected ConsumerPool createConsumerPool(final ProcessContext context, final ComponentLog log) { + return mockConsumerPool; + } + }; + final TestRunner runner = 
TestRunners.newTestRunner(proc); + runner.setProperty(KafkaProcessorUtils.BOOTSTRAP_SERVERS, "0.0.0.0:1234"); + runner.setProperty(ConsumeKafka_1_0.TOPICS, "foo,bar"); + runner.setProperty(ConsumeKafka_1_0.GROUP_ID, groupName); + runner.setProperty(ConsumeKafka_1_0.AUTO_OFFSET_RESET, ConsumeKafka_1_0.OFFSET_EARLIEST); + runner.run(1, false); + + verify(mockConsumerPool, times(1)).obtainConsumer(anyObject(), anyObject()); + verify(mockLease, times(2)).continuePolling(); + verify(mockLease, times(1)).poll(); + verify(mockLease, times(1)).commit(); + verify(mockLease, times(1)).close(); + verifyNoMoreInteractions(mockConsumerPool); + verifyNoMoreInteractions(mockLease); + } + +} http://git-wip-us.apache.org/repos/asf/nifi/blob/cdc1facf/nifi-nar-bundles/nifi-lumberjack-bundle/nifi-lumberjack-processors/src/test/java/org/apache/nifi/processors/lumberjack/handler/ITLumberjackSocketChannelHandler.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-lumberjack-bundle/nifi-lumberjack-processors/src/test/java/org/apache/nifi/processors/lumberjack/handler/ITLumberjackSocketChannelHandler.java b/nifi-nar-bundles/nifi-lumberjack-bundle/nifi-lumberjack-processors/src/test/java/org/apache/nifi/processors/lumberjack/handler/ITLumberjackSocketChannelHandler.java new file mode 100644 index 0000000..7d54180 --- /dev/null +++ b/nifi-nar-bundles/nifi-lumberjack-bundle/nifi-lumberjack-processors/src/test/java/org/apache/nifi/processors/lumberjack/handler/ITLumberjackSocketChannelHandler.java @@ -0,0 +1,207 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.lumberjack.handler; + +import java.io.IOException; +import java.net.InetSocketAddress; +import java.nio.ByteBuffer; +import java.nio.channels.SocketChannel; +import java.nio.charset.Charset; +import java.nio.charset.StandardCharsets; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.LinkedBlockingQueue; + +import javax.net.ssl.SSLContext; +import javax.xml.bind.DatatypeConverter; + +import org.apache.nifi.logging.ComponentLog; +import org.apache.nifi.processor.util.listen.dispatcher.AsyncChannelDispatcher; +import org.apache.nifi.processor.util.listen.dispatcher.ChannelDispatcher; +import org.apache.nifi.processor.util.listen.dispatcher.SocketChannelDispatcher; +import org.apache.nifi.processor.util.listen.event.Event; +import org.apache.nifi.processor.util.listen.event.EventFactory; +import org.apache.nifi.processor.util.listen.handler.ChannelHandlerFactory; +import org.apache.nifi.processor.util.listen.response.ChannelResponder; +import org.apache.nifi.processors.lumberjack.event.LumberjackMetadata; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; +import org.mockito.Mockito; + + + + +public class ITLumberjackSocketChannelHandler { + private EventFactory<TestEvent> eventFactory; + private ChannelHandlerFactory<TestEvent,AsyncChannelDispatcher> channelHandlerFactory; + private BlockingQueue<ByteBuffer> byteBuffers; + private BlockingQueue<TestEvent> events; + private ComponentLog logger 
= Mockito.mock(ComponentLog.class); + private int maxConnections; + private SSLContext sslContext; + private Charset charset; + private ChannelDispatcher dispatcher; + + @Before + public void setup() { + eventFactory = new TestEventHolderFactory(); + channelHandlerFactory = new LumberjackSocketChannelHandlerFactory<>(); + + byteBuffers = new LinkedBlockingQueue<>(); + byteBuffers.add(ByteBuffer.allocate(4096)); + + events = new LinkedBlockingQueue<>(); + logger = Mockito.mock(ComponentLog.class); + + maxConnections = 1; + sslContext = null; + charset = StandardCharsets.UTF_8; + + dispatcher = new SocketChannelDispatcher<>(eventFactory, channelHandlerFactory, byteBuffers, events, logger, + maxConnections, sslContext, charset); + + } + + @Test + public void testBasicHandling() throws IOException, InterruptedException { + final String multiFrameData = "3143000000d7785ec48fcf6ac4201087b3bbe9defb06be40ab669b1602bdf5d49728" + + "031957a97f82232979fbaaa7c0924b2e018701f537f37df2ab699a53aea75cad321673ffe43a38e4e04c043f02" + + "1f71461b26873e711bee9480f48b0af10fe2889113b8c9e28f4322b82395413a50cafd79957c253d0b992faf41" + + "29c2f27c12e5af35be2cedbec133d9b34e0ee27db87db05596fd62f4680796b421964fc9b032ac4dcb54d2575" + + "a28a3559df3413ae7be12edf6e9367c2e07f95ca4a848bb856e1b42ed61427d45da2df4f628f40f0000ffff01000" + + "0ffff35e0eff0"; + final List<String> messages = new ArrayList<>(); + messages.add(multiFrameData); + + run(messages); + + // Check for the 4 frames (from the hex string above) are back... 
+ Assert.assertEquals(4, events.size()); + + boolean found1 = false; + boolean found2 = false; + boolean found3 = false; + boolean found4 = false; + + TestEvent event; + while((event = events.poll()) != null) { + Map<String,String> metadata = event.metadata; + Assert.assertTrue(metadata.containsKey(LumberjackMetadata.SEQNUMBER_KEY)); + + final String seqNum = metadata.get(LumberjackMetadata.SEQNUMBER_KEY); + if (seqNum.equals("1")) { + found1 = true; + } else if (seqNum.equals("2")) { + found2 = true; + } else if (seqNum.equals("3")) { + found3 = true; + } else if (seqNum.equals("4")) { + found4 = true; + } + } + + Assert.assertTrue(found1); + Assert.assertTrue(found2); + Assert.assertTrue(found3); + Assert.assertTrue(found4); + } + + protected void run(List<String> messages) throws IOException, InterruptedException { + final ByteBuffer buffer = ByteBuffer.allocate(1024); + try { + // starts the dispatcher listening on port 0 so it selects a random port + dispatcher.open(null, 0, 4096); + + // starts a thread to run the dispatcher which will accept/read connections + Thread dispatcherThread = new Thread(dispatcher); + dispatcherThread.start(); + + + // create a client connection to the port the dispatcher is listening on + final int realPort = dispatcher.getPort(); + try (SocketChannel channel = SocketChannel.open()) { + channel.connect(new InetSocketAddress("localhost", realPort)); + Thread.sleep(100); + + // send the provided messages + for (int i=0; i < messages.size(); i++) { + buffer.clear(); + buffer.put(DatatypeConverter.parseHexBinary(messages.get(i))); + buffer.flip(); + + while (buffer.hasRemaining()) { + channel.write(buffer); + } + Thread.sleep(1); + } + } + + // wait up to 10 seconds to verify the responses + long timeout = 10000; + long startTime = System.currentTimeMillis(); + while (events.size() < messages.size() && (System.currentTimeMillis() - startTime < timeout)) { + Thread.sleep(100); + } + + // should have gotten an event for each message 
sent + Assert.assertEquals(4, events.size()); + + } finally { + // stop the dispatcher thread and ensure we shut down handler threads + dispatcher.close(); + } + } + + // Test event to produce from the data + private static class TestEvent implements Event<SocketChannel> { + + private byte[] data; + private Map<String, String> metadata; + + public TestEvent(byte[] data, Map<String, String> metadata) { + this.data = data; + this.metadata = metadata; + } + + @Override + public String getSender() { + return metadata.get(EventFactory.SENDER_KEY); + } + + @Override + public byte[] getData() { + return data; + } + + @Override + public ChannelResponder<SocketChannel> getResponder() { + return null; + } + } + + // Factory to create test events and send responses for testing + private static class TestEventHolderFactory implements EventFactory<TestEvent> { + + @Override + public TestEvent create(final byte[] data, final Map<String, String> metadata, final ChannelResponder responder) { + return new TestEvent(data, metadata); + } + } +} http://git-wip-us.apache.org/repos/asf/nifi/blob/cdc1facf/nifi-nar-bundles/nifi-lumberjack-bundle/nifi-lumberjack-processors/src/test/java/org/apache/nifi/processors/lumberjack/handler/TestLumberjackSocketChannelHandler.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-lumberjack-bundle/nifi-lumberjack-processors/src/test/java/org/apache/nifi/processors/lumberjack/handler/TestLumberjackSocketChannelHandler.java b/nifi-nar-bundles/nifi-lumberjack-bundle/nifi-lumberjack-processors/src/test/java/org/apache/nifi/processors/lumberjack/handler/TestLumberjackSocketChannelHandler.java deleted file mode 100644 index ee5a040..0000000 --- a/nifi-nar-bundles/nifi-lumberjack-bundle/nifi-lumberjack-processors/src/test/java/org/apache/nifi/processors/lumberjack/handler/TestLumberjackSocketChannelHandler.java +++ /dev/null @@ -1,207 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) 
under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.nifi.processors.lumberjack.handler; - -import java.io.IOException; -import java.net.InetSocketAddress; -import java.nio.ByteBuffer; -import java.nio.channels.SocketChannel; -import java.nio.charset.Charset; -import java.nio.charset.StandardCharsets; -import java.util.ArrayList; -import java.util.List; -import java.util.Map; -import java.util.concurrent.BlockingQueue; -import java.util.concurrent.LinkedBlockingQueue; - -import javax.net.ssl.SSLContext; -import javax.xml.bind.DatatypeConverter; - -import org.apache.nifi.logging.ComponentLog; -import org.apache.nifi.processor.util.listen.dispatcher.AsyncChannelDispatcher; -import org.apache.nifi.processor.util.listen.dispatcher.ChannelDispatcher; -import org.apache.nifi.processor.util.listen.dispatcher.SocketChannelDispatcher; -import org.apache.nifi.processor.util.listen.event.Event; -import org.apache.nifi.processor.util.listen.event.EventFactory; -import org.apache.nifi.processor.util.listen.handler.ChannelHandlerFactory; -import org.apache.nifi.processor.util.listen.response.ChannelResponder; -import org.apache.nifi.processors.lumberjack.event.LumberjackMetadata; -import org.junit.Assert; -import org.junit.Before; -import org.junit.Test; -import 
org.mockito.Mockito; - - - - -public class TestLumberjackSocketChannelHandler { - private EventFactory<TestEvent> eventFactory; - private ChannelHandlerFactory<TestEvent,AsyncChannelDispatcher> channelHandlerFactory; - private BlockingQueue<ByteBuffer> byteBuffers; - private BlockingQueue<TestEvent> events; - private ComponentLog logger = Mockito.mock(ComponentLog.class); - private int maxConnections; - private SSLContext sslContext; - private Charset charset; - private ChannelDispatcher dispatcher; - - @Before - public void setup() { - eventFactory = new TestEventHolderFactory(); - channelHandlerFactory = new LumberjackSocketChannelHandlerFactory<>(); - - byteBuffers = new LinkedBlockingQueue<>(); - byteBuffers.add(ByteBuffer.allocate(4096)); - - events = new LinkedBlockingQueue<>(); - logger = Mockito.mock(ComponentLog.class); - - maxConnections = 1; - sslContext = null; - charset = StandardCharsets.UTF_8; - - dispatcher = new SocketChannelDispatcher<>(eventFactory, channelHandlerFactory, byteBuffers, events, logger, - maxConnections, sslContext, charset); - - } - - @Test - public void testBasicHandling() throws IOException, InterruptedException { - final String multiFrameData = "3143000000d7785ec48fcf6ac4201087b3bbe9defb06be40ab669b1602bdf5d49728" + - "031957a97f82232979fbaaa7c0924b2e018701f537f37df2ab699a53aea75cad321673ffe43a38e4e04c043f02" + - "1f71461b26873e711bee9480f48b0af10fe2889113b8c9e28f4322b82395413a50cafd79957c253d0b992faf41" + - "29c2f27c12e5af35be2cedbec133d9b34e0ee27db87db05596fd62f4680796b421964fc9b032ac4dcb54d2575" + - "a28a3559df3413ae7be12edf6e9367c2e07f95ca4a848bb856e1b42ed61427d45da2df4f628f40f0000ffff01000" + - "0ffff35e0eff0"; - final List<String> messages = new ArrayList<>(); - messages.add(multiFrameData); - - run(messages); - - // Check for the 4 frames (from the hex string above) are back... 
- Assert.assertEquals(4, events.size()); - - boolean found1 = false; - boolean found2 = false; - boolean found3 = false; - boolean found4 = false; - - TestEvent event; - while((event = events.poll()) != null) { - Map<String,String> metadata = event.metadata; - Assert.assertTrue(metadata.containsKey(LumberjackMetadata.SEQNUMBER_KEY)); - - final String seqNum = metadata.get(LumberjackMetadata.SEQNUMBER_KEY); - if (seqNum.equals("1")) { - found1 = true; - } else if (seqNum.equals("2")) { - found2 = true; - } else if (seqNum.equals("3")) { - found3 = true; - } else if (seqNum.equals("4")) { - found4 = true; - } - } - - Assert.assertTrue(found1); - Assert.assertTrue(found2); - Assert.assertTrue(found3); - Assert.assertTrue(found4); - } - - protected void run(List<String> messages) throws IOException, InterruptedException { - final ByteBuffer buffer = ByteBuffer.allocate(1024); - try { - // starts the dispatcher listening on port 0 so it selects a random port - dispatcher.open(null, 0, 4096); - - // starts a thread to run the dispatcher which will accept/read connections - Thread dispatcherThread = new Thread(dispatcher); - dispatcherThread.start(); - - - // create a client connection to the port the dispatcher is listening on - final int realPort = dispatcher.getPort(); - try (SocketChannel channel = SocketChannel.open()) { - channel.connect(new InetSocketAddress("localhost", realPort)); - Thread.sleep(100); - - // send the provided messages - for (int i=0; i < messages.size(); i++) { - buffer.clear(); - buffer.put(DatatypeConverter.parseHexBinary(messages.get(i))); - buffer.flip(); - - while (buffer.hasRemaining()) { - channel.write(buffer); - } - Thread.sleep(1); - } - } - - // wait up to 10 seconds to verify the responses - long timeout = 10000; - long startTime = System.currentTimeMillis(); - while (events.size() < messages.size() && (System.currentTimeMillis() - startTime < timeout)) { - Thread.sleep(100); - } - - // should have gotten an event for each message 
sent - Assert.assertEquals(4, events.size()); - - } finally { - // stop the dispatcher thread and ensure we shut down handler threads - dispatcher.close(); - } - } - - // Test event to produce from the data - private static class TestEvent implements Event<SocketChannel> { - - private byte[] data; - private Map<String, String> metadata; - - public TestEvent(byte[] data, Map<String, String> metadata) { - this.data = data; - this.metadata = metadata; - } - - @Override - public String getSender() { - return metadata.get(EventFactory.SENDER_KEY); - } - - @Override - public byte[] getData() { - return data; - } - - @Override - public ChannelResponder<SocketChannel> getResponder() { - return null; - } - } - - // Factory to create test events and send responses for testing - private static class TestEventHolderFactory implements EventFactory<TestEvent> { - - @Override - public TestEvent create(final byte[] data, final Map<String, String> metadata, final ChannelResponder responder) { - return new TestEvent(data, metadata); - } - } -} http://git-wip-us.apache.org/repos/asf/nifi/blob/cdc1facf/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/groovy/org/apache/nifi/processors/standard/ITListenSyslogGroovy.groovy ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/groovy/org/apache/nifi/processors/standard/ITListenSyslogGroovy.groovy b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/groovy/org/apache/nifi/processors/standard/ITListenSyslogGroovy.groovy new file mode 100644 index 0000000..1f44225 --- /dev/null +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/groovy/org/apache/nifi/processors/standard/ITListenSyslogGroovy.groovy @@ -0,0 +1,111 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.standard + +import org.apache.nifi.processor.ProcessContext +import org.apache.nifi.processor.ProcessSessionFactory +import org.apache.nifi.processors.standard.syslog.SyslogParser +import org.apache.nifi.util.TestRunner +import org.apache.nifi.util.TestRunners +import org.bouncycastle.util.encoders.Hex +import org.junit.After +import org.junit.Assert +import org.junit.Before +import org.junit.BeforeClass +import org.junit.Test +import org.junit.runner.RunWith +import org.junit.runners.JUnit4 +import org.slf4j.Logger +import org.slf4j.LoggerFactory + +@RunWith(JUnit4.class) +class ListenSyslogGroovyTest extends GroovyTestCase { + private static final Logger logger = LoggerFactory.getLogger(ListenSyslogGroovyTest.class) + + static final String ZERO_LENGTH_MESSAGE = " \n" + + @BeforeClass + static void setUpOnce() throws Exception { + logger.metaClass.methodMissing = { String name, args -> + logger.info("[${name?.toUpperCase()}] ${(args as List).join(" ")}") + } + } + + @Before + void setUp() throws Exception { + } + + @After + void tearDown() throws Exception { + } + + @Test + void testShouldHandleZeroLengthUDP() throws Exception { + // Arrange + final ListenSyslog proc = new ListenSyslog() + final TestRunner runner = 
TestRunners.newTestRunner(proc) + runner.setProperty(ListenSyslog.PROTOCOL, ListenSyslog.TCP_VALUE.getValue()) + runner.setProperty(ListenSyslog.PORT, "0") + + // schedule to start listening on a random port + final ProcessSessionFactory processSessionFactory = runner.getProcessSessionFactory() + final ProcessContext context = runner.getProcessContext() + proc.onScheduled(context) + + // Inject a SyslogParser which will always return null + def nullEventParser = [parseEvent: { byte[] bytes, String sender -> + logger.mock("Regardless of input bytes: [${Hex.toHexString(bytes)}] and sender: [${sender}], this parser will return null") + return null + }] as SyslogParser + proc.parser = nullEventParser + + final int numMessages = 10 + final int port = proc.getPort() + Assert.assertTrue(port > 0) + + // write some TCP messages to the port in the background + final Thread sender = new Thread(new ITListenSyslog.SingleConnectionSocketSender(port, numMessages, 100, ZERO_LENGTH_MESSAGE)) + sender.setDaemon(true) + sender.start() + + // Act + + // call onTrigger until we read all messages, or 30 seconds passed + try { + int numFailed = 0 + long timeout = System.currentTimeMillis() + 30000 + + while (numFailed < numMessages && System.currentTimeMillis() < timeout) { + Thread.sleep(50) + proc.onTrigger(context, processSessionFactory) + numFailed = runner.getFlowFilesForRelationship(ListenSyslog.REL_INVALID).size() + } + + int numSuccess = runner.getFlowFilesForRelationship(ListenSyslog.REL_SUCCESS).size() + logger.info("Transferred " + numSuccess + " to SUCCESS and " + numFailed + " to INVALID") + + // Assert + + // all messages should be transferred to invalid + Assert.assertEquals("Did not process all the messages", numMessages, numFailed) + + } finally { + // unschedule to close connections + proc.onUnscheduled() + } + } +} 
http://git-wip-us.apache.org/repos/asf/nifi/blob/cdc1facf/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/groovy/org/apache/nifi/processors/standard/ListenSyslogGroovyTest.groovy ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/groovy/org/apache/nifi/processors/standard/ListenSyslogGroovyTest.groovy b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/groovy/org/apache/nifi/processors/standard/ListenSyslogGroovyTest.groovy deleted file mode 100644 index 1c6b4f8..0000000 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/groovy/org/apache/nifi/processors/standard/ListenSyslogGroovyTest.groovy +++ /dev/null @@ -1,111 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.nifi.processors.standard - -import org.apache.nifi.processor.ProcessContext -import org.apache.nifi.processor.ProcessSessionFactory -import org.apache.nifi.processors.standard.syslog.SyslogParser -import org.apache.nifi.util.TestRunner -import org.apache.nifi.util.TestRunners -import org.bouncycastle.util.encoders.Hex -import org.junit.After -import org.junit.Assert -import org.junit.Before -import org.junit.BeforeClass -import org.junit.Test -import org.junit.runner.RunWith -import org.junit.runners.JUnit4 -import org.slf4j.Logger -import org.slf4j.LoggerFactory - -@RunWith(JUnit4.class) -class ListenSyslogGroovyTest extends GroovyTestCase { - private static final Logger logger = LoggerFactory.getLogger(ListenSyslogGroovyTest.class) - - static final String ZERO_LENGTH_MESSAGE = " \n" - - @BeforeClass - static void setUpOnce() throws Exception { - logger.metaClass.methodMissing = { String name, args -> - logger.info("[${name?.toUpperCase()}] ${(args as List).join(" ")}") - } - } - - @Before - void setUp() throws Exception { - } - - @After - void tearDown() throws Exception { - } - - @Test - void testShouldHandleZeroLengthUDP() throws Exception { - // Arrange - final ListenSyslog proc = new ListenSyslog() - final TestRunner runner = TestRunners.newTestRunner(proc) - runner.setProperty(ListenSyslog.PROTOCOL, ListenSyslog.TCP_VALUE.getValue()) - runner.setProperty(ListenSyslog.PORT, "0") - - // schedule to start listening on a random port - final ProcessSessionFactory processSessionFactory = runner.getProcessSessionFactory() - final ProcessContext context = runner.getProcessContext() - proc.onScheduled(context) - - // Inject a SyslogParser which will always return null - def nullEventParser = [parseEvent: { byte[] bytes, String sender -> - logger.mock("Regardless of input bytes: [${Hex.toHexString(bytes)}] and sender: [${sender}], this parser will return null") - return null - }] as SyslogParser - proc.parser = nullEventParser - - final int 
numMessages = 10 - final int port = proc.getPort() - Assert.assertTrue(port > 0) - - // write some TCP messages to the port in the background - final Thread sender = new Thread(new TestListenSyslog.SingleConnectionSocketSender(port, numMessages, 100, ZERO_LENGTH_MESSAGE)) - sender.setDaemon(true) - sender.start() - - // Act - - // call onTrigger until we read all messages, or 30 seconds passed - try { - int numFailed = 0 - long timeout = System.currentTimeMillis() + 30000 - - while (numFailed < numMessages && System.currentTimeMillis() < timeout) { - Thread.sleep(50) - proc.onTrigger(context, processSessionFactory) - numFailed = runner.getFlowFilesForRelationship(ListenSyslog.REL_INVALID).size() - } - - int numSuccess = runner.getFlowFilesForRelationship(ListenSyslog.REL_SUCCESS).size() - logger.info("Transferred " + numSuccess + " to SUCCESS and " + numFailed + " to INVALID") - - // Assert - - // all messages should be transferred to invalid - Assert.assertEquals("Did not process all the messages", numMessages, numFailed) - - } finally { - // unschedule to close connections - proc.onUnscheduled() - } - } -} http://git-wip-us.apache.org/repos/asf/nifi/blob/cdc1facf/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/ITListenSyslog.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/ITListenSyslog.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/ITListenSyslog.java new file mode 100644 index 0000000..a94bb20 --- /dev/null +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/ITListenSyslog.java @@ -0,0 +1,402 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.standard; + +import org.apache.commons.lang3.StringUtils; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSessionFactory; +import org.apache.nifi.processors.standard.syslog.SyslogAttributes; +import org.apache.nifi.provenance.ProvenanceEventRecord; +import org.apache.nifi.provenance.ProvenanceEventType; +import org.apache.nifi.util.MockFlowFile; +import org.apache.nifi.util.TestRunner; +import org.apache.nifi.util.TestRunners; +import org.junit.Assert; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.net.InetSocketAddress; +import java.nio.ByteBuffer; +import java.nio.channels.SocketChannel; +import java.nio.charset.Charset; +import java.util.List; +import org.apache.nifi.processors.standard.TestListenSyslog.DatagramSender; + +public class ITListenSyslog { + + static final Logger LOGGER = LoggerFactory.getLogger(ITListenSyslog.class); + + static final String PRI = "34"; + static final String SEV = "2"; + static final String FAC = "4"; + static final String TIME = "Oct 13 15:43:23"; + static final String HOST = "localhost.home"; + static final String BODY = "some message"; + + static final String VALID_MESSAGE = "<" + PRI + 
">" + TIME + " " + HOST + " " + BODY; + static final String VALID_MESSAGE_TCP = "<" + PRI + ">" + TIME + " " + HOST + " " + BODY + "\n"; + static final String INVALID_MESSAGE = "this is not valid\n"; + + @Test + public void testUDP() throws IOException, InterruptedException { + final ListenSyslog proc = new ListenSyslog(); + final TestRunner runner = TestRunners.newTestRunner(proc); + runner.setProperty(ListenSyslog.PROTOCOL, ListenSyslog.UDP_VALUE.getValue()); + runner.setProperty(ListenSyslog.PORT, "0"); + + // schedule to start listening on a random port + final ProcessSessionFactory processSessionFactory = runner.getProcessSessionFactory(); + final ProcessContext context = runner.getProcessContext(); + proc.onScheduled(context); + + final int numMessages = 20; + final int port = proc.getPort(); + Assert.assertTrue(port > 0); + + // write some UDP messages to the port in the background + final Thread sender = new Thread(new DatagramSender(port, numMessages, 10, VALID_MESSAGE)); + sender.setDaemon(true); + sender.start(); + + // call onTrigger until we read all datagrams, or 30 seconds passed + try { + int numTransferred = 0; + long timeout = System.currentTimeMillis() + 30000; + + while (numTransferred < numMessages && System.currentTimeMillis() < timeout) { + Thread.sleep(10); + proc.onTrigger(context, processSessionFactory); + numTransferred = runner.getFlowFilesForRelationship(ListenSyslog.REL_SUCCESS).size(); + } + Assert.assertEquals("Did not process all the datagrams", numMessages, numTransferred); + + MockFlowFile flowFile = runner.getFlowFilesForRelationship(ListenSyslog.REL_SUCCESS).get(0); + checkFlowFile(flowFile, 0, ListenSyslog.UDP_VALUE.getValue()); + + final List<ProvenanceEventRecord> events = runner.getProvenanceEvents(); + Assert.assertNotNull(events); + Assert.assertEquals(numMessages, events.size()); + + final ProvenanceEventRecord event = events.get(0); + Assert.assertEquals(ProvenanceEventType.RECEIVE, event.getEventType()); + 
Assert.assertTrue("transit uri must be set and start with proper protocol", event.getTransitUri().toLowerCase().startsWith("udp")); + + } finally { + // unschedule to close connections + proc.onUnscheduled(); + } + } + + @Test + public void testTCPSingleConnection() throws IOException, InterruptedException { + final ListenSyslog proc = new ListenSyslog(); + final TestRunner runner = TestRunners.newTestRunner(proc); + runner.setProperty(ListenSyslog.PROTOCOL, ListenSyslog.TCP_VALUE.getValue()); + runner.setProperty(ListenSyslog.PORT, "0"); + + // schedule to start listening on a random port + final ProcessSessionFactory processSessionFactory = runner.getProcessSessionFactory(); + final ProcessContext context = runner.getProcessContext(); + proc.onScheduled(context); + + // Allow time for the processor to perform its scheduled start + Thread.sleep(500); + + final int numMessages = 20; + final int port = proc.getPort(); + Assert.assertTrue(port > 0); + + // write some TCP messages to the port in the background + final Thread sender = new Thread(new SingleConnectionSocketSender(port, numMessages, 10, VALID_MESSAGE_TCP)); + sender.setDaemon(true); + sender.start(); + + // call onTrigger until we read all messages, or 30 seconds passed + try { + int nubTransferred = 0; + long timeout = System.currentTimeMillis() + 30000; + + while (nubTransferred < numMessages && System.currentTimeMillis() < timeout) { + Thread.sleep(10); + proc.onTrigger(context, processSessionFactory); + nubTransferred = runner.getFlowFilesForRelationship(ListenSyslog.REL_SUCCESS).size(); + } + Assert.assertEquals("Did not process all the messages", numMessages, nubTransferred); + + MockFlowFile flowFile = runner.getFlowFilesForRelationship(ListenSyslog.REL_SUCCESS).get(0); + checkFlowFile(flowFile, 0, ListenSyslog.TCP_VALUE.getValue()); + + final List<ProvenanceEventRecord> events = runner.getProvenanceEvents(); + Assert.assertNotNull(events); + Assert.assertEquals(numMessages, events.size()); + + 
final ProvenanceEventRecord event = events.get(0); + Assert.assertEquals(ProvenanceEventType.RECEIVE, event.getEventType()); + Assert.assertTrue("transit uri must be set and start with proper protocol", event.getTransitUri().toLowerCase().startsWith("tcp")); + + } finally { + // unschedule to close connections + proc.onUnscheduled(); + } + } + + @Test + public void testTCPSingleConnectionWithNewLines() throws IOException, InterruptedException { + final ListenSyslog proc = new ListenSyslog(); + final TestRunner runner = TestRunners.newTestRunner(proc); + runner.setProperty(ListenSyslog.PROTOCOL, ListenSyslog.TCP_VALUE.getValue()); + runner.setProperty(ListenSyslog.PORT, "0"); + + // schedule to start listening on a random port + final ProcessSessionFactory processSessionFactory = runner.getProcessSessionFactory(); + final ProcessContext context = runner.getProcessContext(); + proc.onScheduled(context); + + final int numMessages = 3; + final int port = proc.getPort(); + Assert.assertTrue(port > 0); + + // send 3 messages as 1 + final String multipleMessages = VALID_MESSAGE_TCP + "\n" + VALID_MESSAGE_TCP + "\n" + VALID_MESSAGE_TCP + "\n"; + final Thread sender = new Thread(new SingleConnectionSocketSender(port, 1, 10, multipleMessages)); + sender.setDaemon(true); + sender.start(); + + // call onTrigger until we read all messages, or 30 seconds passed + try { + int nubTransferred = 0; + long timeout = System.currentTimeMillis() + 30000; + + while (nubTransferred < numMessages && System.currentTimeMillis() < timeout) { + Thread.sleep(10); + proc.onTrigger(context, processSessionFactory); + nubTransferred = runner.getFlowFilesForRelationship(ListenSyslog.REL_SUCCESS).size(); + } + Assert.assertEquals("Did not process all the messages", numMessages, nubTransferred); + + MockFlowFile flowFile = runner.getFlowFilesForRelationship(ListenSyslog.REL_SUCCESS).get(0); + checkFlowFile(flowFile, 0, ListenSyslog.TCP_VALUE.getValue()); + + final List<ProvenanceEventRecord> events = 
runner.getProvenanceEvents(); + Assert.assertNotNull(events); + Assert.assertEquals(numMessages, events.size()); + + final ProvenanceEventRecord event = events.get(0); + Assert.assertEquals(ProvenanceEventType.RECEIVE, event.getEventType()); + Assert.assertTrue("transit uri must be set and start with proper protocol", event.getTransitUri().toLowerCase().startsWith("tcp")); + + } finally { + // unschedule to close connections + proc.onUnscheduled(); + } + } + + @Test + public void testTCPMultipleConnection() throws IOException, InterruptedException { + final ListenSyslog proc = new ListenSyslog(); + final TestRunner runner = TestRunners.newTestRunner(proc); + runner.setProperty(ListenSyslog.PROTOCOL, ListenSyslog.TCP_VALUE.getValue()); + runner.setProperty(ListenSyslog.MAX_CONNECTIONS, "5"); + runner.setProperty(ListenSyslog.PORT, "0"); + + // schedule to start listening on a random port + final ProcessSessionFactory processSessionFactory = runner.getProcessSessionFactory(); + final ProcessContext context = runner.getProcessContext(); + proc.onScheduled(context); + + final int numMessages = 20; + final int port = proc.getPort(); + Assert.assertTrue(port > 0); + + // write some TCP messages to the port in the background + final Thread sender = new Thread(new MultiConnectionSocketSender(port, numMessages, 10, VALID_MESSAGE_TCP)); + sender.setDaemon(true); + sender.start(); + + // call onTrigger until we read all messages, or 30 seconds passed + try { + int nubTransferred = 0; + long timeout = System.currentTimeMillis() + 30000; + + while (nubTransferred < numMessages && System.currentTimeMillis() < timeout) { + Thread.sleep(10); + proc.onTrigger(context, processSessionFactory); + nubTransferred = runner.getFlowFilesForRelationship(ListenSyslog.REL_SUCCESS).size(); + } + Assert.assertEquals("Did not process all the messages", numMessages, nubTransferred); + + MockFlowFile flowFile = runner.getFlowFilesForRelationship(ListenSyslog.REL_SUCCESS).get(0); + 
checkFlowFile(flowFile, 0, ListenSyslog.TCP_VALUE.getValue()); + + final List<ProvenanceEventRecord> events = runner.getProvenanceEvents(); + Assert.assertNotNull(events); + Assert.assertEquals(numMessages, events.size()); + + final ProvenanceEventRecord event = events.get(0); + Assert.assertEquals(ProvenanceEventType.RECEIVE, event.getEventType()); + Assert.assertTrue("transit uri must be set and start with proper protocol", event.getTransitUri().toLowerCase().startsWith("tcp")); + + } finally { + // unschedule to close connections + proc.onUnscheduled(); + } + } + + @Test + public void testInvalid() throws IOException, InterruptedException { + final ListenSyslog proc = new ListenSyslog(); + final TestRunner runner = TestRunners.newTestRunner(proc); + runner.setProperty(ListenSyslog.PROTOCOL, ListenSyslog.TCP_VALUE.getValue()); + runner.setProperty(ListenSyslog.PORT, "0"); + + // schedule to start listening on a random port + final ProcessSessionFactory processSessionFactory = runner.getProcessSessionFactory(); + final ProcessContext context = runner.getProcessContext(); + proc.onScheduled(context); + + final int numMessages = 10; + final int port = proc.getPort(); + Assert.assertTrue(port > 0); + + // write some TCP messages to the port in the background + final Thread sender = new Thread(new SingleConnectionSocketSender(port, numMessages, 100, INVALID_MESSAGE)); + sender.setDaemon(true); + sender.start(); + + // call onTrigger until we read all messages, or 30 seconds passed + try { + int nubTransferred = 0; + long timeout = System.currentTimeMillis() + 30000; + + while (nubTransferred < numMessages && System.currentTimeMillis() < timeout) { + Thread.sleep(50); + proc.onTrigger(context, processSessionFactory); + nubTransferred = runner.getFlowFilesForRelationship(ListenSyslog.REL_INVALID).size(); + } + + // all messages should be transferred to invalid + Assert.assertEquals("Did not process all the messages", numMessages, nubTransferred); + + } finally { + // 
unschedule to close connections + proc.onUnscheduled(); + } + } + + private void checkFlowFile(final MockFlowFile flowFile, final int port, final String protocol) { + flowFile.assertContentEquals(VALID_MESSAGE.replace("\n", "")); + Assert.assertEquals(PRI, flowFile.getAttribute(SyslogAttributes.PRIORITY.key())); + Assert.assertEquals(SEV, flowFile.getAttribute(SyslogAttributes.SEVERITY.key())); + Assert.assertEquals(FAC, flowFile.getAttribute(SyslogAttributes.FACILITY.key())); + Assert.assertEquals(TIME, flowFile.getAttribute(SyslogAttributes.TIMESTAMP.key())); + Assert.assertEquals(HOST, flowFile.getAttribute(SyslogAttributes.HOSTNAME.key())); + Assert.assertEquals(BODY, flowFile.getAttribute(SyslogAttributes.BODY.key())); + Assert.assertEquals("true", flowFile.getAttribute(SyslogAttributes.VALID.key())); + Assert.assertEquals(String.valueOf(port), flowFile.getAttribute(SyslogAttributes.PORT.key())); + Assert.assertEquals(protocol, flowFile.getAttribute(SyslogAttributes.PROTOCOL.key())); + Assert.assertTrue(!StringUtils.isBlank(flowFile.getAttribute(SyslogAttributes.SENDER.key()))); + } + + /** + * Sends a given number of messages to the given port over a single TCP connection. 
+ */ + public static final class SingleConnectionSocketSender implements Runnable { + + final int port; + final int numMessages; + final long delay; + final String message; + + public SingleConnectionSocketSender(int port, int numMessages, long delay, String message) { + this.port = port; + this.numMessages = numMessages; + this.delay = delay; + this.message = message; + } + + @Override + public void run() { + byte[] bytes = message.getBytes(Charset.forName("UTF-8")); + final ByteBuffer buffer = ByteBuffer.allocate(bytes.length); + + try (SocketChannel channel = SocketChannel.open()) { + channel.connect(new InetSocketAddress("localhost", port)); + + for (int i = 0; i < numMessages; i++) { + buffer.clear(); + buffer.put(bytes); + buffer.flip(); + + while (buffer.hasRemaining()) { + channel.write(buffer); + } + Thread.sleep(delay); + } + } catch (IOException e) { + LOGGER.error(e.getMessage(), e); + } catch (InterruptedException e) { + LOGGER.error(e.getMessage(), e); + } + } + } + + /** + * Sends a given number of messages to the given port, opening a new TCP connection for each message. 
+ */ + public static final class MultiConnectionSocketSender implements Runnable { + + final int port; + final int numMessages; + final long delay; + final String message; + + public MultiConnectionSocketSender(int port, int numMessages, long delay, String message) { + this.port = port; + this.numMessages = numMessages; + this.delay = delay; + this.message = message; + } + + @Override + public void run() { + byte[] bytes = message.getBytes(Charset.forName("UTF-8")); + final ByteBuffer buffer = ByteBuffer.allocate(bytes.length); + + for (int i = 0; i < numMessages; i++) { + try (SocketChannel channel = SocketChannel.open()) { + channel.connect(new InetSocketAddress("localhost", port)); + + buffer.clear(); + buffer.put(bytes); + buffer.flip(); + + while (buffer.hasRemaining()) { + channel.write(buffer); + } + Thread.sleep(delay); + } catch (IOException e) { + LOGGER.error(e.getMessage(), e); + } catch (InterruptedException e) { + LOGGER.error(e.getMessage(), e); + } + } + } + } + +} http://git-wip-us.apache.org/repos/asf/nifi/blob/cdc1facf/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestListenSyslog.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestListenSyslog.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestListenSyslog.java index f96ff22..2c199c1 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestListenSyslog.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestListenSyslog.java @@ -41,7 +41,6 @@ import java.io.IOException; import java.net.InetSocketAddress; import java.nio.ByteBuffer; import 
java.nio.channels.DatagramChannel; -import java.nio.channels.SocketChannel; import java.nio.charset.Charset; import java.nio.charset.StandardCharsets; import java.util.ArrayList; @@ -62,216 +61,11 @@ public class TestListenSyslog { static final String HOST = "localhost.home"; static final String BODY = "some message"; - static final String VALID_MESSAGE = "<" + PRI + ">" + TIME + " " + HOST + " " + BODY ; + static final String VALID_MESSAGE = "<" + PRI + ">" + TIME + " " + HOST + " " + BODY; static final String VALID_MESSAGE_TCP = "<" + PRI + ">" + TIME + " " + HOST + " " + BODY + "\n"; static final String INVALID_MESSAGE = "this is not valid\n"; @Test - public void testUDP() throws IOException, InterruptedException { - final ListenSyslog proc = new ListenSyslog(); - final TestRunner runner = TestRunners.newTestRunner(proc); - runner.setProperty(ListenSyslog.PROTOCOL, ListenSyslog.UDP_VALUE.getValue()); - runner.setProperty(ListenSyslog.PORT, "0"); - - // schedule to start listening on a random port - final ProcessSessionFactory processSessionFactory = runner.getProcessSessionFactory(); - final ProcessContext context = runner.getProcessContext(); - proc.onScheduled(context); - - final int numMessages = 20; - final int port = proc.getPort(); - Assert.assertTrue(port > 0); - - // write some UDP messages to the port in the background - final Thread sender = new Thread(new DatagramSender(port, numMessages, 10, VALID_MESSAGE)); - sender.setDaemon(true); - sender.start(); - - // call onTrigger until we read all datagrams, or 30 seconds passed - try { - int numTransferred = 0; - long timeout = System.currentTimeMillis() + 30000; - - while (numTransferred < numMessages && System.currentTimeMillis() < timeout) { - Thread.sleep(10); - proc.onTrigger(context, processSessionFactory); - numTransferred = runner.getFlowFilesForRelationship(ListenSyslog.REL_SUCCESS).size(); - } - Assert.assertEquals("Did not process all the datagrams", numMessages, numTransferred); - - 
MockFlowFile flowFile = runner.getFlowFilesForRelationship(ListenSyslog.REL_SUCCESS).get(0); - checkFlowFile(flowFile, 0, ListenSyslog.UDP_VALUE.getValue()); - - final List<ProvenanceEventRecord> events = runner.getProvenanceEvents(); - Assert.assertNotNull(events); - Assert.assertEquals(numMessages, events.size()); - - final ProvenanceEventRecord event = events.get(0); - Assert.assertEquals(ProvenanceEventType.RECEIVE, event.getEventType()); - Assert.assertTrue("transit uri must be set and start with proper protocol", event.getTransitUri().toLowerCase().startsWith("udp")); - - } finally { - // unschedule to close connections - proc.onUnscheduled(); - } - } - - @Test - public void testTCPSingleConnection() throws IOException, InterruptedException { - final ListenSyslog proc = new ListenSyslog(); - final TestRunner runner = TestRunners.newTestRunner(proc); - runner.setProperty(ListenSyslog.PROTOCOL, ListenSyslog.TCP_VALUE.getValue()); - runner.setProperty(ListenSyslog.PORT, "0"); - - // schedule to start listening on a random port - final ProcessSessionFactory processSessionFactory = runner.getProcessSessionFactory(); - final ProcessContext context = runner.getProcessContext(); - proc.onScheduled(context); - - // Allow time for the processor to perform its scheduled start - Thread.sleep(500); - - final int numMessages = 20; - final int port = proc.getPort(); - Assert.assertTrue(port > 0); - - // write some TCP messages to the port in the background - final Thread sender = new Thread(new SingleConnectionSocketSender(port, numMessages, 10, VALID_MESSAGE_TCP)); - sender.setDaemon(true); - sender.start(); - - // call onTrigger until we read all messages, or 30 seconds passed - try { - int nubTransferred = 0; - long timeout = System.currentTimeMillis() + 30000; - - while (nubTransferred < numMessages && System.currentTimeMillis() < timeout) { - Thread.sleep(10); - proc.onTrigger(context, processSessionFactory); - nubTransferred = 
runner.getFlowFilesForRelationship(ListenSyslog.REL_SUCCESS).size(); - } - Assert.assertEquals("Did not process all the messages", numMessages, nubTransferred); - - MockFlowFile flowFile = runner.getFlowFilesForRelationship(ListenSyslog.REL_SUCCESS).get(0); - checkFlowFile(flowFile, 0, ListenSyslog.TCP_VALUE.getValue()); - - final List<ProvenanceEventRecord> events = runner.getProvenanceEvents(); - Assert.assertNotNull(events); - Assert.assertEquals(numMessages, events.size()); - - final ProvenanceEventRecord event = events.get(0); - Assert.assertEquals(ProvenanceEventType.RECEIVE, event.getEventType()); - Assert.assertTrue("transit uri must be set and start with proper protocol", event.getTransitUri().toLowerCase().startsWith("tcp")); - - } finally { - // unschedule to close connections - proc.onUnscheduled(); - } - } - - @Test - public void testTCPSingleConnectionWithNewLines() throws IOException, InterruptedException { - final ListenSyslog proc = new ListenSyslog(); - final TestRunner runner = TestRunners.newTestRunner(proc); - runner.setProperty(ListenSyslog.PROTOCOL, ListenSyslog.TCP_VALUE.getValue()); - runner.setProperty(ListenSyslog.PORT, "0"); - - // schedule to start listening on a random port - final ProcessSessionFactory processSessionFactory = runner.getProcessSessionFactory(); - final ProcessContext context = runner.getProcessContext(); - proc.onScheduled(context); - - final int numMessages = 3; - final int port = proc.getPort(); - Assert.assertTrue(port > 0); - - // send 3 messages as 1 - final String multipleMessages = VALID_MESSAGE_TCP + "\n" + VALID_MESSAGE_TCP + "\n" + VALID_MESSAGE_TCP + "\n"; - final Thread sender = new Thread(new SingleConnectionSocketSender(port, 1, 10, multipleMessages)); - sender.setDaemon(true); - sender.start(); - - // call onTrigger until we read all messages, or 30 seconds passed - try { - int nubTransferred = 0; - long timeout = System.currentTimeMillis() + 30000; - - while (nubTransferred < numMessages && 
System.currentTimeMillis() < timeout) { - Thread.sleep(10); - proc.onTrigger(context, processSessionFactory); - nubTransferred = runner.getFlowFilesForRelationship(ListenSyslog.REL_SUCCESS).size(); - } - Assert.assertEquals("Did not process all the messages", numMessages, nubTransferred); - - MockFlowFile flowFile = runner.getFlowFilesForRelationship(ListenSyslog.REL_SUCCESS).get(0); - checkFlowFile(flowFile, 0, ListenSyslog.TCP_VALUE.getValue()); - - final List<ProvenanceEventRecord> events = runner.getProvenanceEvents(); - Assert.assertNotNull(events); - Assert.assertEquals(numMessages, events.size()); - - final ProvenanceEventRecord event = events.get(0); - Assert.assertEquals(ProvenanceEventType.RECEIVE, event.getEventType()); - Assert.assertTrue("transit uri must be set and start with proper protocol", event.getTransitUri().toLowerCase().startsWith("tcp")); - - } finally { - // unschedule to close connections - proc.onUnscheduled(); - } - } - - @Test - public void testTCPMultipleConnection() throws IOException, InterruptedException { - final ListenSyslog proc = new ListenSyslog(); - final TestRunner runner = TestRunners.newTestRunner(proc); - runner.setProperty(ListenSyslog.PROTOCOL, ListenSyslog.TCP_VALUE.getValue()); - runner.setProperty(ListenSyslog.MAX_CONNECTIONS, "5"); - runner.setProperty(ListenSyslog.PORT, "0"); - - // schedule to start listening on a random port - final ProcessSessionFactory processSessionFactory = runner.getProcessSessionFactory(); - final ProcessContext context = runner.getProcessContext(); - proc.onScheduled(context); - - final int numMessages = 20; - final int port = proc.getPort(); - Assert.assertTrue(port > 0); - - // write some TCP messages to the port in the background - final Thread sender = new Thread(new MultiConnectionSocketSender(port, numMessages, 10, VALID_MESSAGE_TCP)); - sender.setDaemon(true); - sender.start(); - - // call onTrigger until we read all messages, or 30 seconds passed - try { - int nubTransferred = 0; - 
long timeout = System.currentTimeMillis() + 30000; - - while (nubTransferred < numMessages && System.currentTimeMillis() < timeout) { - Thread.sleep(10); - proc.onTrigger(context, processSessionFactory); - nubTransferred = runner.getFlowFilesForRelationship(ListenSyslog.REL_SUCCESS).size(); - } - Assert.assertEquals("Did not process all the messages", numMessages, nubTransferred); - - MockFlowFile flowFile = runner.getFlowFilesForRelationship(ListenSyslog.REL_SUCCESS).get(0); - checkFlowFile(flowFile, 0, ListenSyslog.TCP_VALUE.getValue()); - - final List<ProvenanceEventRecord> events = runner.getProvenanceEvents(); - Assert.assertNotNull(events); - Assert.assertEquals(numMessages, events.size()); - - final ProvenanceEventRecord event = events.get(0); - Assert.assertEquals(ProvenanceEventType.RECEIVE, event.getEventType()); - Assert.assertTrue("transit uri must be set and start with proper protocol", event.getTransitUri().toLowerCase().startsWith("tcp")); - - } finally { - // unschedule to close connections - proc.onUnscheduled(); - } - } - - @Test public void testBatching() throws IOException, InterruptedException { final ListenSyslog proc = new ListenSyslog(); final TestRunner runner = TestRunners.newTestRunner(proc); @@ -325,47 +119,6 @@ public class TestListenSyslog { } @Test - public void testInvalid() throws IOException, InterruptedException { - final ListenSyslog proc = new ListenSyslog(); - final TestRunner runner = TestRunners.newTestRunner(proc); - runner.setProperty(ListenSyslog.PROTOCOL, ListenSyslog.TCP_VALUE.getValue()); - runner.setProperty(ListenSyslog.PORT, "0"); - - // schedule to start listening on a random port - final ProcessSessionFactory processSessionFactory = runner.getProcessSessionFactory(); - final ProcessContext context = runner.getProcessContext(); - proc.onScheduled(context); - - final int numMessages = 10; - final int port = proc.getPort(); - Assert.assertTrue(port > 0); - - // write some TCP messages to the port in the background - 
final Thread sender = new Thread(new SingleConnectionSocketSender(port, numMessages, 100, INVALID_MESSAGE)); - sender.setDaemon(true); - sender.start(); - - // call onTrigger until we read all messages, or 30 seconds passed - try { - int nubTransferred = 0; - long timeout = System.currentTimeMillis() + 30000; - - while (nubTransferred < numMessages && System.currentTimeMillis() < timeout) { - Thread.sleep(50); - proc.onTrigger(context, processSessionFactory); - nubTransferred = runner.getFlowFilesForRelationship(ListenSyslog.REL_INVALID).size(); - } - - // all messages should be transferred to invalid - Assert.assertEquals("Did not process all the messages", numMessages, nubTransferred); - - } finally { - // unschedule to close connections - proc.onUnscheduled(); - } - } - - @Test public void testParsingError() throws IOException { final FailParseProcessor proc = new FailParseProcessor(); final TestRunner runner = TestRunners.newTestRunner(proc); @@ -431,21 +184,6 @@ public class TestListenSyslog { runner.getFlowFilesForRelationship(ListenSyslog.REL_SUCCESS).get(0).assertContentEquals(VALID_MESSAGE); } - - private void checkFlowFile(final MockFlowFile flowFile, final int port, final String protocol) { - flowFile.assertContentEquals(VALID_MESSAGE.replace("\n", "")); - Assert.assertEquals(PRI, flowFile.getAttribute(SyslogAttributes.PRIORITY.key())); - Assert.assertEquals(SEV, flowFile.getAttribute(SyslogAttributes.SEVERITY.key())); - Assert.assertEquals(FAC, flowFile.getAttribute(SyslogAttributes.FACILITY.key())); - Assert.assertEquals(TIME, flowFile.getAttribute(SyslogAttributes.TIMESTAMP.key())); - Assert.assertEquals(HOST, flowFile.getAttribute(SyslogAttributes.HOSTNAME.key())); - Assert.assertEquals(BODY, flowFile.getAttribute(SyslogAttributes.BODY.key())); - Assert.assertEquals("true", flowFile.getAttribute(SyslogAttributes.VALID.key())); - Assert.assertEquals(String.valueOf(port), flowFile.getAttribute(SyslogAttributes.PORT.key())); - 
Assert.assertEquals(protocol, flowFile.getAttribute(SyslogAttributes.PROTOCOL.key())); - Assert.assertTrue(!StringUtils.isBlank(flowFile.getAttribute(SyslogAttributes.SENDER.key()))); - } - /** * Sends a given number of datagrams to the given port. */ @@ -470,51 +208,7 @@ public class TestListenSyslog { try (DatagramChannel channel = DatagramChannel.open()) { channel.connect(new InetSocketAddress("localhost", port)); - for (int i=0; i < numMessages; i++) { - buffer.clear(); - buffer.put(bytes); - buffer.flip(); - - while(buffer.hasRemaining()) { - channel.write(buffer); - } - - Thread.sleep(delay); - } - } catch (IOException e) { - LOGGER.error(e.getMessage(), e); - } catch (InterruptedException e) { - LOGGER.error(e.getMessage(), e); - } - } - } - - /** - * Sends a given number of datagrams to the given port. - */ - public static final class SingleConnectionSocketSender implements Runnable { - - final int port; - final int numMessages; - final long delay; - final String message; - - public SingleConnectionSocketSender(int port, int numMessages, long delay, String message) { - this.port = port; - this.numMessages = numMessages; - this.delay = delay; - this.message = message; - } - - @Override - public void run() { - byte[] bytes = message.getBytes(Charset.forName("UTF-8")); - final ByteBuffer buffer = ByteBuffer.allocate(bytes.length); - - try (SocketChannel channel = SocketChannel.open()) { - channel.connect(new InetSocketAddress("localhost", port)); - - for (int i=0; i < numMessages; i++) { + for (int i = 0; i < numMessages; i++) { buffer.clear(); buffer.put(bytes); buffer.flip(); @@ -522,6 +216,7 @@ public class TestListenSyslog { while (buffer.hasRemaining()) { channel.write(buffer); } + Thread.sleep(delay); } } catch (IOException e) { @@ -532,51 +227,9 @@ public class TestListenSyslog { } } - /** - * Sends a given number of datagrams to the given port. 
- */ - public static final class MultiConnectionSocketSender implements Runnable { - - final int port; - final int numMessages; - final long delay; - final String message; - - public MultiConnectionSocketSender(int port, int numMessages, long delay, String message) { - this.port = port; - this.numMessages = numMessages; - this.delay = delay; - this.message = message; - } - - @Override - public void run() { - byte[] bytes = message.getBytes(Charset.forName("UTF-8")); - final ByteBuffer buffer = ByteBuffer.allocate(bytes.length); - - for (int i=0; i < numMessages; i++) { - try (SocketChannel channel = SocketChannel.open()) { - channel.connect(new InetSocketAddress("localhost", port)); - - buffer.clear(); - buffer.put(bytes); - buffer.flip(); - - while (buffer.hasRemaining()) { - channel.write(buffer); - } - Thread.sleep(delay); - } catch (IOException e) { - LOGGER.error(e.getMessage(), e); - } catch (InterruptedException e) { - LOGGER.error(e.getMessage(), e); - } - } - } - } - // A mock version of ListenSyslog that will queue the provided events private static class FailParseProcessor extends ListenSyslog { + @Override protected SyslogParser getParser() { return new SyslogParser(StandardCharsets.UTF_8) { @@ -589,6 +242,7 @@ public class TestListenSyslog { } private static class CannedMessageProcessor extends ListenSyslog { + private final Iterator<RawSyslogEvent> eventItr; public CannedMessageProcessor(final List<RawSyslogEvent> events) { http://git-wip-us.apache.org/repos/asf/nifi/blob/cdc1facf/pom.xml ---------------------------------------------------------------------- diff --git a/pom.xml b/pom.xml index a5f408c..146a3a2 100644 --- a/pom.xml +++ b/pom.xml @@ -1985,7 +1985,9 @@ <!-- Performs execution of Integration Tests using the Maven FailSafe Plugin. The view of integration tests in this context are those tests interfacing with external sources and services requiring additional - resources or credentials that cannot be explicitly provided. 
--> + resources or credentials that cannot be explicitly provided. Also appropriate + for tests that depend on inter-thread coordination, networking, or timing + considerations which could make the tests brittle in various environments. --> <id>integration-tests</id> <build> <plugins>
