http://git-wip-us.apache.org/repos/asf/nifi/blob/640b7021/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-pubsub-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/AbstractKafkaProcessorLifecycelTest.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-pubsub-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/AbstractKafkaProcessorLifecycelTest.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-pubsub-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/AbstractKafkaProcessorLifecycelTest.java
new file mode 100644
index 0000000..6346ffd
--- /dev/null
+++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-pubsub-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/AbstractKafkaProcessorLifecycelTest.java
@@ -0,0 +1,498 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.kafka.pubsub;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.when;
+
+import java.io.Closeable;
+import java.lang.reflect.Field;
+import java.util.List;
+import java.util.Random;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.nifi.annotation.lifecycle.OnStopped;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.logging.ProcessorLog;
+import org.apache.nifi.processor.AbstractSessionFactoryProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.ProcessSessionFactory;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.util.TestRunner;
+import org.apache.nifi.util.TestRunners;
+import org.junit.Ignore;
+import org.junit.Test;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+
+/**
+ * Tests the property validation and the concurrent trigger/close life-cycle
+ * implemented by {@link AbstractKafkaProcessor}, using stub subclasses.
+ */
+public class AbstractKafkaProcessorLifecycelTest {
+
+    private final static Random random = new Random();
+
+    /**
+     * Checks that empty, blank or missing values of the shared properties
+     * (bootstrap servers, topic, client id, Kerberos principal) are rejected.
+     */
+    @Test
+    public void validateBaseProperties() throws Exception {
+        TestRunner runner = TestRunners.newTestRunner(DummyProcessor.class);
+        runner.setProperty(AbstractKafkaProcessor.BOOTSTRAP_SERVERS, "");
+        runner.setProperty(AbstractKafkaProcessor.TOPIC, "foo");
+        runner.setProperty(ConsumeKafka.CLIENT_ID, "foo");
+
+        try {
+            runner.assertValid();
+            fail();
+        } catch (AssertionError e) {
+            assertTrue(e.getMessage().contains("must contain at least one character that is not white space"));
+        }
+
+        runner.setProperty(ConsumeKafka.BOOTSTRAP_SERVERS, "foo");
+        try {
+            runner.assertValid();
+            fail();
+        } catch (AssertionError e) {
+            assertTrue(e.getMessage().contains("'bootstrap.servers' validated against 'foo' is invalid"));
+        }
+        runner.setProperty(ConsumeKafka.BOOTSTRAP_SERVERS, "foo:1234");
+
+        runner.removeProperty(ConsumeKafka.TOPIC);
+        try {
+            runner.assertValid();
+            fail();
+        } catch (AssertionError e) {
+            assertTrue(e.getMessage().contains("'topic' is invalid because topic is required"));
+        }
+
+        runner.setProperty(ConsumeKafka.TOPIC, "");
+        try {
+            runner.assertValid();
+            fail();
+        } catch (AssertionError e) {
+            assertTrue(e.getMessage().contains("must contain at least one character that is not white space"));
+        }
+
+        runner.setProperty(ConsumeKafka.TOPIC, " ");
+        try {
+            runner.assertValid();
+            fail();
+        } catch (AssertionError e) {
+            assertTrue(e.getMessage().contains("must contain at least one character that is not white space"));
+        }
+        runner.setProperty(ConsumeKafka.TOPIC, "blah");
+
+        runner.removeProperty(ConsumeKafka.CLIENT_ID);
+        try {
+            runner.assertValid();
+            fail();
+        } catch (AssertionError e) {
+            assertTrue(e.getMessage().contains("invalid because client.id is required"));
+        }
+
+        runner.setProperty(ConsumeKafka.CLIENT_ID, "");
+        try {
+            runner.assertValid();
+            fail();
+        } catch (AssertionError e) {
+            assertTrue(e.getMessage().contains("must contain at least one character that is not white space"));
+        }
+
+        runner.setProperty(ConsumeKafka.CLIENT_ID, " ");
+        try {
+            runner.assertValid();
+            fail();
+        } catch (AssertionError e) {
+            assertTrue(e.getMessage().contains("must contain at least one character that is not white space"));
+        }
+        runner.setProperty(ConsumeKafka.CLIENT_ID, "ghj");
+
+        runner.setProperty(PublishKafka.KERBEROS_PRINCIPLE, "");
+        try {
+            runner.assertValid();
+            fail();
+        } catch (AssertionError e) {
+            assertTrue(e.getMessage().contains("must contain at least one character that is not white space"));
+        }
+        runner.setProperty(PublishKafka.KERBEROS_PRINCIPLE, " ");
+        try {
+            runner.assertValid();
+            fail();
+        } catch (AssertionError e) {
+            assertTrue(e.getMessage().contains("must contain at least one character that is not white space"));
+        }
+    }
+
+    @Test
+    @Ignore // just for extra sanity check
+    public void validateConcurrencyWithRandomFailuresMultiple() throws Exception {
+        for (int i = 0; i < 100; i++) {
+            validateConcurrencyWithRandomFailures();
+        }
+    }
+
+    /**
+     * Triggers the processor 1000 times from a pool of 32 threads, making roughly
+     * one in ten triggers fail, and verifies the commit/rollback/yield accounting.
+     */
+    @Test
+    public void validateConcurrencyWithRandomFailures() throws Exception {
+        ExecutorService processingExecutor = Executors.newFixedThreadPool(32);
+        final AtomicInteger commitCounter = new AtomicInteger();
+        final AtomicInteger rollbackCounter = new AtomicInteger();
+        final AtomicInteger yieldCounter = new AtomicInteger();
+        final AtomicInteger unexpectedErrors = new AtomicInteger();
+
+        final ProcessSessionFactory sessionFactory = mock(ProcessSessionFactory.class);
+        final ProcessSession session = mock(ProcessSession.class);
+        when(sessionFactory.createSession()).thenReturn(session);
+        doAnswer(new Answer<Void>() {
+            @Override
+            public Void answer(InvocationOnMock invocation) throws Throwable {
+                commitCounter.incrementAndGet();
+                return null;
+            }
+        }).when(session).commit();
+        doAnswer(new Answer<Void>() {
+            @Override
+            public Void answer(InvocationOnMock invocation) throws Throwable {
+                rollbackCounter.incrementAndGet();
+                return null;
+            }
+        }).when(session).rollback(true);
+
+        final ConcurrencyValidatingProcessor processor = spy(new ConcurrencyValidatingProcessor());
+
+        int testCount = 1000;
+        final CountDownLatch latch = new CountDownLatch(testCount);
+        for (int i = 0; i < testCount; i++) {
+            processingExecutor.execute(new Runnable() {
+                @Override
+                public void run() {
+                    ProcessContext context = mock(ProcessContext.class);
+                    doAnswer(new Answer<Void>() {
+                        @Override
+                        public Void answer(InvocationOnMock invocation) throws Throwable {
+                            yieldCounter.incrementAndGet();
+                            return null;
+                        }
+                    }).when(context).yield();
+                    if (random.nextInt(10) == 5) {
+                        when(context.getName()).thenReturn("fail");
+                    }
+                    try {
+                        processor.onTrigger(context, sessionFactory);
+                    } catch (Throwable t) {
+                        // an assertion thrown on a pool thread never reaches JUnit, so count it instead
+                        unexpectedErrors.incrementAndGet();
+                    } finally {
+                        latch.countDown();
+                    }
+                }
+            });
+        }
+
+        // shut the pool down before asserting so that a timeout does not leak its threads
+        final boolean completed = latch.await(20000, TimeUnit.MILLISECONDS);
+        processingExecutor.shutdown();
+        assertTrue(completed);
+        assertEquals(0, unexpectedErrors.get());
+
+        System.out.println("SUCCESS: " + processor.successfulTriggers);
+        System.out.println("FAILURE: " + processor.failedTriggers);
+        System.out.println("INIT: " + processor.resourceReinitialized);
+        System.out.println("YIELD CALLS: " + yieldCounter.get());
+        System.out.println("COMMIT CALLS: " + commitCounter.get());
+        System.out.println("ROLLBACK CALLS: " + rollbackCounter.get());
+        System.out.println("Close CALLS: " + processor.closeCounter.get());
+
+        /*
+         * this has to be <= 1 since the last thread may come to finally block
+         * after acceptTask flag has been reset at which point the close will
+         * not be called (which is correct behavior since it will be invoked
+         * explicitly by the life-cycle operations of a running processor).
+         *
+         * You can actually observe the =1 behavior in the next test where it is
+         * always 0 close calls
+         */
+        int closeVsInitDiff = processor.resourceReinitialized.get() - processor.closeCounter.get();
+        assertTrue(closeVsInitDiff <= 1);
+
+        assertEquals(commitCounter.get(), processor.successfulTriggers.get());
+        assertEquals(rollbackCounter.get(), processor.failedTriggers.get());
+
+        assertEquals(testCount,
+                processor.successfulTriggers.get() + processor.failedTriggers.get() + yieldCounter.get());
+    }
+
+    /**
+     * Same scenario as {@link #validateConcurrencyWithRandomFailures()}, but no
+     * trigger is ever told to fail.
+     */
+    @Test
+    public void validateConcurrencyWithAllSuccesses() throws Exception {
+        ExecutorService processingExecutor = Executors.newFixedThreadPool(32);
+        final AtomicInteger commitCounter = new AtomicInteger();
+        final AtomicInteger rollbackCounter = new AtomicInteger();
+        final AtomicInteger yieldCounter = new AtomicInteger();
+        final AtomicInteger unexpectedErrors = new AtomicInteger();
+
+        final ProcessSessionFactory sessionFactory = mock(ProcessSessionFactory.class);
+        final ProcessSession session = mock(ProcessSession.class);
+        when(sessionFactory.createSession()).thenReturn(session);
+        doAnswer(new Answer<Void>() {
+            @Override
+            public Void answer(InvocationOnMock invocation) throws Throwable {
+                commitCounter.incrementAndGet();
+                return null;
+            }
+        }).when(session).commit();
+        doAnswer(new Answer<Void>() {
+            @Override
+            public Void answer(InvocationOnMock invocation) throws Throwable {
+                rollbackCounter.incrementAndGet();
+                return null;
+            }
+        }).when(session).rollback(true);
+
+        final ConcurrencyValidatingProcessor processor = spy(new ConcurrencyValidatingProcessor());
+
+        int testCount = 1000;
+        final CountDownLatch latch = new CountDownLatch(testCount);
+        for (int i = 0; i < testCount; i++) {
+            processingExecutor.execute(new Runnable() {
+                @Override
+                public void run() {
+                    ProcessContext context = mock(ProcessContext.class);
+                    doAnswer(new Answer<Void>() {
+                        @Override
+                        public Void answer(InvocationOnMock invocation) throws Throwable {
+                            yieldCounter.incrementAndGet();
+                            return null;
+                        }
+                    }).when(context).yield();
+                    try {
+                        processor.onTrigger(context, sessionFactory);
+                    } catch (Throwable t) {
+                        // an assertion thrown on a pool thread never reaches JUnit, so count it instead
+                        unexpectedErrors.incrementAndGet();
+                    } finally {
+                        latch.countDown();
+                    }
+                }
+            });
+        }
+
+        // shut the pool down before asserting so that a timeout does not leak its threads
+        final boolean completed = latch.await(30000, TimeUnit.MILLISECONDS);
+        processingExecutor.shutdown();
+        assertTrue(completed);
+        assertEquals(0, unexpectedErrors.get());
+
+        System.out.println("SUCCESS: " + processor.successfulTriggers);
+        System.out.println("FAILURE: " + processor.failedTriggers);
+        System.out.println("INIT: " + processor.resourceReinitialized);
+        System.out.println("YIELD CALLS: " + yieldCounter.get());
+        System.out.println("COMMIT CALLS: " + commitCounter.get());
+        System.out.println("ROLLBACK CALLS: " + rollbackCounter.get());
+        System.out.println("Close CALLS: " + processor.closeCounter.get());
+
+        /*
+         * unlike previous test this one will always be 1 since there are no
+         * failures
+         */
+        int closeVsInitDiff = processor.resourceReinitialized.get() - processor.closeCounter.get();
+        assertEquals(1, closeVsInitDiff);
+
+        assertEquals(commitCounter.get(), processor.successfulTriggers.get());
+        assertEquals(rollbackCounter.get(), processor.failedTriggers.get());
+
+        assertEquals(testCount,
+                processor.successfulTriggers.get() + processor.failedTriggers.get() + yieldCounter.get());
+    }
+
+    /**
+     * Same scenario as {@link #validateConcurrencyWithRandomFailures()}, but
+     * every trigger is told to fail.
+     */
+    @Test
+    public void validateConcurrencyWithAllFailures() throws Exception {
+        ExecutorService processingExecutor = Executors.newFixedThreadPool(32);
+        final AtomicInteger commitCounter = new AtomicInteger();
+        final AtomicInteger rollbackCounter = new AtomicInteger();
+        final AtomicInteger yieldCounter = new AtomicInteger();
+        final AtomicInteger unexpectedErrors = new AtomicInteger();
+
+        final ProcessSessionFactory sessionFactory = mock(ProcessSessionFactory.class);
+        final ProcessSession session = mock(ProcessSession.class);
+        when(sessionFactory.createSession()).thenReturn(session);
+        doAnswer(new Answer<Void>() {
+            @Override
+            public Void answer(InvocationOnMock invocation) throws Throwable {
+                commitCounter.incrementAndGet();
+                return null;
+            }
+        }).when(session).commit();
+        doAnswer(new Answer<Void>() {
+            @Override
+            public Void answer(InvocationOnMock invocation) throws Throwable {
+                rollbackCounter.incrementAndGet();
+                return null;
+            }
+        }).when(session).rollback(true);
+
+        final ConcurrencyValidatingProcessor processor = spy(new ConcurrencyValidatingProcessor());
+
+        int testCount = 1000;
+        final CountDownLatch latch = new CountDownLatch(testCount);
+        for (int i = 0; i < testCount; i++) {
+            processingExecutor.execute(new Runnable() {
+                @Override
+                public void run() {
+                    ProcessContext context = mock(ProcessContext.class);
+                    doAnswer(new Answer<Void>() {
+                        @Override
+                        public Void answer(InvocationOnMock invocation) throws Throwable {
+                            yieldCounter.incrementAndGet();
+                            return null;
+                        }
+                    }).when(context).yield();
+                    when(context.getName()).thenReturn("fail");
+                    try {
+                        processor.onTrigger(context, sessionFactory);
+                    } catch (Throwable t) {
+                        // an assertion thrown on a pool thread never reaches JUnit, so count it instead
+                        unexpectedErrors.incrementAndGet();
+                    } finally {
+                        latch.countDown();
+                    }
+                }
+            });
+        }
+
+        // shut the pool down before asserting so that a timeout does not leak its threads
+        final boolean completed = latch.await(20000, TimeUnit.MILLISECONDS);
+        processingExecutor.shutdown();
+        assertTrue(completed);
+        assertEquals(0, unexpectedErrors.get());
+
+        System.out.println("SUCCESS: " + processor.successfulTriggers);
+        System.out.println("FAILURE: " + processor.failedTriggers);
+        System.out.println("INIT: " + processor.resourceReinitialized);
+        System.out.println("YIELD CALLS: " + yieldCounter.get());
+        System.out.println("COMMIT CALLS: " + commitCounter.get());
+        System.out.println("ROLLBACK CALLS: " + rollbackCounter.get());
+        System.out.println("Close CALLS: " + processor.closeCounter.get());
+
+        /*
+         * unlike previous test this one will always be 0 since all triggers are
+         * failures
+         */
+        int closeVsInitDiff = processor.resourceReinitialized.get() - processor.closeCounter.get();
+        assertEquals(0, closeVsInitDiff);
+
+        assertEquals(commitCounter.get(), processor.successfulTriggers.get());
+        assertEquals(rollbackCounter.get(), processor.failedTriggers.get());
+
+        assertEquals(testCount,
+                processor.successfulTriggers.get() + processor.failedTriggers.get() + yieldCounter.get());
+    }
+
+    /**
+     * Minimal processor that exposes only the shared property descriptors; used
+     * by {@link #validateBaseProperties()}.
+     */
+    public static class DummyProcessor extends AbstractKafkaProcessor<Closeable> {
+        @Override
+        protected boolean rendezvousWithKafka(ProcessContext context, ProcessSession session) throws ProcessException {
+            return true;
+        }
+
+        @Override
+        protected Closeable buildKafkaResource(ProcessContext context, ProcessSession session) throws ProcessException {
+            return mock(Closeable.class);
+        }
+
+        @Override
+        protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+            return SHARED_DESCRIPTORS;
+        }
+    }
+
+
+    /**
+     * Processor stub that counts successful and failed triggers, resource
+     * (re)initializations and close() calls. A trigger fails when the context
+     * name is "fail".
+     */
+    public static class ConcurrencyValidatingProcessor extends AbstractKafkaProcessor<Closeable> {
+        final AtomicInteger failedTriggers = new AtomicInteger();
+        final AtomicInteger successfulTriggers = new AtomicInteger();
+        final AtomicInteger resourceReinitialized = new AtomicInteger();
+        final AtomicInteger closeCounter = new AtomicInteger();
+
+        ConcurrencyValidatingProcessor() {
+            try {
+                // sets the 'logger' field declared by AbstractSessionFactoryProcessor to a mock via reflection
+                Field loggerField = AbstractSessionFactoryProcessor.class.getDeclaredField("logger");
+                loggerField.setAccessible(true);
+                loggerField.set(this, mock(ProcessorLog.class));
+            } catch (Exception e) {
+                throw new IllegalStateException(e);
+            }
+        }
+
+        @Override
+        @OnStopped
+        public void close() {
+            super.close();
+            assertTrue(this.kafkaResource == null);
+            closeCounter.incrementAndGet();
+        }
+
+        @Override
+        protected boolean rendezvousWithKafka(ProcessContext context, ProcessSession session) {
+            assertNotNull(this.kafkaResource);
+            if ("fail".equals(context.getName())) {
+                failedTriggers.incrementAndGet();
+                throw new RuntimeException("Intentional");
+            }
+            this.successfulTriggers.incrementAndGet();
+            return true;
+        }
+
+        @Override
+        protected Closeable buildKafkaResource(ProcessContext context, ProcessSession session) throws ProcessException {
+            this.resourceReinitialized.incrementAndGet();
+            return mock(Closeable.class);
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/nifi/blob/640b7021/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-pubsub-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafkaTest.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-pubsub-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafkaTest.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-pubsub-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafkaTest.java
new file mode 100644
index 0000000..2031e76
--- /dev/null
+++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-pubsub-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafkaTest.java
@@ -0,0 +1,174 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.kafka.pubsub;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import java.nio.charset.StandardCharsets;
+import java.util.List;
+
+import org.apache.nifi.util.MockFlowFile;
+import org.apache.nifi.util.TestRunner;
+import org.apache.nifi.util.TestRunners;
+import org.junit.Test;
+
+// Tests ConsumeKafka property validation and, via StubConsumeKafka (which is fed
+// the message values directly through setValues), the flow files it produces.
+public class ConsumeKafkaTest {
+
+    /**
+     * Checks that a missing, empty or blank group.id is rejected by validation.
+     */
+    @Test
+    public void validatePropertiesValidation() throws Exception {
+        ConsumeKafka consumeKafka = new ConsumeKafka();
+        TestRunner runner = TestRunners.newTestRunner(consumeKafka);
+        runner.setProperty(ConsumeKafka.BOOTSTRAP_SERVERS, "okeydokey:1234");
+        runner.setProperty(ConsumeKafka.TOPIC, "foo");
+        runner.setProperty(ConsumeKafka.CLIENT_ID, "foo");
+        runner.setProperty(ConsumeKafka.GROUP_ID, "foo");
+        runner.setProperty(ConsumeKafka.AUTO_OFFSET_RESET, ConsumeKafka.OFFSET_EARLIEST);
+
+        runner.removeProperty(ConsumeKafka.GROUP_ID);
+        try {
+            runner.assertValid();
+            fail();
+        } catch (AssertionError e) {
+            assertTrue(e.getMessage().contains("invalid because group.id is required"));
+        }
+
+        runner.setProperty(ConsumeKafka.GROUP_ID, "");
+        try {
+            runner.assertValid();
+            fail();
+        } catch (AssertionError e) {
+            assertTrue(e.getMessage().contains("must contain at least one character that is not white space"));
+        }
+
+        runner.setProperty(ConsumeKafka.GROUP_ID, " ");
+        try {
+            runner.assertValid();
+            fail();
+        } catch (AssertionError e) {
+            assertTrue(e.getMessage().contains("must contain at least one character that is not white space"));
+        }
+    }
+
+    /**
+     * Runs the processor twice with three stubbed messages each (offset reset set
+     * to OFFSET_EARLIEST) and expects six flow files, spot-checking their order.
+     */
+    @Test
+    public void validateGetAllMessages() throws Exception {
+        String topicName = "validateGetAllMessages";
+
+        StubConsumeKafka consumeKafka = new StubConsumeKafka();
+
+        final TestRunner runner = TestRunners.newTestRunner(consumeKafka);
+        runner.setValidateExpressionUsage(false);
+        runner.setProperty(ConsumeKafka.BOOTSTRAP_SERVERS, "0.0.0.0:1234");
+        runner.setProperty(ConsumeKafka.TOPIC, topicName);
+        runner.setProperty(ConsumeKafka.CLIENT_ID, "foo");
+        runner.setProperty(ConsumeKafka.GROUP_ID, "foo");
+        runner.setProperty(ConsumeKafka.AUTO_OFFSET_RESET, ConsumeKafka.OFFSET_EARLIEST);
+
+        byte[][] values = new byte[][] { "Hello-1".getBytes(StandardCharsets.UTF_8),
+                "Hello-2".getBytes(StandardCharsets.UTF_8), "Hello-3".getBytes(StandardCharsets.UTF_8) };
+        consumeKafka.setValues(values);
+
+        runner.run(1, false);
+
+        values = new byte[][] { "Hello-4".getBytes(StandardCharsets.UTF_8), "Hello-5".getBytes(StandardCharsets.UTF_8),
+                "Hello-6".getBytes(StandardCharsets.UTF_8) };
+        consumeKafka.setValues(values);
+
+        runner.run(1, false);
+
+        final List<MockFlowFile> flowFiles = runner.getFlowFilesForRelationship(ConsumeKafka.REL_SUCCESS);
+
+        assertEquals(6, flowFiles.size());
+        // spot check
+        MockFlowFile flowFile = flowFiles.get(0);
+        String event = new String(flowFile.toByteArray(), StandardCharsets.UTF_8);
+        assertEquals("Hello-1", event);
+
+        flowFile = flowFiles.get(1);
+        event = new String(flowFile.toByteArray(), StandardCharsets.UTF_8);
+        assertEquals("Hello-2", event);
+
+        flowFile = flowFiles.get(5);
+        event = new String(flowFile.toByteArray(), StandardCharsets.UTF_8);
+        assertEquals("Hello-6", event);
+
+        consumeKafka.close();
+    }
+
+    /**
+     * With a message demarcator of "blah", expects each run to produce a single
+     * flow file holding that run's messages separated by the demarcator.
+     */
+    @Test
+    public void validateGetAllMessagesWithProvidedDemarcator() throws Exception {
+        String topicName = "validateGetAllMessagesWithProvidedDemarcator";
+
+        StubConsumeKafka consumeKafka = new StubConsumeKafka();
+
+        final TestRunner runner = TestRunners.newTestRunner(consumeKafka);
+        runner.setValidateExpressionUsage(false);
+        runner.setProperty(ConsumeKafka.BOOTSTRAP_SERVERS, "0.0.0.0:1234");
+        runner.setProperty(ConsumeKafka.TOPIC, topicName);
+        runner.setProperty(ConsumeKafka.CLIENT_ID, "foo");
+        runner.setProperty(ConsumeKafka.GROUP_ID, "foo");
+        runner.setProperty(ConsumeKafka.AUTO_OFFSET_RESET, ConsumeKafka.OFFSET_EARLIEST);
+        runner.setProperty(ConsumeKafka.MESSAGE_DEMARCATOR, "blah");
+
+        byte[][] values = new byte[][] { "Hello-1".getBytes(StandardCharsets.UTF_8),
+                "Hi-2".getBytes(StandardCharsets.UTF_8) };
+        consumeKafka.setValues(values);
+
+        runner.run(1, false);
+        List<MockFlowFile> flowFiles = runner.getFlowFilesForRelationship(ConsumeKafka.REL_SUCCESS);
+        assertEquals(1, flowFiles.size());
+
+        values = new byte[][] { "Здравствуйте-3".getBytes(StandardCharsets.UTF_8),
+                "こんにちは-4".getBytes(StandardCharsets.UTF_8), "Hello-5".getBytes(StandardCharsets.UTF_8) };
+        consumeKafka.setValues(values);
+
+        runner.run(1, false);
+
+        flowFiles = runner.getFlowFilesForRelationship(ConsumeKafka.REL_SUCCESS);
+
+        assertEquals(2, flowFiles.size());
+        MockFlowFile flowFile = flowFiles.get(0);
+        String[] events = new String(flowFile.toByteArray(), StandardCharsets.UTF_8).split("blah");
+
+        assertEquals(2, events.length);
+
+        flowFile = flowFiles.get(1);
+        events = new String(flowFile.toByteArray(), StandardCharsets.UTF_8).split("blah");
+
+        assertEquals(3, events.length);
+        // spot check
+        assertEquals("Здравствуйте-3", events[0]);
+        assertEquals("こんにちは-4", events[1]);
+        assertEquals("Hello-5", events[2]);
+
+        consumeKafka.close();
+    }
+}
http://git-wip-us.apache.org/repos/asf/nifi/blob/640b7021/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-pubsub-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/KafkaPublisherTest.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-pubsub-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/KafkaPublisherTest.java
b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-pubsub-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/KafkaPublisherTest.java
new file mode 100644
index 0000000..2c45d37
--- /dev/null
+++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-pubsub-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/KafkaPublisherTest.java
@@ -0,0 +1,330 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.kafka.pubsub;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import java.io.ByteArrayInputStream;
+import java.io.InputStream;
+import java.nio.charset.StandardCharsets;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+
+import org.apache.kafka.clients.producer.Partitioner;
+import org.apache.kafka.common.Cluster;
+import org.apache.kafka.common.serialization.ByteArraySerializer;
+import org.apache.nifi.processors.kafka.pubsub.KafkaPublisher.KafkaPublisherResult;
+import org.apache.nifi.processors.kafka.test.EmbeddedKafka;
+import org.apache.nifi.processors.kafka.test.EmbeddedKafkaProducerHelper;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import kafka.consumer.Consumer;
+import kafka.consumer.ConsumerConfig;
+import kafka.consumer.ConsumerIterator;
+import kafka.consumer.ConsumerTimeoutException;
+import kafka.consumer.KafkaStream;
+import kafka.javaapi.consumer.ConsumerConnector;
+
+/**
+ * Tests {@link KafkaPublisher} against an {@link EmbeddedKafka} instance, reading
+ * the published messages back through a {@link ConsumerConnector} stream.
+ */
+public class KafkaPublisherTest {
+
+    private static EmbeddedKafka kafkaLocal;
+
+    private static EmbeddedKafkaProducerHelper producerHelper;
+
+    @BeforeClass
+    public static void beforeClass() {
+        kafkaLocal = new EmbeddedKafka();
+        kafkaLocal.start();
+        producerHelper = new EmbeddedKafkaProducerHelper(kafkaLocal);
+    }
+
+    @AfterClass
+    public static void afterClass() throws Exception {
+        producerHelper.close();
+        kafkaLocal.stop();
+    }
+
+    /**
+     * Publishes content without a delimiter and expects exactly one message.
+     */
+    @Test
+    public void validateSuccessfulSendAsWhole() throws Exception {
+        InputStream contentStream = new ByteArrayInputStream("Hello Kafka".getBytes(StandardCharsets.UTF_8));
+        String topicName = "validateSuccessfulSendAsWhole";
+
+        Properties kafkaProperties = this.buildProducerProperties();
+        KafkaPublisher publisher = new KafkaPublisher(kafkaProperties);
+
+        PublishingContext publishingContext = new PublishingContext(contentStream, topicName);
+        KafkaPublisherResult result = publisher.publish(publishingContext);
+
+        assertEquals(0, result.getLastMessageAcked());
+        assertEquals(1, result.getMessagesSent());
+        contentStream.close();
+        publisher.close();
+
+        ConsumerIterator<byte[], byte[]> iter = this.buildConsumer(topicName);
+        assertNotNull(iter.next());
+        try {
+            iter.next();
+            fail();
+        } catch (ConsumerTimeoutException e) {
+            // that's OK since this is the Kafka mechanism to unblock
+        }
+    }
+
+    /**
+     * Publishes newline-delimited content and expects four separate messages.
+     */
+    @Test
+    public void validateSuccessfulSendAsDelimited() throws Exception {
+        InputStream contentStream = new ByteArrayInputStream(
+                "Hello Kafka\nHello Kafka\nHello Kafka\nHello Kafka\n".getBytes(StandardCharsets.UTF_8));
+        String topicName = "validateSuccessfulSendAsDelimited";
+
+        Properties kafkaProperties = this.buildProducerProperties();
+        KafkaPublisher publisher = new KafkaPublisher(kafkaProperties);
+
+        PublishingContext publishingContext = new PublishingContext(contentStream, topicName);
+        publishingContext.setDelimiterBytes("\n".getBytes(StandardCharsets.UTF_8));
+        KafkaPublisherResult result = publisher.publish(publishingContext);
+
+        assertEquals(3, result.getLastMessageAcked());
+        assertEquals(4, result.getMessagesSent());
+        contentStream.close();
+        publisher.close();
+
+        ConsumerIterator<byte[], byte[]> iter = this.buildConsumer(topicName);
+        assertNotNull(iter.next());
+        assertNotNull(iter.next());
+        assertNotNull(iter.next());
+        assertNotNull(iter.next());
+        try {
+            iter.next();
+            fail();
+        } catch (ConsumerTimeoutException e) {
+            // that's OK since this is the Kafka mechanism to unblock
+        }
+    }
+
+    /*
+     * This test simulates the condition where not all messages were ACKed by
+     * Kafka
+     */
+    @Test
+    public void validateRetries() throws Exception {
+        byte[] testValue = "Hello Kafka1\nHello Kafka2\nHello Kafka3\nHello Kafka4\n".getBytes(StandardCharsets.UTF_8);
+        InputStream contentStream = new ByteArrayInputStream(testValue);
+        String topicName = "validateSuccessfulReSendOfFailedSegments";
+
+        Properties kafkaProperties = this.buildProducerProperties();
+
+        KafkaPublisher publisher = new KafkaPublisher(kafkaProperties);
+
+        // simulates the first re-try
+        int lastAckedMessageIndex = 1;
+        PublishingContext publishingContext = new PublishingContext(contentStream, topicName, lastAckedMessageIndex);
+        publishingContext.setDelimiterBytes("\n".getBytes(StandardCharsets.UTF_8));
+
+        publisher.publish(publishingContext);
+
+        ConsumerIterator<byte[], byte[]> iter = this.buildConsumer(topicName);
+        String m1 = new String(iter.next().message(), StandardCharsets.UTF_8);
+        String m2 = new String(iter.next().message(), StandardCharsets.UTF_8);
+        assertEquals("Hello Kafka3", m1);
+        assertEquals("Hello Kafka4", m2);
+        try {
+            iter.next();
+            fail();
+        } catch (ConsumerTimeoutException e) {
+            // that's OK since this is the Kafka mechanism to unblock
+        }
+
+        // simulates the second re-try
+        lastAckedMessageIndex = 2;
+        contentStream = new ByteArrayInputStream(testValue);
+        publishingContext = new PublishingContext(contentStream, topicName, lastAckedMessageIndex);
+        publishingContext.setDelimiterBytes("\n".getBytes(StandardCharsets.UTF_8));
+        publisher.publish(publishingContext);
+
+        m1 = new String(iter.next().message(), StandardCharsets.UTF_8);
+        assertEquals("Hello Kafka4", m1);
+
+        publisher.close();
+    }
+
+    /*
+     * Similar to the above test, but it sets the first retry index to the last
+     * possible message index and second index to an out of bound index. The
+     * expectation is that no messages will be sent to Kafka
+     */
+    @Test
+    public void validateRetriesWithWrongIndex() throws Exception {
+        byte[] testValue = "Hello Kafka1\nHello Kafka2\nHello Kafka3\nHello Kafka4\n".getBytes(StandardCharsets.UTF_8);
+        InputStream contentStream = new ByteArrayInputStream(testValue);
+        String topicName = "validateRetriesWithWrongIndex";
+
+        Properties kafkaProperties = this.buildProducerProperties();
+
+        KafkaPublisher publisher = new KafkaPublisher(kafkaProperties);
+
+        // simulates the first re-try
+        int lastAckedMessageIndex = 3;
+        PublishingContext publishingContext = new PublishingContext(contentStream, topicName, lastAckedMessageIndex);
+        publishingContext.setDelimiterBytes("\n".getBytes(StandardCharsets.UTF_8));
+
+        publisher.publish(publishingContext);
+
+        ConsumerIterator<byte[], byte[]> iter = this.buildConsumer(topicName);
+        try {
+            iter.next();
+            fail();
+        } catch (ConsumerTimeoutException e) {
+            // that's OK since this is the Kafka mechanism to unblock
+        }
+
+        // simulates the second re-try
+        lastAckedMessageIndex = 6;
+        contentStream = new ByteArrayInputStream(testValue);
+        publishingContext = new PublishingContext(contentStream, topicName, lastAckedMessageIndex);
+        publishingContext.setDelimiterBytes("\n".getBytes(StandardCharsets.UTF_8));
+        publisher.publish(publishingContext);
+        try {
+            iter.next();
+            fail();
+        } catch (ConsumerTimeoutException e) {
+            // that's OK since this is the Kafka mechanism to unblock
+        }
+
+        publisher.close();
+    }
+
+    /**
+     * Publishes undelimited multi-byte UTF-8 content and expects it back unchanged.
+     */
+    @Test
+    public void validateWithMultiByteCharactersNoDelimiter() throws Exception {
+        String data = "僠THIS IS MY NEW TEXT.僠IT HAS A NEWLINE.";
+        InputStream contentStream = new ByteArrayInputStream(data.getBytes(StandardCharsets.UTF_8));
+        String topicName = "validateWithMultiByteCharacters";
+
+        Properties kafkaProperties = this.buildProducerProperties();
+
+        KafkaPublisher publisher = new KafkaPublisher(kafkaProperties);
+        PublishingContext publishingContext = new PublishingContext(contentStream, topicName);
+
+        publisher.publish(publishingContext);
+        publisher.close();
+
+        ConsumerIterator<byte[], byte[]> iter = this.buildConsumer(topicName);
+        String r = new String(iter.next().message(), StandardCharsets.UTF_8);
+        assertEquals(data, r);
+    }
+
+    /**
+     * Publishes three "and"-delimited segments through {@link TestPartitioner}.
+     */
+    @Test
+    public void validateWithNonDefaultPartitioner() throws Exception {
+        String data = "fooandbarandbaz";
+        InputStream contentStream = new ByteArrayInputStream(data.getBytes(StandardCharsets.UTF_8));
+        String topicName = "validateWithNonDefaultPartitioner";
+
+        Properties kafkaProperties = this.buildProducerProperties();
+        kafkaProperties.setProperty("partitioner.class", TestPartitioner.class.getName());
+        KafkaPublisher publisher = new KafkaPublisher(kafkaProperties);
+        PublishingContext publishingContext = new PublishingContext(contentStream, topicName);
+        publishingContext.setDelimiterBytes("and".getBytes(StandardCharsets.UTF_8));
+
+        try {
+            publisher.publish(publishingContext);
+            // partitioner should be invoked 3 times
+            assertTrue(TestPartitioner.counter == 3);
+        } finally {
+            // closed here so the publisher is released even if the assertion above fails
+            publisher.close();
+            TestPartitioner.counter = 0;
+        }
+    }
+
+    /**
+     * Builds producer properties with byte-array serializers for the embedded Kafka port.
+     */
+    private Properties buildProducerProperties() {
+        Properties kafkaProperties = new Properties();
+        kafkaProperties.put("key.serializer", ByteArraySerializer.class.getName());
+        kafkaProperties.put("value.serializer", ByteArraySerializer.class.getName());
+        kafkaProperties.setProperty("bootstrap.servers", "localhost:" + kafkaLocal.getKafkaPort());
+        kafkaProperties.put("auto.create.topics.enable", "true");
+        return kafkaProperties;
+    }
+
+    /**
+     * Creates a single-stream consumer iterator for the topic, configured with
+     * auto.offset.reset=smallest and consumer.timeout.ms=500.
+     */
+    private ConsumerIterator<byte[], byte[]> buildConsumer(String topic) {
+        Properties props = new Properties();
+        props.put("zookeeper.connect", "localhost:" + kafkaLocal.getZookeeperPort());
+        props.put("group.id", "test");
+        props.put("consumer.timeout.ms", "500");
+        props.put("auto.offset.reset", "smallest");
+        ConsumerConfig consumerConfig = new ConsumerConfig(props);
+        ConsumerConnector consumer = Consumer.createJavaConsumerConnector(consumerConfig);
+        Map<String, Integer> topicCountMap = new HashMap<>(1);
+        topicCountMap.put(topic, 1);
+        Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumer.createMessageStreams(topicCountMap);
+        List<KafkaStream<byte[], byte[]>> streams = consumerMap.get(topic);
+        ConsumerIterator<byte[], byte[]> iter = streams.get(0).iterator();
+        return iter;
+    }
+
+    /**
+     * Partitioner that counts its invocations and always selects partition 0.
+     */
+    public static class TestPartitioner implements Partitioner {
+        static int counter;
+
+        @Override
+        public void configure(Map<String, ?> configs) {
+            // nothing to do, test
+        }
+
+        @Override
+        public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes,
+                Cluster cluster) {
+            counter++;
+            return 0;
+        }
+
+        @Override
+        public void close() {
+            counter = 0;
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/nifi/blob/640b7021/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-pubsub-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/PublishKafkaTest.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-pubsub-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/PublishKafkaTest.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-pubsub-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/PublishKafkaTest.java
new file mode 100644
index 0000000..e669df7
--- /dev/null
+++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-pubsub-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/PublishKafkaTest.java
@@ -0,0 +1,254 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.kafka.pubsub; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; + +import java.nio.charset.StandardCharsets; + +import org.apache.kafka.clients.producer.Producer; +import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.nifi.util.MockFlowFile; +import org.apache.nifi.util.TestRunner; +import org.apache.nifi.util.TestRunners; +import org.junit.Test; +import org.mockito.Mockito; + +// The test is valid and should be ran when working on this module. 
@Ignore is +// to speed up the overall build
/**
 * Tests for {@link PublishKafka}. The property-validation tests use the real
 * processor; every other test uses {@link StubPublishKafka} and verifies how
 * many times {@code send(..)} was invoked on the producer the stub exposes
 * through {@code getProducer()}.
 */
public class PublishKafkaTest {

    /**
     * Sets META_WAIT_TIME to the unparsable value "foo" and expects
     * {@code assertValid()} to raise an error naming 'max.block.ms'.
     */
    @Test
    public void validatePropertiesValidation() throws Exception {
        PublishKafka publishKafka = new PublishKafka();
        TestRunner runner = TestRunners.newTestRunner(publishKafka);
        runner.setProperty(PublishKafka.BOOTSTRAP_SERVERS, "okeydokey:1234");
        runner.setProperty(PublishKafka.TOPIC, "foo");
        runner.setProperty(PublishKafka.CLIENT_ID, "foo");
        runner.setProperty(PublishKafka.META_WAIT_TIME, "foo");

        AssertionError validationError = null;
        try {
            runner.assertValid();
        } catch (AssertionError e) {
            validationError = e;
        }
        // Asserted outside the try block: a fail() placed inside it raises an
        // AssertionError with a null message that the catch above would swallow,
        // turning a missing validation error into a NullPointerException.
        assertNotNull("Expected validation to reject META_WAIT_TIME 'foo'", validationError);
        assertTrue(String.valueOf(validationError.getMessage())
                .contains("'max.block.ms' validated against 'foo' is invalid"));
    }

    /**
     * Selects the SASL_PLAINTEXT security protocol without further
     * configuration and expects {@code run()} to fail with a message about
     * 'Kerberos Service Name'.
     */
    @Test
    public void validateCustomValidation() {
        String topicName = "validateCustomValidation";
        PublishKafka publishKafka = new PublishKafka();

        /*
         * Validates that Kerberos principal is required if one of SASL set for
         * security protocol
         */
        TestRunner runner = TestRunners.newTestRunner(publishKafka);
        runner.setProperty(PublishKafka.TOPIC, topicName);
        runner.setProperty(PublishKafka.CLIENT_ID, "foo");
        runner.setProperty(PublishKafka.BOOTSTRAP_SERVERS, "localhost:1234");
        runner.setProperty(PublishKafka.SECURITY_PROTOCOL, PublishKafka.SEC_SASL_PLAINTEXT);
        try {
            Throwable failure = null;
            try {
                runner.run();
            } catch (Throwable e) {
                failure = e;
            }
            // See validatePropertiesValidation: asserting here keeps a missing
            // failure from being masked by the catch block.
            assertNotNull("Expected run() to fail for SASL without a Kerberos service name", failure);
            assertTrue(String.valueOf(failure.getMessage())
                    .contains("'Kerberos Service Name' is invalid because"));
        } finally {
            // shut the runner down even when an assertion above fails
            runner.shutdown();
        }
    }

    /**
     * Seven segments separated by a single-character demarcator ("\n") must
     * produce seven sends and leave the input queue empty.
     */
    @SuppressWarnings("unchecked")
    @Test
    public void validateSingleCharacterDemarcatedMessages() {
        String topicName = "validateSingleCharacterDemarcatedMessages";
        StubPublishKafka putKafka = new StubPublishKafka();
        TestRunner runner = TestRunners.newTestRunner(putKafka);
        runner.setProperty(PublishKafka.TOPIC, topicName);
        runner.setProperty(PublishKafka.CLIENT_ID, "foo");
        runner.setProperty(PublishKafka.KEY, "key1");
        runner.setProperty(PublishKafka.BOOTSTRAP_SERVERS, "localhost:1234");
        runner.setProperty(PublishKafka.MESSAGE_DEMARCATOR, "\n");

        runner.enqueue("Hello World\nGoodbye\n1\n2\n3\n4\n5".getBytes(StandardCharsets.UTF_8));
        runner.run(1, false);
        assertEquals(0, runner.getQueueSize().getObjectCount());
        Producer<byte[], byte[]> producer = putKafka.getProducer();
        verify(producer, times(7)).send(Mockito.any(ProducerRecord.class));
        runner.shutdown();
    }

    /**
     * Same as the single-character case but with the multi-character
     * demarcator "foo" and the round-robin partitioner class configured.
     */
    @SuppressWarnings("unchecked")
    @Test
    public void validateMultiCharacterDemarcatedMessagesAndCustomPartitioner() {
        String topicName = "validateMultiCharacterDemarcatedMessagesAndCustomPartitioner";
        StubPublishKafka putKafka = new StubPublishKafka();
        TestRunner runner = TestRunners.newTestRunner(putKafka);
        runner.setProperty(PublishKafka.TOPIC, topicName);
        runner.setProperty(PublishKafka.CLIENT_ID, "foo");
        runner.setProperty(PublishKafka.KEY, "key1");
        runner.setProperty(PublishKafka.BOOTSTRAP_SERVERS, "localhost:1234");
        runner.setProperty(PublishKafka.PARTITION_CLASS, Partitioners.RoundRobinPartitioner.class.getName());
        runner.setProperty(PublishKafka.MESSAGE_DEMARCATOR, "foo");

        runner.enqueue("Hello WorldfooGoodbyefoo1foo2foo3foo4foo5".getBytes(StandardCharsets.UTF_8));
        runner.run(1, false);
        assertEquals(0, runner.getQueueSize().getObjectCount());
        Producer<byte[], byte[]> producer = putKafka.getProducer();
        verify(producer, times(7)).send(Mockito.any(ProducerRecord.class));

        runner.shutdown();
    }

    /**
     * The input contains the segment "fail"; after the first run the flow
     * file is expected back on the queue, and after the second run the queue
     * is empty with four sends recorded in total.
     */
    @SuppressWarnings("unchecked")
    @Test
    public void validateOnSendFailureAndThenResendSuccess() throws Exception {
        String topicName = "validateSendFailureAndThenResendSuccess";
        StubPublishKafka putKafka = new StubPublishKafka();

        TestRunner runner = TestRunners.newTestRunner(putKafka);
        runner.setProperty(PublishKafka.TOPIC, topicName);
        runner.setProperty(PublishKafka.CLIENT_ID, "foo");
        runner.setProperty(PublishKafka.KEY, "key1");
        runner.setProperty(PublishKafka.BOOTSTRAP_SERVERS, "localhost:1234");
        runner.setProperty(PublishKafka.MESSAGE_DEMARCATOR, "\n");
        runner.setProperty(PublishKafka.META_WAIT_TIME, "500 millis");

        final String text = "Hello World\nGoodbye\nfail\n2";
        runner.enqueue(text.getBytes(StandardCharsets.UTF_8));
        runner.run(1, false);
        assertEquals(1, runner.getQueueSize().getObjectCount()); // due to failure
        runner.run(1, false);
        assertEquals(0, runner.getQueueSize().getObjectCount());
        Producer<byte[], byte[]> producer = putKafka.getProducer();
        verify(producer, times(4)).send(Mockito.any(ProducerRecord.class));
        runner.shutdown();
    }

    /**
     * The input contains the segment "futurefail"; the flow file routed to
     * REL_FAILURE by the first run is re-enqueued, and six sends are expected
     * in total once the second run has drained the queue.
     */
    @SuppressWarnings("unchecked")
    @Test
    public void validateOnFutureGetFailureAndThenResendSuccess() throws Exception {
        String topicName = "validateSendFailureAndThenResendSuccess";
        StubPublishKafka putKafka = new StubPublishKafka();

        TestRunner runner = TestRunners.newTestRunner(putKafka);
        runner.setProperty(PublishKafka.TOPIC, topicName);
        runner.setProperty(PublishKafka.CLIENT_ID, "foo");
        runner.setProperty(PublishKafka.KEY, "key1");
        runner.setProperty(PublishKafka.BOOTSTRAP_SERVERS, "localhost:1234");
        runner.setProperty(PublishKafka.MESSAGE_DEMARCATOR, "\n");
        runner.setProperty(PublishKafka.META_WAIT_TIME, "500 millis");

        final String text = "Hello World\nGoodbye\nfuturefail\n2";
        runner.enqueue(text.getBytes(StandardCharsets.UTF_8));
        runner.run(1, false);
        MockFlowFile ff = runner.getFlowFilesForRelationship(PublishKafka.REL_FAILURE).get(0);
        assertNotNull(ff);
        runner.enqueue(ff);

        runner.run(1, false);
        assertEquals(0, runner.getQueueSize().getObjectCount());
        Producer<byte[], byte[]> producer = putKafka.getProducer();
        // 6 sends due to duplication
        verify(producer, times(6)).send(Mockito.any(ProducerRecord.class));
        runner.shutdown();
    }

    /**
     * Runs of consecutive demarcators surround the four values 1, 2, 3 and 4;
     * exactly four sends are expected.
     */
    @SuppressWarnings("unchecked")
    @Test
    public void validateDemarcationIntoEmptyMessages() {
        String topicName = "validateDemarcationIntoEmptyMessages";
        StubPublishKafka putKafka = new StubPublishKafka();
        final TestRunner runner = TestRunners.newTestRunner(putKafka);
        runner.setProperty(PublishKafka.TOPIC, topicName);
        runner.setProperty(PublishKafka.KEY, "key1");
        runner.setProperty(PublishKafka.CLIENT_ID, "foo");
        runner.setProperty(PublishKafka.BOOTSTRAP_SERVERS, "localhost:1234");
        runner.setProperty(PublishKafka.MESSAGE_DEMARCATOR, "\n");

        final byte[] bytes = "\n\n\n1\n2\n\n\n\n3\n4\n\n\n".getBytes(StandardCharsets.UTF_8);
        runner.enqueue(bytes);
        runner.run(1);
        Producer<byte[], byte[]> producer = putKafka.getProducer();
        verify(producer, times(4)).send(Mockito.any(ProducerRecord.class));
        runner.shutdown();
    }

    /**
     * The content ends with only a prefix of the multi-byte demarcator;
     * three sends are expected.
     */
    @SuppressWarnings("unchecked")
    @Test
    public void validateComplexRightPartialDemarcatedMessages() {
        String topicName = "validateComplexRightPartialDemarcatedMessages";
        StubPublishKafka putKafka = new StubPublishKafka();
        TestRunner runner = TestRunners.newTestRunner(putKafka);
        runner.setProperty(PublishKafka.TOPIC, topicName);
        runner.setProperty(PublishKafka.CLIENT_ID, "foo");
        runner.setProperty(PublishKafka.BOOTSTRAP_SERVERS, "localhost:1234");
        runner.setProperty(PublishKafka.MESSAGE_DEMARCATOR, "å <å WILDSTUFFå >å ");

        runner.enqueue("Hello Worldå <å WILDSTUFFå >å Goodbyeå <å WILDSTUFFå >å I Mean IT!å <å WILDSTUFFå >".getBytes(StandardCharsets.UTF_8));
        runner.run(1, false);

        Producer<byte[], byte[]> producer = putKafka.getProducer();
        verify(producer, times(3)).send(Mockito.any(ProducerRecord.class));
        runner.shutdown();
    }

    /**
     * The content ends with a partial demarcator immediately followed by a
     * complete one; the flow file must go to REL_SUCCESS with four sends.
     */
    @SuppressWarnings("unchecked")
    @Test
    public void validateComplexLeftPartialDemarcatedMessages() {
        String topicName = "validateComplexLeftPartialDemarcatedMessages";
        StubPublishKafka putKafka = new StubPublishKafka();
        TestRunner runner = TestRunners.newTestRunner(putKafka);
        runner.setProperty(PublishKafka.TOPIC, topicName);
        runner.setProperty(PublishKafka.CLIENT_ID, "foo");
        runner.setProperty(PublishKafka.BOOTSTRAP_SERVERS, "localhost:1234");
        runner.setProperty(PublishKafka.MESSAGE_DEMARCATOR, "å <å WILDSTUFFå >å ");

        runner.enqueue("Hello Worldå <å WILDSTUFFå >å Goodbyeå <å WILDSTUFFå >å I Mean IT!å <å WILDSTUFFå >å <å WILDSTUFFå >å ".getBytes(StandardCharsets.UTF_8));
        runner.run(1, false);

        runner.assertAllFlowFilesTransferred(PublishKafka.REL_SUCCESS, 1);
        Producer<byte[], byte[]> producer = putKafka.getProducer();
        verify(producer, times(4)).send(Mockito.any(ProducerRecord.class));
        runner.shutdown();
    }

    /**
     * The content contains a near-miss of the demarcator ("WILDBOOMSTUFF");
     * the flow file must go to REL_SUCCESS with two sends.
     */
    @SuppressWarnings("unchecked")
    @Test
    public void validateComplexPartialMatchDemarcatedMessages() {
        String topicName = "validateComplexPartialMatchDemarcatedMessages";
        StubPublishKafka putKafka = new StubPublishKafka();
        TestRunner runner = TestRunners.newTestRunner(putKafka);
        runner.setProperty(PublishKafka.TOPIC, topicName);
        runner.setProperty(PublishKafka.CLIENT_ID, "foo");
        runner.setProperty(PublishKafka.BOOTSTRAP_SERVERS, "localhost:1234");
        runner.setProperty(PublishKafka.MESSAGE_DEMARCATOR, "å <å WILDSTUFFå >å ");

        runner.enqueue("Hello Worldå <å WILDSTUFFå >å Goodbyeå <å WILDBOOMSTUFFå >å ".getBytes(StandardCharsets.UTF_8));
        runner.run(1, false);

        runner.assertAllFlowFilesTransferred(PublishKafka.REL_SUCCESS, 1);
        Producer<byte[], byte[]> producer = putKafka.getProducer();
        verify(producer, times(2)).send(Mockito.any(ProducerRecord.class));
        runner.shutdown();
    }
}
http://git-wip-us.apache.org/repos/asf/nifi/blob/640b7021/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-pubsub-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/PublishingContextTest.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-pubsub-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/PublishingContextTest.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-pubsub-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/PublishingContextTest.java new file mode
100644 index 0000000..5cee6bc --- /dev/null +++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-pubsub-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/PublishingContextTest.java @@ -0,0 +1,106 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.nifi.processors.kafka.pubsub; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.fail; +import static org.mockito.Mockito.mock; + +import java.io.InputStream; +import java.nio.charset.StandardCharsets; + +import org.junit.Test; +
/**
 * Tests the argument checks and accessors of {@link PublishingContext}.
 */
public class PublishingContextTest {

    /**
     * Runs {@code action} and returns normally only when it throws
     * {@link IllegalArgumentException}; if the action completes, the test is
     * failed through {@code fail()}. Any other exception propagates.
     */
    private static void expectIllegalArgument(Runnable action) {
        try {
            action.run();
        } catch (IllegalArgumentException expected) {
            return;
        }
        fail();
    }

    /**
     * Each of the listed constructor argument combinations must be rejected
     * with an IllegalArgumentException.
     */
    @Test
    public void failInvalidConstructorArgs() {
        expectIllegalArgument(new Runnable() {
            @Override
            public void run() {
                new PublishingContext(null, null);
            }
        });
        expectIllegalArgument(new Runnable() {
            @Override
            public void run() {
                new PublishingContext(mock(InputStream.class), null);
            }
        });
        expectIllegalArgument(new Runnable() {
            @Override
            public void run() {
                new PublishingContext(mock(InputStream.class), "");
            }
        });
        expectIllegalArgument(new Runnable() {
            @Override
            public void run() {
                new PublishingContext(mock(InputStream.class), "mytopic", -3);
            }
        });
    }

    /**
     * Values passed to the constructor and setters must be returned by the
     * getters and appear in {@code toString()}.
     */
    @Test
    public void validateFullSetting() {
        PublishingContext ctx = new PublishingContext(mock(InputStream.class), "topic", 3);
        ctx.setDelimiterBytes("delimiter".getBytes(StandardCharsets.UTF_8));
        ctx.setKeyBytes("key".getBytes(StandardCharsets.UTF_8));

        String delimiterText = new String(ctx.getDelimiterBytes(), StandardCharsets.UTF_8);
        assertEquals("delimiter", delimiterText);
        String keyText = new String(ctx.getKeyBytes(), StandardCharsets.UTF_8);
        assertEquals("key", keyText);
        assertEquals("topic", ctx.getTopic());
        assertEquals("topic: 'topic'; delimiter: 'delimiter'", ctx.toString());
    }

    /**
     * Key bytes, delimiter bytes and max request size may each be set once;
     * a second call, or a negative max request size, must be rejected.
     */
    @Test
    public void validateOnlyOnceSetPerInstance() {
        final PublishingContext ctx = new PublishingContext(mock(InputStream.class), "topic");

        ctx.setKeyBytes(new byte[] { 0 });
        expectIllegalArgument(new Runnable() {
            @Override
            public void run() {
                ctx.setKeyBytes(new byte[] { 0 });
            }
        });

        ctx.setDelimiterBytes(new byte[] { 0 });
        expectIllegalArgument(new Runnable() {
            @Override
            public void run() {
                ctx.setDelimiterBytes(new byte[] { 0 });
            }
        });

        ctx.setMaxRequestSize(1024);
        expectIllegalArgument(new Runnable() {
            @Override
            public void run() {
                ctx.setMaxRequestSize(1024);
            }
        });

        expectIllegalArgument(new Runnable() {
            @Override
            public void run() {
                ctx.setMaxRequestSize(-10);
            }
        });
    }
}
http://git-wip-us.apache.org/repos/asf/nifi/blob/640b7021/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-pubsub-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/StubConsumeKafka.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-pubsub-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/StubConsumeKafka.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-pubsub-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/StubConsumeKafka.java new file mode 100644 index 0000000..017ad70 --- /dev/null +++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-pubsub-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/StubConsumeKafka.java @@ -0,0 +1,71 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ +package org.apache.nifi.processors.kafka.pubsub; + +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +import java.util.ArrayList; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; + +import org.apache.kafka.clients.consumer.Consumer; +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.clients.consumer.ConsumerRecords; +import org.apache.kafka.common.TopicPartition; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.mockito.Mockito; +import org.mockito.invocation.InvocationOnMock; +import org.mockito.stubbing.Answer; +
/**
 * {@link ConsumeKafka} variant for tests whose Kafka consumer is a Mockito
 * mock. Every {@code poll(..)} on that mock returns one record per value
 * supplied through {@link #setValues(byte[][])}.
 */
public class StubConsumeKafka extends ConsumeKafka {

    private byte[][] values;

    /**
     * @param values
     *            payloads the mocked consumer hands out on each poll
     */
    public void setValues(byte[][] values) {
        this.values = values;
    }

    /**
     * Invokes the parent implementation, discards what it returns and hands
     * back a mocked consumer instead.
     *
     * @return a mock whose {@code poll(..)} yields the configured values as
     *         records of partition 0 of the configured topic
     */
    @SuppressWarnings("unchecked")
    @Override
    protected Consumer<byte[], byte[]> buildKafkaResource(ProcessContext context, ProcessSession session) {
        // NOTE(review): the parent's result is thrown away; presumably the call
        // is kept for the parent's side effects - confirm against ConsumeKafka.
        super.buildKafkaResource(context, session);
        final Consumer<byte[], byte[]> stubbedConsumer = mock(Consumer.class);
        final String topic = context.getProperty(TOPIC).evaluateAttributeExpressions().getValue();

        when(stubbedConsumer.poll(Mockito.anyLong())).thenAnswer(new Answer<ConsumerRecords<byte[], byte[]>>() {
            @Override
            public ConsumerRecords<byte[], byte[]> answer(InvocationOnMock invocation) throws Throwable {
                List<ConsumerRecord<byte[], byte[]>> batch = new ArrayList<>();
                // the values are read at poll time, so a later setValues(..) is honoured
                for (byte[] payload : StubConsumeKafka.this.values) {
                    batch.add(new ConsumerRecord<byte[], byte[]>(topic, 0, 0, null, payload));
                }
                Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> byPartition = new LinkedHashMap<>();
                byPartition.put(new TopicPartition(topic, 0), batch);
                return new ConsumerRecords<>(byPartition);
            }
        });

        return stubbedConsumer;
    }
}
http://git-wip-us.apache.org/repos/asf/nifi/blob/640b7021/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-pubsub-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/StubPublishKafka.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-pubsub-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/StubPublishKafka.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-pubsub-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/StubPublishKafka.java new file mode 100644 index 0000000..cf258b8 --- /dev/null +++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-pubsub-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/StubPublishKafka.java @@ -0,0 +1,96 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.nifi.processors.kafka.pubsub; + +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +import java.lang.reflect.Field; +import java.nio.charset.StandardCharsets; +import java.util.Properties; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; + +import org.apache.kafka.clients.producer.Producer; +import org.apache.kafka.clients.producer.ProducerConfig; +import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.kafka.clients.producer.RecordMetadata; +import org.apache.kafka.common.serialization.ByteArraySerializer; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.exception.ProcessException; +import org.mockito.Mockito; +import org.mockito.invocation.InvocationOnMock; +import org.mockito.stubbing.Answer; +
/**
 * {@link PublishKafka} variant for tests. It builds a {@link KafkaPublisher}
 * without running its constructor and injects a mocked producer that tests
 * can inspect through {@link #getProducer()}.
 */
public class StubPublishKafka extends PublishKafka {

    private volatile Producer<byte[], byte[]> producer;

    // set once the single simulated failure ("fail" or "futurefail") has been triggered
    private volatile boolean failed;

    /**
     * @return the mocked producer created by the most recent
     *         {@code buildKafkaResource(..)} call
     */
    public Producer<byte[], byte[]> getProducer() {
        return producer;
    }

    /**
     * Sets the parent's private {@code brokers} field reflectively, allocates
     * a {@link KafkaPublisher} through {@code Unsafe} and stores a mocked
     * producer in its {@code kafkaProducer} field.
     *
     * @throws IllegalStateException
     *             wrapping any exception raised while wiring the stub
     */
    @SuppressWarnings("unchecked")
    @Override
    protected KafkaPublisher buildKafkaResource(ProcessContext context, ProcessSession session)
            throws ProcessException {
        Properties producerProps = this.buildKafkaProperties(context);
        producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName());
        producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName());
        try {
            Field brokersField = PublishKafka.class.getDeclaredField("brokers");
            brokersField.setAccessible(true);
            String bootstrapServers = context.getProperty(BOOTSTRAP_SERVERS).evaluateAttributeExpressions().getValue();
            brokersField.set(this, bootstrapServers);

            KafkaPublisher stubPublisher = (KafkaPublisher) TestUtils.getUnsafe().allocateInstance(KafkaPublisher.class);
            producer = mock(Producer.class);
            this.instrumentProducer(producer, false);

            Field producerField = KafkaPublisher.class.getDeclaredField("kafkaProducer");
            producerField.setAccessible(true);
            producerField.set(stubPublisher, producer);
            return stubPublisher;
        } catch (Exception e) {
            throw new IllegalStateException(e);
        }
    }

    /**
     * Stubs {@code send(..)} on the mocked producer: the first record whose
     * value is "fail" makes send throw; the first record whose value is
     * "futurefail" returns a future whose timed {@code get} throws
     * {@link ExecutionException}. Only one of the two failures fires per
     * instance; every other send returns a plain mocked future. The
     * {@code failRandomly} argument is not read.
     */
    @SuppressWarnings("unchecked")
    private void instrumentProducer(Producer<byte[], byte[]> producer, boolean failRandomly) {
        Answer<Future<RecordMetadata>> sendBehavior = new Answer<Future<RecordMetadata>>() {
            @SuppressWarnings("rawtypes")
            @Override
            public Future<RecordMetadata> answer(InvocationOnMock invocation) throws Throwable {
                ProducerRecord<byte[], byte[]> sent = (ProducerRecord<byte[], byte[]>) invocation.getArguments()[0];
                String payload = new String(sent.value(), StandardCharsets.UTF_8);
                if (!StubPublishKafka.this.failed && "fail".equals(payload)) {
                    StubPublishKafka.this.failed = true;
                    throw new RuntimeException("intentional");
                }
                Future result = mock(Future.class);
                if (!StubPublishKafka.this.failed && "futurefail".equals(payload)) {
                    StubPublishKafka.this.failed = true;
                    when(result.get(Mockito.anyLong(), Mockito.any(TimeUnit.class))).thenThrow(ExecutionException.class);
                }
                return result;
            }
        };
        when(producer.send(Mockito.any(ProducerRecord.class))).then(sendBehavior);
    }
}
http://git-wip-us.apache.org/repos/asf/nifi/blob/640b7021/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-pubsub-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/TestUtils.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-pubsub-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/TestUtils.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-pubsub-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/TestUtils.java new file mode 100644 index 0000000..b056a08 --- /dev/null +++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-pubsub-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/TestUtils.java @@ -0,0 +1,46 @@ +/* + * Licensed to the Apache Software
Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.kafka.pubsub; + +import java.lang.reflect.Field; +import java.lang.reflect.Modifier; + +import sun.misc.Unsafe; +
/**
 * Reflection helpers shared by the tests in this package.
 */
class TestUtils {

    /**
     * Writes {@code newValue} into {@code field} of {@code instance} after
     * clearing the FINAL bit of the field's modifiers.
     *
     * @param field
     *            field to overwrite
     * @param instance
     *            object owning the field
     * @param newValue
     *            value to store
     * @throws Exception
     *             if any of the reflective operations fails
     */
    public static void setFinalField(Field field, Object instance, Object newValue) throws Exception {
        field.setAccessible(true);

        // clear the FINAL flag through Field's own private 'modifiers' field
        Field modifiersOfField = Field.class.getDeclaredField("modifiers");
        modifiersOfField.setAccessible(true);
        int withoutFinal = field.getModifiers() & ~Modifier.FINAL;
        modifiersOfField.setInt(field, withoutFinal);

        field.set(instance, newValue);
    }


    /**
     * Reads the {@link Unsafe} instance from the static {@code theUnsafe}
     * field of {@link Unsafe}.
     *
     * @return the {@link Unsafe} instance
     * @throws IllegalStateException
     *             wrapping any exception raised by the reflective access
     */
    static Unsafe getUnsafe() {
        try {
            Field singleton = Unsafe.class.getDeclaredField("theUnsafe");
            singleton.setAccessible(true);
            Object unsafe = singleton.get(null);
            return (Unsafe) unsafe;
        } catch (Exception reflectionFailure) {
            throw new IllegalStateException(reflectionFailure);
        }
    }

}
http://git-wip-us.apache.org/repos/asf/nifi/blob/640b7021/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-pubsub-processors/src/test/java/org/apache/nifi/processors/kafka/test/EmbeddedKafka.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-pubsub-processors/src/test/java/org/apache/nifi/processors/kafka/test/EmbeddedKafka.java
b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-pubsub-processors/src/test/java/org/apache/nifi/processors/kafka/test/EmbeddedKafka.java new file mode 100644 index 0000000..802f889 --- /dev/null +++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-pubsub-processors/src/test/java/org/apache/nifi/processors/kafka/test/EmbeddedKafka.java @@ -0,0 +1,226 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.kafka.test; + +import java.io.File; +import java.io.IOException; +import java.net.ServerSocket; +import java.util.Properties; + +import org.apache.commons.io.FileUtils; +import org.apache.zookeeper.server.ServerCnxnFactory; +import org.apache.zookeeper.server.ServerConfig; +import org.apache.zookeeper.server.ZooKeeperServer; +import org.apache.zookeeper.server.persistence.FileTxnSnapLog; +import org.apache.zookeeper.server.quorum.QuorumPeerConfig; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import kafka.server.KafkaConfig; +import kafka.server.KafkaServerStartable; + +/** + * Embedded Kafka server, primarily to be used for testing. 
+ */ +public class EmbeddedKafka { + + private final KafkaServerStartable kafkaServer; + + private final Properties zookeeperConfig; + + private final Properties kafkaConfig; + + private final ZooKeeperServer zkServer; + + private final Logger logger = LoggerFactory.getLogger(EmbeddedKafka.class); + + private final int kafkaPort; + + private final int zookeeperPort; + + private boolean started; + + /** + * Will create instance of the embedded Kafka server. Kafka and Zookeeper + * configuration properties will be loaded from 'server.properties' and + * 'zookeeper.properties' located at the root of the classpath. + */ + public EmbeddedKafka() { + this(loadPropertiesFromClasspath("/server.properties"), loadPropertiesFromClasspath("/zookeeper.properties")); + } + + /** + * Will create instance of the embedded Kafka server. + * + * @param kafkaConfig + * Kafka configuration properties + * @param zookeeperConfig + * Zookeeper configuration properties + */ + public EmbeddedKafka(Properties kafkaConfig, Properties zookeeperConfig) { + this.cleanupKafkaWorkDir(); + this.zookeeperConfig = zookeeperConfig; + this.kafkaConfig = kafkaConfig; + this.kafkaPort = this.availablePort(); + this.zookeeperPort = this.availablePort(); + + this.kafkaConfig.setProperty("port", String.valueOf(this.kafkaPort)); + this.kafkaConfig.setProperty("zookeeper.connect", "localhost:" + this.zookeeperPort); + this.zookeeperConfig.setProperty("clientPort", String.valueOf(this.zookeeperPort)); + this.zkServer = new ZooKeeperServer(); + this.kafkaServer = new KafkaServerStartable(new KafkaConfig(kafkaConfig)); + } + + /** + * + * @return port for Kafka server + */ + public int getKafkaPort() { + if (!this.started) { + throw new IllegalStateException("Kafka server is not started. 
Kafka port can't be determined."); + } + return this.kafkaPort; + } + + /** + * + * @return port for Zookeeper server + */ + public int getZookeeperPort() { + if (!this.started) { + throw new IllegalStateException("Kafka server is not started. Zookeeper port can't be determined."); + } + return this.zookeeperPort; + } + + /** + * Will start embedded Kafka server. Its data directories will be created + * at 'kafka-tmp' directory relative to the working directory of the current + * runtime. The data directories will be deleted upon JVM exit. + * + */ + public void start() { + if (!this.started) { + logger.info("Starting Zookeeper server"); + this.startZookeeper(); + + logger.info("Starting Kafka server"); + this.kafkaServer.startup(); + + logger.info("Embeded Kafka is started at localhost:" + this.kafkaServer.serverConfig().port() + + ". Zookeeper connection string: " + this.kafkaConfig.getProperty("zookeeper.connect")); + this.started = true; + } + } + + /** + * Will stop embedded Kafka server, cleaning up all working directories. 
+ */ + public void stop() { + if (this.started) { + logger.info("Shutting down Kafka server"); + this.kafkaServer.shutdown(); + this.kafkaServer.awaitShutdown(); + logger.info("Shutting down Zookeeper server"); + this.shutdownZookeeper(); + logger.info("Embeded Kafka is shut down."); + this.cleanupKafkaWorkDir(); + this.started = false; + } + } + + /** + * + */ + private void cleanupKafkaWorkDir() { + File kafkaTmp = new File("target/kafka-tmp"); + try { + FileUtils.deleteDirectory(kafkaTmp); + } catch (Exception e) { + logger.warn("Failed to delete " + kafkaTmp.getAbsolutePath()); + } + } + + /** + * Will start Zookeeper server via {@link ServerCnxnFactory} + */ + private void startZookeeper() { + QuorumPeerConfig quorumConfiguration = new QuorumPeerConfig(); + try { + quorumConfiguration.parseProperties(this.zookeeperConfig); + + ServerConfig configuration = new ServerConfig(); + configuration.readFrom(quorumConfiguration); + + FileTxnSnapLog txnLog = new FileTxnSnapLog(new File(configuration.getDataLogDir()), new File(configuration.getDataDir())); + + zkServer.setTxnLogFactory(txnLog); + zkServer.setTickTime(configuration.getTickTime()); + zkServer.setMinSessionTimeout(configuration.getMinSessionTimeout()); + zkServer.setMaxSessionTimeout(configuration.getMaxSessionTimeout()); + ServerCnxnFactory zookeeperConnectionFactory = ServerCnxnFactory.createFactory(); + zookeeperConnectionFactory.configure(configuration.getClientPortAddress(), + configuration.getMaxClientCnxns()); + zookeeperConnectionFactory.startup(zkServer); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } catch (Exception e) { + throw new IllegalStateException("Failed to start Zookeeper server", e); + } + } + + /** + * Will shut down Zookeeper server. + */ + private void shutdownZookeeper() { + zkServer.shutdown(); + } + + /** + * Will load {@link Properties} from properties file discovered at the + * provided path relative to the root of the classpath. 
+ */ + private static Properties loadPropertiesFromClasspath(String path) { + try { + Properties kafkaProperties = new Properties(); + kafkaProperties.load(Class.class.getResourceAsStream(path)); + return kafkaProperties; + } catch (Exception e) { + throw new IllegalStateException(e); + } + } + + /** + * Will determine the available port used by Kafka/Zookeeper servers. + */ + private int availablePort() { + ServerSocket s = null; + try { + s = new ServerSocket(0); + s.setReuseAddress(true); + return s.getLocalPort(); + } catch (Exception e) { + throw new IllegalStateException("Failed to discover available port.", e); + } finally { + try { + s.close(); + } catch (IOException e) { + // ignore + } + } + } +} http://git-wip-us.apache.org/repos/asf/nifi/blob/640b7021/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-pubsub-processors/src/test/java/org/apache/nifi/processors/kafka/test/EmbeddedKafkaProducerHelper.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-pubsub-processors/src/test/java/org/apache/nifi/processors/kafka/test/EmbeddedKafkaProducerHelper.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-pubsub-processors/src/test/java/org/apache/nifi/processors/kafka/test/EmbeddedKafkaProducerHelper.java new file mode 100644 index 0000000..0ed00fb --- /dev/null +++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-pubsub-processors/src/test/java/org/apache/nifi/processors/kafka/test/EmbeddedKafkaProducerHelper.java @@ -0,0 +1,110 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.kafka.test; + +import java.io.Closeable; +import java.io.IOException; +import java.util.Properties; + +import kafka.producer.KeyedMessage; +import kafka.producer.OldProducer; + +/** + * Helper class which helps to produce events targeting {@link EmbeddedKafka} + * server. + */ +public class EmbeddedKafkaProducerHelper implements Closeable { + + private final EmbeddedKafka kafkaServer; + + private final OldProducer producer; + + /** + * Will create an instance of EmbeddedKafkaProducerHelper based on default + * configurations.<br> + * Default configuration includes:<br> + * <i> + * metadata.broker.list=[determined from the instance of EmbeddedKafka]<br> + * serializer.class=kafka.serializer.DefaultEncoder<br> + * key.serializer.class=kafka.serializer.DefaultEncoder<br> + * auto.create.topics.enable=true + * </i><br> + * <br> + * If you wish to supply additional configuration properties or override + * existing use + * {@link EmbeddedKafkaProducerHelper#EmbeddedKafkaProducerHelper(EmbeddedKafka, Properties)} + * constructor. 
+ * + * @param kafkaServer + * instance of {@link EmbeddedKafka} + */ + public EmbeddedKafkaProducerHelper(EmbeddedKafka kafkaServer) { + this(kafkaServer, null); + } + + /** + * Will create an instance of EmbeddedKafkaProducerHelper based on default + * configurations and additional configuration properties.<br> + * Default configuration includes:<br> + * metadata.broker.list=[determined from the instance of EmbeddedKafka]<br> + * serializer.class=kafka.serializer.DefaultEncoder<br> + * key.serializer.class=kafka.serializer.DefaultEncoder<br> + * auto.create.topics.enable=true<br> + * <br> + * + * @param kafkaServer + * instance of {@link EmbeddedKafka} + * @param additionalProperties + * instance of {@link Properties} specifying additional producer + * configuration properties. + */ + public EmbeddedKafkaProducerHelper(EmbeddedKafka kafkaServer, Properties additionalProperties) { + this.kafkaServer = kafkaServer; + Properties producerProperties = new Properties(); + producerProperties.put("metadata.broker.list", "localhost:" + this.kafkaServer.getKafkaPort()); + producerProperties.put("serializer.class", "kafka.serializer.DefaultEncoder"); + producerProperties.put("key.serializer.class", "kafka.serializer.DefaultEncoder"); + producerProperties.put("auto.create.topics.enable", "true"); + if (additionalProperties != null) { + producerProperties.putAll(additionalProperties); + } + this.producer = new OldProducer(producerProperties); + } + + /** + * Will send an event to a Kafka topic. If topic doesn't exist it will be + * auto-created. + * + * @param topicName + * Kafka topic name. + * @param event + * string representing an event(message) to be sent to Kafka. + */ + public void sendEvent(String topicName, String event) { + KeyedMessage<byte[], byte[]> data = new KeyedMessage<byte[], byte[]>(topicName, event.getBytes()); + this.producer.send(data.topic(), data.key(), data.message()); + } + + /** + * Will close the underlying Kafka producer. 
+ */ + @Override + public void close() throws IOException { + this.producer.close(); + } + +} http://git-wip-us.apache.org/repos/asf/nifi/blob/640b7021/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-pubsub-processors/src/test/resources/log4j.properties ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-pubsub-processors/src/test/resources/log4j.properties b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-pubsub-processors/src/test/resources/log4j.properties new file mode 100644 index 0000000..57cd63f --- /dev/null +++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-pubsub-processors/src/test/resources/log4j.properties @@ -0,0 +1,21 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +log4j.rootCategory=INFO, stdout + +log4j.appender.stdout=org.apache.log4j.ConsoleAppender +log4j.appender.stdout.layout=org.apache.log4j.PatternLayout +log4j.appender.stdout.layout.ConversionPattern=%d{ABSOLUTE} %5p %t %c{2}:%L - %m%n + +#og4j.category.org.apache.nifi.processors.kafka=DEBUG
