http://git-wip-us.apache.org/repos/asf/nifi/blob/00b11e82/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/ConsumerPoolTest.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/ConsumerPoolTest.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/ConsumerPoolTest.java new file mode 100644 index 0000000..232af26 --- /dev/null +++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/ConsumerPoolTest.java @@ -0,0 +1,233 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.nifi.processors.kafka.pubsub; + +import java.nio.charset.StandardCharsets; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.UUID; +import java.util.regex.Pattern; + +import org.apache.kafka.clients.consumer.Consumer; +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.clients.consumer.ConsumerRecords; +import org.apache.kafka.common.KafkaException; +import org.apache.kafka.common.TopicPartition; +import org.apache.nifi.logging.ComponentLog; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.provenance.ProvenanceReporter; +import org.apache.nifi.processors.kafka.pubsub.ConsumerPool.PoolStats; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.fail; +import org.junit.Before; +import org.junit.Test; +import org.mockito.Mockito; + +import static org.mockito.Matchers.anyLong; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +public class ConsumerPoolTest { + + private Consumer<byte[], byte[]> consumer = null; + private ProcessSession mockSession = null; + private ProcessContext mockContext = Mockito.mock(ProcessContext.class); + private ProvenanceReporter mockReporter = null; + private ConsumerPool testPool = null; + private ConsumerPool testDemarcatedPool = null; + private ComponentLog logger = null; + + @Before + @SuppressWarnings("unchecked") + public void setup() { + consumer = mock(Consumer.class); + logger = mock(ComponentLog.class); + mockSession = mock(ProcessSession.class); + mockReporter = mock(ProvenanceReporter.class); + when(mockSession.getProvenanceReporter()).thenReturn(mockReporter); + testPool = new ConsumerPool( + 1, + null, + Collections.emptyMap(), + 
Collections.singletonList("nifi"), + 100L, + "utf-8", + "ssl", + "localhost", + logger, + true, + StandardCharsets.UTF_8, + null) { + @Override + protected Consumer<byte[], byte[]> createKafkaConsumer() { + return consumer; + } + }; + testDemarcatedPool = new ConsumerPool( + 1, + "--demarcator--".getBytes(StandardCharsets.UTF_8), + Collections.emptyMap(), + Collections.singletonList("nifi"), + 100L, + "utf-8", + "ssl", + "localhost", + logger, + true, + StandardCharsets.UTF_8, + Pattern.compile(".*")) { + @Override + protected Consumer<byte[], byte[]> createKafkaConsumer() { + return consumer; + } + }; + } + + @Test + public void validatePoolSimpleCreateClose() throws Exception { + + when(consumer.poll(anyLong())).thenReturn(createConsumerRecords("nifi", 0, 0L, new byte[][]{})); + try (final ConsumerLease lease = testPool.obtainConsumer(mockSession, mockContext)) { + lease.poll(); + } + try (final ConsumerLease lease = testPool.obtainConsumer(mockSession, mockContext)) { + lease.poll(); + } + try (final ConsumerLease lease = testPool.obtainConsumer(mockSession, mockContext)) { + lease.poll(); + } + try (final ConsumerLease lease = testPool.obtainConsumer(mockSession, mockContext)) { + lease.poll(); + } + testPool.close(); + verify(mockSession, times(0)).create(); + verify(mockSession, times(0)).commit(); + final PoolStats stats = testPool.getPoolStats(); + assertEquals(1, stats.consumerCreatedCount); + assertEquals(1, stats.consumerClosedCount); + assertEquals(4, stats.leasesObtainedCount); + } + + @Test + @SuppressWarnings("unchecked") + public void validatePoolSimpleCreatePollClose() throws Exception { + final byte[][] firstPassValues = new byte[][]{ + "Hello-1".getBytes(StandardCharsets.UTF_8), + "Hello-2".getBytes(StandardCharsets.UTF_8), + "Hello-3".getBytes(StandardCharsets.UTF_8) + }; + final ConsumerRecords<byte[], byte[]> firstRecs = createConsumerRecords("foo", 1, 1L, firstPassValues); + + when(consumer.poll(anyLong())).thenReturn(firstRecs, 
createConsumerRecords("nifi", 0, 0L, new byte[][]{})); + try (final ConsumerLease lease = testPool.obtainConsumer(mockSession, mockContext)) { + lease.poll(); + lease.commit(); + } + testPool.close(); + verify(mockSession, times(3)).create(); + verify(mockSession, times(1)).commit(); + final PoolStats stats = testPool.getPoolStats(); + assertEquals(1, stats.consumerCreatedCount); + assertEquals(1, stats.consumerClosedCount); + assertEquals(1, stats.leasesObtainedCount); + } + + @Test + public void validatePoolSimpleBatchCreateClose() throws Exception { + when(consumer.poll(anyLong())).thenReturn(createConsumerRecords("nifi", 0, 0L, new byte[][]{})); + for (int i = 0; i < 100; i++) { + try (final ConsumerLease lease = testPool.obtainConsumer(mockSession, mockContext)) { + for (int j = 0; j < 100; j++) { + lease.poll(); + } + } + } + testPool.close(); + verify(mockSession, times(0)).create(); + verify(mockSession, times(0)).commit(); + final PoolStats stats = testPool.getPoolStats(); + assertEquals(1, stats.consumerCreatedCount); + assertEquals(1, stats.consumerClosedCount); + assertEquals(100, stats.leasesObtainedCount); + } + + @Test + @SuppressWarnings("unchecked") + public void validatePoolBatchCreatePollClose() throws Exception { + final byte[][] firstPassValues = new byte[][]{ + "Hello-1".getBytes(StandardCharsets.UTF_8), + "Hello-2".getBytes(StandardCharsets.UTF_8), + "Hello-3".getBytes(StandardCharsets.UTF_8) + }; + final ConsumerRecords<byte[], byte[]> firstRecs = createConsumerRecords("foo", 1, 1L, firstPassValues); + + when(consumer.poll(anyLong())).thenReturn(firstRecs, createConsumerRecords("nifi", 0, 0L, new byte[][]{})); + try (final ConsumerLease lease = testDemarcatedPool.obtainConsumer(mockSession, mockContext)) { + lease.poll(); + lease.commit(); + } + testDemarcatedPool.close(); + verify(mockSession, times(1)).create(); + verify(mockSession, times(1)).commit(); + final PoolStats stats = testDemarcatedPool.getPoolStats(); + assertEquals(1, 
stats.consumerCreatedCount); + assertEquals(1, stats.consumerClosedCount); + assertEquals(1, stats.leasesObtainedCount); + } + + @Test + public void validatePoolConsumerFails() throws Exception { + + when(consumer.poll(anyLong())).thenThrow(new KafkaException("oops")); + try (final ConsumerLease lease = testPool.obtainConsumer(mockSession, mockContext)) { + try { + lease.poll(); + fail(); + } catch (final KafkaException ke) { + + } + } + testPool.close(); + verify(mockSession, times(0)).create(); + verify(mockSession, times(0)).commit(); + final PoolStats stats = testPool.getPoolStats(); + assertEquals(1, stats.consumerCreatedCount); + assertEquals(1, stats.consumerClosedCount); + assertEquals(1, stats.leasesObtainedCount); + } + + @SuppressWarnings({"rawtypes", "unchecked"}) + static ConsumerRecords<byte[], byte[]> createConsumerRecords(final String topic, final int partition, final long startingOffset, final byte[][] rawRecords) { + final Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> map = new HashMap<>(); + final TopicPartition tPart = new TopicPartition(topic, partition); + final List<ConsumerRecord<byte[], byte[]>> records = new ArrayList<>(); + long offset = startingOffset; + for (final byte[] rawRecord : rawRecords) { + final ConsumerRecord<byte[], byte[]> rec = new ConsumerRecord(topic, partition, offset++, UUID.randomUUID().toString().getBytes(), rawRecord); + records.add(rec); + } + map.put(tPart, records); + return new ConsumerRecords(map); + } + +}
http://git-wip-us.apache.org/repos/asf/nifi/blob/00b11e82/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/TestConsumeKafkaRecord_1_0.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/TestConsumeKafkaRecord_1_0.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/TestConsumeKafkaRecord_1_0.java new file mode 100644 index 0000000..d370fec --- /dev/null +++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/TestConsumeKafkaRecord_1_0.java @@ -0,0 +1,216 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.nifi.processors.kafka.pubsub; + +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; +import static org.mockito.Matchers.anyObject; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.verifyNoMoreInteractions; +import static org.mockito.Mockito.when; + +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.common.serialization.ByteArrayDeserializer; +import org.apache.nifi.logging.ComponentLog; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processors.kafka.pubsub.util.MockRecordParser; +import org.apache.nifi.processors.kafka.pubsub.util.MockRecordWriter; +import org.apache.nifi.reporting.InitializationException; +import org.apache.nifi.serialization.RecordSetWriterFactory; +import org.apache.nifi.serialization.record.RecordFieldType; +import org.apache.nifi.util.TestRunner; +import org.apache.nifi.util.TestRunners; +import org.junit.Before; +import org.junit.Test; + +public class TestConsumeKafkaRecord_1_0 { + + private ConsumerLease mockLease = null; + private ConsumerPool mockConsumerPool = null; + private TestRunner runner; + + @Before + public void setup() throws InitializationException { + mockLease = mock(ConsumerLease.class); + mockConsumerPool = mock(ConsumerPool.class); + + ConsumeKafkaRecord_1_0 proc = new ConsumeKafkaRecord_1_0() { + @Override + protected ConsumerPool createConsumerPool(final ProcessContext context, final ComponentLog log) { + return mockConsumerPool; + } + }; + + runner = TestRunners.newTestRunner(proc); + runner.setProperty(KafkaProcessorUtils.BOOTSTRAP_SERVERS, "okeydokey:1234"); + + final String readerId = "record-reader"; + final MockRecordParser readerService = new MockRecordParser(); + readerService.addSchemaField("name", RecordFieldType.STRING); + readerService.addSchemaField("age", RecordFieldType.INT); + 
runner.addControllerService(readerId, readerService); + runner.enableControllerService(readerService); + + final String writerId = "record-writer"; + final RecordSetWriterFactory writerService = new MockRecordWriter("name, age"); + runner.addControllerService(writerId, writerService); + runner.enableControllerService(writerService); + + runner.setProperty(ConsumeKafkaRecord_1_0.RECORD_READER, readerId); + runner.setProperty(ConsumeKafkaRecord_1_0.RECORD_WRITER, writerId); + } + + @Test + public void validateCustomValidatorSettings() throws Exception { + runner.setProperty(ConsumeKafkaRecord_1_0.TOPICS, "foo"); + runner.setProperty(ConsumeKafkaRecord_1_0.GROUP_ID, "foo"); + runner.setProperty(ConsumeKafkaRecord_1_0.AUTO_OFFSET_RESET, ConsumeKafkaRecord_1_0.OFFSET_EARLIEST); + runner.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName()); + runner.assertValid(); + runner.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "Foo"); + runner.assertNotValid(); + runner.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName()); + runner.assertValid(); + runner.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false"); + runner.assertValid(); + runner.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true"); + runner.assertNotValid(); + } + + @Test + public void validatePropertiesValidation() throws Exception { + runner.setProperty(ConsumeKafkaRecord_1_0.TOPICS, "foo"); + runner.setProperty(ConsumeKafkaRecord_1_0.GROUP_ID, "foo"); + runner.setProperty(ConsumeKafkaRecord_1_0.AUTO_OFFSET_RESET, ConsumeKafkaRecord_1_0.OFFSET_EARLIEST); + + runner.removeProperty(ConsumeKafkaRecord_1_0.GROUP_ID); + try { + runner.assertValid(); + fail(); + } catch (AssertionError e) { + assertTrue(e.getMessage().contains("invalid because Group ID is required")); + } + + runner.setProperty(ConsumeKafkaRecord_1_0.GROUP_ID, ""); + try { + runner.assertValid(); + fail(); + } catch (AssertionError e) { + 
assertTrue(e.getMessage().contains("must contain at least one character that is not white space")); + } + + runner.setProperty(ConsumeKafkaRecord_1_0.GROUP_ID, " "); + try { + runner.assertValid(); + fail(); + } catch (AssertionError e) { + assertTrue(e.getMessage().contains("must contain at least one character that is not white space")); + } + } + + @Test + public void validateGetAllMessages() throws Exception { + String groupName = "validateGetAllMessages"; + + when(mockConsumerPool.obtainConsumer(anyObject(), anyObject())).thenReturn(mockLease); + when(mockLease.continuePolling()).thenReturn(Boolean.TRUE, Boolean.TRUE, Boolean.FALSE); + when(mockLease.commit()).thenReturn(Boolean.TRUE); + + runner.setProperty(ConsumeKafkaRecord_1_0.TOPICS, "foo,bar"); + runner.setProperty(ConsumeKafkaRecord_1_0.GROUP_ID, groupName); + runner.setProperty(ConsumeKafkaRecord_1_0.AUTO_OFFSET_RESET, ConsumeKafkaRecord_1_0.OFFSET_EARLIEST); + runner.run(1, false); + + verify(mockConsumerPool, times(1)).obtainConsumer(anyObject(), anyObject()); + verify(mockLease, times(3)).continuePolling(); + verify(mockLease, times(2)).poll(); + verify(mockLease, times(1)).commit(); + verify(mockLease, times(1)).close(); + verifyNoMoreInteractions(mockConsumerPool); + verifyNoMoreInteractions(mockLease); + } + + @Test + public void validateGetAllMessagesPattern() throws Exception { + String groupName = "validateGetAllMessagesPattern"; + + when(mockConsumerPool.obtainConsumer(anyObject(), anyObject())).thenReturn(mockLease); + when(mockLease.continuePolling()).thenReturn(Boolean.TRUE, Boolean.TRUE, Boolean.FALSE); + when(mockLease.commit()).thenReturn(Boolean.TRUE); + + runner.setProperty(ConsumeKafkaRecord_1_0.TOPICS, "(fo.*)|(ba)"); + runner.setProperty(ConsumeKafkaRecord_1_0.TOPIC_TYPE, "pattern"); + runner.setProperty(ConsumeKafkaRecord_1_0.GROUP_ID, groupName); + runner.setProperty(ConsumeKafkaRecord_1_0.AUTO_OFFSET_RESET, ConsumeKafkaRecord_1_0.OFFSET_EARLIEST); + runner.run(1, false); + + 
verify(mockConsumerPool, times(1)).obtainConsumer(anyObject(), anyObject()); + verify(mockLease, times(3)).continuePolling(); + verify(mockLease, times(2)).poll(); + verify(mockLease, times(1)).commit(); + verify(mockLease, times(1)).close(); + verifyNoMoreInteractions(mockConsumerPool); + verifyNoMoreInteractions(mockLease); + } + + @Test + public void validateGetErrorMessages() throws Exception { + String groupName = "validateGetErrorMessages"; + + when(mockConsumerPool.obtainConsumer(anyObject(), anyObject())).thenReturn(mockLease); + when(mockLease.continuePolling()).thenReturn(true, false); + when(mockLease.commit()).thenReturn(Boolean.FALSE); + + runner.setProperty(ConsumeKafkaRecord_1_0.TOPICS, "foo,bar"); + runner.setProperty(ConsumeKafkaRecord_1_0.GROUP_ID, groupName); + runner.setProperty(ConsumeKafkaRecord_1_0.AUTO_OFFSET_RESET, ConsumeKafkaRecord_1_0.OFFSET_EARLIEST); + runner.run(1, false); + + verify(mockConsumerPool, times(1)).obtainConsumer(anyObject(), anyObject()); + verify(mockLease, times(2)).continuePolling(); + verify(mockLease, times(1)).poll(); + verify(mockLease, times(1)).commit(); + verify(mockLease, times(1)).close(); + verifyNoMoreInteractions(mockConsumerPool); + verifyNoMoreInteractions(mockLease); + } + + @Test + public void testJaasConfiguration() throws Exception { + runner.setProperty(ConsumeKafkaRecord_1_0.TOPICS, "foo"); + runner.setProperty(ConsumeKafkaRecord_1_0.GROUP_ID, "foo"); + runner.setProperty(ConsumeKafkaRecord_1_0.AUTO_OFFSET_RESET, ConsumeKafkaRecord_1_0.OFFSET_EARLIEST); + + runner.setProperty(KafkaProcessorUtils.SECURITY_PROTOCOL, KafkaProcessorUtils.SEC_SASL_PLAINTEXT); + runner.assertNotValid(); + + runner.setProperty(KafkaProcessorUtils.KERBEROS_PRINCIPLE, "kafka"); + runner.assertValid(); + + runner.setProperty(KafkaProcessorUtils.USER_PRINCIPAL, "[email protected]"); + runner.assertNotValid(); + + runner.setProperty(KafkaProcessorUtils.USER_KEYTAB, "not.A.File"); + runner.assertNotValid(); + + 
runner.setProperty(KafkaProcessorUtils.USER_KEYTAB, "src/test/resources/server.properties"); + runner.assertValid(); + } + +} http://git-wip-us.apache.org/repos/asf/nifi/blob/00b11e82/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/TestInFlightMessageTracker.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/TestInFlightMessageTracker.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/TestInFlightMessageTracker.java new file mode 100644 index 0000000..78c03ec --- /dev/null +++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/TestInFlightMessageTracker.java @@ -0,0 +1,88 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.nifi.processors.kafka.pubsub; + +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.TimeoutException; + +import org.apache.nifi.util.MockComponentLog; +import org.apache.nifi.util.MockFlowFile; +import org.junit.Assert; +import org.junit.Test; + +public class TestInFlightMessageTracker { + + @Test(timeout = 5000L) + public void testAwaitCompletionWhenComplete() throws InterruptedException, TimeoutException { + final MockFlowFile flowFile = new MockFlowFile(1L); + + final InFlightMessageTracker tracker = new InFlightMessageTracker(new MockComponentLog("1", "unit-test")); + tracker.incrementSentCount(flowFile); + + verifyNotComplete(tracker); + + tracker.incrementSentCount(flowFile); + verifyNotComplete(tracker); + + tracker.incrementAcknowledgedCount(flowFile); + verifyNotComplete(tracker); + + tracker.incrementAcknowledgedCount(flowFile); + tracker.awaitCompletion(1L); + } + + @Test(timeout = 5000L) + public void testAwaitCompletionWhileWaiting() throws InterruptedException, ExecutionException { + final MockFlowFile flowFile = new MockFlowFile(1L); + + final InFlightMessageTracker tracker = new InFlightMessageTracker(new MockComponentLog("1", "unit-test")); + tracker.incrementSentCount(flowFile); + + verifyNotComplete(tracker); + + tracker.incrementSentCount(flowFile); + verifyNotComplete(tracker); + + final ExecutorService exec = Executors.newFixedThreadPool(1); + final Future<?> future = exec.submit(() -> { + try { + tracker.awaitCompletion(10000L); + } catch (Exception e) { + throw new RuntimeException(e); + } + }); + + tracker.incrementAcknowledgedCount(flowFile); + tracker.incrementAcknowledgedCount(flowFile); + + future.get(); + } + + private void verifyNotComplete(final InFlightMessageTracker tracker) throws InterruptedException { + try { + tracker.awaitCompletion(10L); + 
Assert.fail("Expected timeout"); + } catch (final TimeoutException te) { + // expected + } + } + +} http://git-wip-us.apache.org/repos/asf/nifi/blob/00b11e82/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/TestPublishKafka.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/TestPublishKafka.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/TestPublishKafka.java new file mode 100644 index 0000000..6a0c7ce --- /dev/null +++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/TestPublishKafka.java @@ -0,0 +1,250 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.nifi.processors.kafka.pubsub; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; +import static org.mockito.Matchers.any; +import static org.mockito.Matchers.eq; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +import java.io.IOException; +import java.io.InputStream; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.stream.Collectors; + +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.util.MockFlowFile; +import org.apache.nifi.util.TestRunner; +import org.apache.nifi.util.TestRunners; +import org.junit.Before; +import org.junit.Test; + +public class TestPublishKafka { + private static final String TOPIC_NAME = "unit-test"; + + private PublisherPool mockPool; + private PublisherLease mockLease; + private TestRunner runner; + + @Before + public void setup() { + mockPool = mock(PublisherPool.class); + mockLease = mock(PublisherLease.class); + + when(mockPool.obtainPublisher()).thenReturn(mockLease); + + runner = TestRunners.newTestRunner(new PublishKafka_1_0() { + @Override + protected PublisherPool createPublisherPool(final ProcessContext context) { + return mockPool; + } + }); + + runner.setProperty(PublishKafka_1_0.TOPIC, TOPIC_NAME); + runner.setProperty(PublishKafka_1_0.DELIVERY_GUARANTEE, PublishKafka_1_0.DELIVERY_REPLICATED); + } + + @Test + public void testSingleSuccess() throws IOException { + final MockFlowFile flowFile = runner.enqueue("hello world"); + + when(mockLease.complete()).thenReturn(createAllSuccessPublishResult(flowFile, 1)); + + runner.run(); + runner.assertAllFlowFilesTransferred(PublishKafka_1_0.REL_SUCCESS, 1); + + verify(mockLease, 
times(1)).publish(any(FlowFile.class), any(InputStream.class), eq(null), eq(null), eq(TOPIC_NAME)); + verify(mockLease, times(1)).complete(); + verify(mockLease, times(0)).poison(); + verify(mockLease, times(1)).close(); + } + + @Test + public void testMultipleSuccess() throws IOException { + final Set<FlowFile> flowFiles = new HashSet<>(); + flowFiles.add(runner.enqueue("hello world")); + flowFiles.add(runner.enqueue("hello world")); + flowFiles.add(runner.enqueue("hello world")); + + + when(mockLease.complete()).thenReturn(createAllSuccessPublishResult(flowFiles, 1)); + + runner.run(); + runner.assertAllFlowFilesTransferred(PublishKafka_1_0.REL_SUCCESS, 3); + + verify(mockLease, times(3)).publish(any(FlowFile.class), any(InputStream.class), eq(null), eq(null), eq(TOPIC_NAME)); + verify(mockLease, times(1)).complete(); + verify(mockLease, times(0)).poison(); + verify(mockLease, times(1)).close(); + } + + @Test + public void testSingleFailure() throws IOException { + final MockFlowFile flowFile = runner.enqueue("hello world"); + + when(mockLease.complete()).thenReturn(createFailurePublishResult(flowFile)); + + runner.run(); + runner.assertAllFlowFilesTransferred(PublishKafka_1_0.REL_FAILURE, 1); + + verify(mockLease, times(1)).publish(any(FlowFile.class), any(InputStream.class), eq(null), eq(null), eq(TOPIC_NAME)); + verify(mockLease, times(1)).complete(); + verify(mockLease, times(1)).close(); + } + + @Test + public void testMultipleFailures() throws IOException { + final Set<FlowFile> flowFiles = new HashSet<>(); + flowFiles.add(runner.enqueue("hello world")); + flowFiles.add(runner.enqueue("hello world")); + flowFiles.add(runner.enqueue("hello world")); + + when(mockLease.complete()).thenReturn(createFailurePublishResult(flowFiles)); + + runner.run(); + runner.assertAllFlowFilesTransferred(PublishKafka_1_0.REL_FAILURE, 3); + + verify(mockLease, times(3)).publish(any(FlowFile.class), any(InputStream.class), eq(null), eq(null), eq(TOPIC_NAME)); + verify(mockLease, 
times(1)).complete(); + verify(mockLease, times(1)).close(); + } + + @Test + public void testMultipleMessagesPerFlowFile() throws IOException { + final List<FlowFile> flowFiles = new ArrayList<>(); + flowFiles.add(runner.enqueue("hello world")); + flowFiles.add(runner.enqueue("hello world")); + + final Map<FlowFile, Integer> msgCounts = new HashMap<>(); + msgCounts.put(flowFiles.get(0), 10); + msgCounts.put(flowFiles.get(1), 20); + + final PublishResult result = createPublishResult(msgCounts, new HashSet<>(flowFiles), Collections.emptyMap()); + + when(mockLease.complete()).thenReturn(result); + + runner.run(); + runner.assertAllFlowFilesTransferred(PublishKafka_1_0.REL_SUCCESS, 2); + + verify(mockLease, times(2)).publish(any(FlowFile.class), any(InputStream.class), eq(null), eq(null), eq(TOPIC_NAME)); + verify(mockLease, times(1)).complete(); + verify(mockLease, times(0)).poison(); + verify(mockLease, times(1)).close(); + + runner.assertAllFlowFilesContainAttribute("msg.count"); + assertEquals(1, runner.getFlowFilesForRelationship(PublishKafka_1_0.REL_SUCCESS).stream() + .filter(ff -> ff.getAttribute("msg.count").equals("10")) + .count()); + assertEquals(1, runner.getFlowFilesForRelationship(PublishKafka_1_0.REL_SUCCESS).stream() + .filter(ff -> ff.getAttribute("msg.count").equals("20")) + .count()); + } + + + @Test + public void testSomeSuccessSomeFailure() throws IOException { + final List<FlowFile> flowFiles = new ArrayList<>(); + flowFiles.add(runner.enqueue("hello world")); + flowFiles.add(runner.enqueue("hello world")); + flowFiles.add(runner.enqueue("hello world")); + flowFiles.add(runner.enqueue("hello world")); + + final Map<FlowFile, Integer> msgCounts = new HashMap<>(); + msgCounts.put(flowFiles.get(0), 10); + msgCounts.put(flowFiles.get(1), 20); + + final Map<FlowFile, Exception> failureMap = new HashMap<>(); + failureMap.put(flowFiles.get(2), new RuntimeException("Intentional Unit Test Exception")); + failureMap.put(flowFiles.get(3), new 
RuntimeException("Intentional Unit Test Exception")); + + final PublishResult result = createPublishResult(msgCounts, new HashSet<>(flowFiles.subList(0, 2)), failureMap); + + when(mockLease.complete()).thenReturn(result); + + runner.run(); + runner.assertTransferCount(PublishKafka_1_0.REL_SUCCESS, 0); + runner.assertTransferCount(PublishKafka_1_0.REL_FAILURE, 4); + + verify(mockLease, times(4)).publish(any(FlowFile.class), any(InputStream.class), eq(null), eq(null), eq(TOPIC_NAME)); + verify(mockLease, times(1)).complete(); + verify(mockLease, times(1)).close(); + + assertTrue(runner.getFlowFilesForRelationship(PublishKafka_1_0.REL_FAILURE).stream() + .noneMatch(ff -> ff.getAttribute("msg.count") != null)); + } + + + private PublishResult createAllSuccessPublishResult(final FlowFile successfulFlowFile, final int msgCount) { + return createAllSuccessPublishResult(Collections.singleton(successfulFlowFile), msgCount); + } + + private PublishResult createAllSuccessPublishResult(final Set<FlowFile> successfulFlowFiles, final int msgCountPerFlowFile) { + final Map<FlowFile, Integer> msgCounts = new HashMap<>(); + for (final FlowFile ff : successfulFlowFiles) { + msgCounts.put(ff, msgCountPerFlowFile); + } + return createPublishResult(msgCounts, successfulFlowFiles, Collections.emptyMap()); + } + + private PublishResult createFailurePublishResult(final FlowFile failure) { + return createFailurePublishResult(Collections.singleton(failure)); + } + + private PublishResult createFailurePublishResult(final Set<FlowFile> failures) { + final Map<FlowFile, Exception> failureMap = failures.stream().collect(Collectors.toMap(ff -> ff, ff -> new RuntimeException("Intentional Unit Test Exception"))); + return createPublishResult(Collections.emptyMap(), Collections.emptySet(), failureMap); + } + + private PublishResult createPublishResult(final Map<FlowFile, Integer> msgCounts, final Set<FlowFile> successFlowFiles, final Map<FlowFile, Exception> failures) { + // sanity check. 
+ for (final FlowFile success : successFlowFiles) { + if (failures.containsKey(success)) { + throw new IllegalArgumentException("Found same FlowFile in both 'success' and 'failures' collections: " + success); + } + } + + return new PublishResult() { + @Override + public boolean isFailure() { + return !failures.isEmpty(); + } + + @Override + public int getSuccessfulMessageCount(FlowFile flowFile) { + Integer count = msgCounts.get(flowFile); + return count == null ? 0 : count.intValue(); + } + + @Override + public Exception getReasonForFailure(FlowFile flowFile) { + return failures.get(flowFile); + } + }; + } +} http://git-wip-us.apache.org/repos/asf/nifi/blob/00b11e82/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/TestPublishKafkaRecord_1_0.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/TestPublishKafkaRecord_1_0.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/TestPublishKafkaRecord_1_0.java new file mode 100644 index 0000000..45439cc --- /dev/null +++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/TestPublishKafkaRecord_1_0.java @@ -0,0 +1,300 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.processors.kafka.pubsub; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; +import static org.mockito.Matchers.any; +import static org.mockito.Matchers.eq; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.stream.Collectors; + +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processors.kafka.pubsub.util.MockRecordParser; +import org.apache.nifi.processors.kafka.pubsub.util.MockRecordWriter; +import org.apache.nifi.reporting.InitializationException; +import org.apache.nifi.serialization.RecordSetWriterFactory; +import org.apache.nifi.serialization.record.RecordFieldType; +import org.apache.nifi.serialization.record.RecordSchema; +import org.apache.nifi.serialization.record.RecordSet; +import org.apache.nifi.util.MockFlowFile; +import org.apache.nifi.util.TestRunner; +import org.apache.nifi.util.TestRunners; +import org.junit.Before; +import org.junit.Test; +import org.mockito.Mockito; + +public class TestPublishKafkaRecord_1_0 { + + private static final String TOPIC_NAME = "unit-test"; + + private PublisherPool mockPool; + private PublisherLease mockLease; + private 
TestRunner runner; + + @Before + public void setup() throws InitializationException, IOException { + mockPool = mock(PublisherPool.class); + mockLease = mock(PublisherLease.class); + Mockito.doCallRealMethod().when(mockLease).publish(any(FlowFile.class), any(RecordSet.class), any(RecordSetWriterFactory.class), + any(RecordSchema.class), any(String.class), any(String.class)); + + when(mockPool.obtainPublisher()).thenReturn(mockLease); + + runner = TestRunners.newTestRunner(new PublishKafkaRecord_1_0() { + @Override + protected PublisherPool createPublisherPool(final ProcessContext context) { + return mockPool; + } + }); + + runner.setProperty(PublishKafkaRecord_1_0.TOPIC, TOPIC_NAME); + + final String readerId = "record-reader"; + final MockRecordParser readerService = new MockRecordParser(); + readerService.addSchemaField("name", RecordFieldType.STRING); + readerService.addSchemaField("age", RecordFieldType.INT); + runner.addControllerService(readerId, readerService); + runner.enableControllerService(readerService); + + final String writerId = "record-writer"; + final RecordSetWriterFactory writerService = new MockRecordWriter("name, age"); + runner.addControllerService(writerId, writerService); + runner.enableControllerService(writerService); + + runner.setProperty(PublishKafkaRecord_1_0.RECORD_READER, readerId); + runner.setProperty(PublishKafkaRecord_1_0.RECORD_WRITER, writerId); + runner.setProperty(PublishKafka_1_0.DELIVERY_GUARANTEE, PublishKafka_1_0.DELIVERY_REPLICATED); + } + + @Test + public void testSingleSuccess() throws IOException { + final MockFlowFile flowFile = runner.enqueue("John Doe, 48"); + + when(mockLease.complete()).thenReturn(createAllSuccessPublishResult(flowFile, 1)); + + runner.run(); + runner.assertAllFlowFilesTransferred(PublishKafkaRecord_1_0.REL_SUCCESS, 1); + + verify(mockLease, times(1)).publish(any(FlowFile.class), any(RecordSet.class), any(RecordSetWriterFactory.class), any(RecordSchema.class), eq(null), eq(TOPIC_NAME)); + 
verify(mockLease, times(1)).complete(); + verify(mockLease, times(0)).poison(); + verify(mockLease, times(1)).close(); + } + + @Test + public void testMultipleSuccess() throws IOException { + final Set<FlowFile> flowFiles = new HashSet<>(); + flowFiles.add(runner.enqueue("John Doe, 48")); + flowFiles.add(runner.enqueue("John Doe, 48")); + flowFiles.add(runner.enqueue("John Doe, 48")); + + when(mockLease.complete()).thenReturn(createAllSuccessPublishResult(flowFiles, 1)); + + runner.run(); + runner.assertAllFlowFilesTransferred(PublishKafkaRecord_1_0.REL_SUCCESS, 3); + + verify(mockLease, times(3)).publish(any(FlowFile.class), any(RecordSet.class), any(RecordSetWriterFactory.class), any(RecordSchema.class), eq(null), eq(TOPIC_NAME)); + verify(mockLease, times(1)).complete(); + verify(mockLease, times(0)).poison(); + verify(mockLease, times(1)).close(); + } + + @Test + public void testSingleFailure() throws IOException { + final MockFlowFile flowFile = runner.enqueue("John Doe, 48"); + + when(mockLease.complete()).thenReturn(createFailurePublishResult(flowFile)); + + runner.run(); + runner.assertAllFlowFilesTransferred(PublishKafkaRecord_1_0.REL_FAILURE, 1); + + verify(mockLease, times(1)).publish(any(FlowFile.class), any(RecordSet.class), any(RecordSetWriterFactory.class), any(RecordSchema.class), eq(null), eq(TOPIC_NAME)); + verify(mockLease, times(1)).complete(); + verify(mockLease, times(1)).close(); + } + + @Test + public void testMultipleFailures() throws IOException { + final Set<FlowFile> flowFiles = new HashSet<>(); + flowFiles.add(runner.enqueue("John Doe, 48")); + flowFiles.add(runner.enqueue("John Doe, 48")); + flowFiles.add(runner.enqueue("John Doe, 48")); + + when(mockLease.complete()).thenReturn(createFailurePublishResult(flowFiles)); + + runner.run(); + runner.assertAllFlowFilesTransferred(PublishKafkaRecord_1_0.REL_FAILURE, 3); + + verify(mockLease, times(3)).publish(any(FlowFile.class), any(RecordSet.class), any(RecordSetWriterFactory.class), 
any(RecordSchema.class), eq(null), eq(TOPIC_NAME)); + verify(mockLease, times(1)).complete(); + verify(mockLease, times(1)).close(); + } + + @Test + public void testMultipleMessagesPerFlowFile() throws IOException { + final List<FlowFile> flowFiles = new ArrayList<>(); + flowFiles.add(runner.enqueue("John Doe, 48\nJane Doe, 47")); + flowFiles.add(runner.enqueue("John Doe, 48\nJane Doe, 29")); + + final Map<FlowFile, Integer> msgCounts = new HashMap<>(); + msgCounts.put(flowFiles.get(0), 10); + msgCounts.put(flowFiles.get(1), 20); + + final PublishResult result = createPublishResult(msgCounts, new HashSet<>(flowFiles), Collections.emptyMap()); + + when(mockLease.complete()).thenReturn(result); + + runner.run(); + runner.assertAllFlowFilesTransferred(PublishKafkaRecord_1_0.REL_SUCCESS, 2); + + verify(mockLease, times(2)).publish(any(FlowFile.class), any(RecordSet.class), any(RecordSetWriterFactory.class), any(RecordSchema.class), eq(null), eq(TOPIC_NAME)); + verify(mockLease, times(4)).publish(any(FlowFile.class), eq(null), any(byte[].class), eq(TOPIC_NAME), any(InFlightMessageTracker.class)); + verify(mockLease, times(1)).complete(); + verify(mockLease, times(0)).poison(); + verify(mockLease, times(1)).close(); + + runner.assertAllFlowFilesContainAttribute("msg.count"); + assertEquals(1, runner.getFlowFilesForRelationship(PublishKafkaRecord_1_0.REL_SUCCESS).stream() + .filter(ff -> ff.getAttribute("msg.count").equals("10")) + .count()); + assertEquals(1, runner.getFlowFilesForRelationship(PublishKafkaRecord_1_0.REL_SUCCESS).stream() + .filter(ff -> ff.getAttribute("msg.count").equals("20")) + .count()); + } + + @Test + public void testNoRecordsInFlowFile() throws IOException { + final List<FlowFile> flowFiles = new ArrayList<>(); + flowFiles.add(runner.enqueue(new byte[0])); + + final Map<FlowFile, Integer> msgCounts = new HashMap<>(); + msgCounts.put(flowFiles.get(0), 0); + + final PublishResult result = createPublishResult(msgCounts, new HashSet<>(flowFiles), 
Collections.emptyMap()); + + when(mockLease.complete()).thenReturn(result); + + runner.run(); + runner.assertAllFlowFilesTransferred(PublishKafkaRecord_1_0.REL_SUCCESS, 1); + + verify(mockLease, times(1)).publish(any(FlowFile.class), any(RecordSet.class), any(RecordSetWriterFactory.class), any(RecordSchema.class), eq(null), eq(TOPIC_NAME)); + verify(mockLease, times(1)).complete(); + verify(mockLease, times(0)).poison(); + verify(mockLease, times(1)).close(); + + final MockFlowFile mff = runner.getFlowFilesForRelationship(PublishKafkaRecord_1_0.REL_SUCCESS).get(0); + mff.assertAttributeEquals("msg.count", "0"); + } + + + @Test + public void testSomeSuccessSomeFailure() throws IOException { + final List<FlowFile> flowFiles = new ArrayList<>(); + flowFiles.add(runner.enqueue("John Doe, 48")); + flowFiles.add(runner.enqueue("John Doe, 48")); + flowFiles.add(runner.enqueue("John Doe, 48")); + flowFiles.add(runner.enqueue("John Doe, 48")); + + final Map<FlowFile, Integer> msgCounts = new HashMap<>(); + msgCounts.put(flowFiles.get(0), 10); + msgCounts.put(flowFiles.get(1), 20); + + final Map<FlowFile, Exception> failureMap = new HashMap<>(); + failureMap.put(flowFiles.get(2), new RuntimeException("Intentional Unit Test Exception")); + failureMap.put(flowFiles.get(3), new RuntimeException("Intentional Unit Test Exception")); + + final PublishResult result = createPublishResult(msgCounts, new HashSet<>(flowFiles.subList(0, 2)), failureMap); + + when(mockLease.complete()).thenReturn(result); + + runner.run(); + runner.assertTransferCount(PublishKafkaRecord_1_0.REL_SUCCESS, 0); + runner.assertTransferCount(PublishKafkaRecord_1_0.REL_FAILURE, 4); + + verify(mockLease, times(4)).publish(any(FlowFile.class), any(RecordSet.class), any(RecordSetWriterFactory.class), any(RecordSchema.class), eq(null), eq(TOPIC_NAME)); + verify(mockLease, times(1)).complete(); + verify(mockLease, times(1)).close(); + + 
assertTrue(runner.getFlowFilesForRelationship(PublishKafkaRecord_1_0.REL_FAILURE).stream() + .noneMatch(ff -> ff.getAttribute("msg.count") != null)); + } + + + private PublishResult createAllSuccessPublishResult(final FlowFile successfulFlowFile, final int msgCount) { + return createAllSuccessPublishResult(Collections.singleton(successfulFlowFile), msgCount); + } + + private PublishResult createAllSuccessPublishResult(final Set<FlowFile> successfulFlowFiles, final int msgCountPerFlowFile) { + final Map<FlowFile, Integer> msgCounts = new HashMap<>(); + for (final FlowFile ff : successfulFlowFiles) { + msgCounts.put(ff, msgCountPerFlowFile); + } + return createPublishResult(msgCounts, successfulFlowFiles, Collections.emptyMap()); + } + + private PublishResult createFailurePublishResult(final FlowFile failure) { + return createFailurePublishResult(Collections.singleton(failure)); + } + + private PublishResult createFailurePublishResult(final Set<FlowFile> failures) { + final Map<FlowFile, Exception> failureMap = failures.stream().collect(Collectors.toMap(ff -> ff, ff -> new RuntimeException("Intentional Unit Test Exception"))); + return createPublishResult(Collections.emptyMap(), Collections.emptySet(), failureMap); + } + + private PublishResult createPublishResult(final Map<FlowFile, Integer> msgCounts, final Set<FlowFile> successFlowFiles, final Map<FlowFile, Exception> failures) { + // sanity check. + for (final FlowFile success : successFlowFiles) { + if (failures.containsKey(success)) { + throw new IllegalArgumentException("Found same FlowFile in both 'success' and 'failures' collections: " + success); + } + } + + return new PublishResult() { + + @Override + public int getSuccessfulMessageCount(FlowFile flowFile) { + Integer count = msgCounts.get(flowFile); + return count == null ? 
0 : count.intValue(); + } + + @Override + public Exception getReasonForFailure(FlowFile flowFile) { + return failures.get(flowFile); + } + + @Override + public boolean isFailure() { + return !failures.isEmpty(); + } + }; + } +} http://git-wip-us.apache.org/repos/asf/nifi/blob/00b11e82/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/TestPublisherLease.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/TestPublisherLease.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/TestPublisherLease.java new file mode 100644 index 0000000..54c1222 --- /dev/null +++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/TestPublisherLease.java @@ -0,0 +1,190 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.nifi.processors.kafka.pubsub; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; +import static org.mockito.Matchers.any; +import static org.mockito.Mockito.doAnswer; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; + +import java.io.ByteArrayInputStream; +import java.io.IOException; +import java.io.InputStream; +import java.nio.charset.StandardCharsets; +import java.util.concurrent.atomic.AtomicInteger; + +import org.apache.kafka.clients.producer.Callback; +import org.apache.kafka.clients.producer.Producer; +import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.logging.ComponentLog; +import org.apache.nifi.util.MockFlowFile; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; +import org.mockito.Mockito; +import org.mockito.invocation.InvocationOnMock; +import org.mockito.stubbing.Answer; + + +public class TestPublisherLease { + private ComponentLog logger; + private Producer<byte[], byte[]> producer; + + @Before + @SuppressWarnings("unchecked") + public void setup() { + logger = Mockito.mock(ComponentLog.class); + producer = Mockito.mock(Producer.class); + } + + @Test + public void testPoisonOnException() throws IOException { + final AtomicInteger poisonCount = new AtomicInteger(0); + + final PublisherLease lease = new PublisherLease(producer, 1024 * 1024, 1000L, logger, true, null, StandardCharsets.UTF_8) { + @Override + public void poison() { + poisonCount.incrementAndGet(); + super.poison(); + } + }; + + final FlowFile flowFile = new MockFlowFile(1L); + final String topic = "unit-test"; + final byte[] messageKey = null; + final byte[] demarcatorBytes = null; + + final InputStream failureInputStream = new InputStream() { + @Override + public int read() throws IOException { + throw new IOException("Intentional Unit Test Exception"); + } + }; + + try { + 
lease.publish(flowFile, failureInputStream, messageKey, demarcatorBytes, topic); + Assert.fail("Expected IOException"); + } catch (final IOException ioe) { + // expected + } + + assertEquals(1, poisonCount.get()); + + final PublishResult result = lease.complete(); + assertTrue(result.isFailure()); + } + + @Test + @SuppressWarnings("unchecked") + public void testPoisonOnFailure() throws IOException { + final AtomicInteger poisonCount = new AtomicInteger(0); + + final PublisherLease lease = new PublisherLease(producer, 1024 * 1024, 1000L, logger, true, null, StandardCharsets.UTF_8) { + @Override + public void poison() { + poisonCount.incrementAndGet(); + super.poison(); + } + }; + + final FlowFile flowFile = new MockFlowFile(1L); + final String topic = "unit-test"; + final byte[] messageKey = null; + final byte[] demarcatorBytes = null; + + doAnswer(new Answer<Object>() { + @Override + public Object answer(final InvocationOnMock invocation) throws Throwable { + final Callback callback = invocation.getArgumentAt(1, Callback.class); + callback.onCompletion(null, new RuntimeException("Unit Test Intentional Exception")); + return null; + } + }).when(producer).send(any(ProducerRecord.class), any(Callback.class)); + + lease.publish(flowFile, new ByteArrayInputStream(new byte[1]), messageKey, demarcatorBytes, topic); + + assertEquals(1, poisonCount.get()); + + final PublishResult result = lease.complete(); + assertTrue(result.isFailure()); + } + + @Test + @SuppressWarnings("unchecked") + public void testAllDelimitedMessagesSent() throws IOException { + final AtomicInteger poisonCount = new AtomicInteger(0); + + final PublisherLease lease = new PublisherLease(producer, 1024 * 1024, 10L, logger, true, null, StandardCharsets.UTF_8) { + @Override + protected void poison() { + poisonCount.incrementAndGet(); + super.poison(); + } + }; + + final AtomicInteger correctMessages = new AtomicInteger(0); + final AtomicInteger incorrectMessages = new AtomicInteger(0); + doAnswer(new 
Answer<Object>() { + @Override + public Object answer(InvocationOnMock invocation) throws Throwable { + final ProducerRecord<byte[], byte[]> record = invocation.getArgumentAt(0, ProducerRecord.class); + final byte[] value = record.value(); + final String valueString = new String(value, StandardCharsets.UTF_8); + if ("1234567890".equals(valueString)) { + correctMessages.incrementAndGet(); + } else { + incorrectMessages.incrementAndGet(); + } + + return null; + } + }).when(producer).send(any(ProducerRecord.class), any(Callback.class)); + + final FlowFile flowFile = new MockFlowFile(1L); + final String topic = "unit-test"; + final byte[] messageKey = null; + final byte[] demarcatorBytes = "\n".getBytes(StandardCharsets.UTF_8); + + final byte[] flowFileContent = "1234567890\n1234567890\n1234567890\n\n\n\n1234567890\n\n\n1234567890\n\n\n\n".getBytes(StandardCharsets.UTF_8); + lease.publish(flowFile, new ByteArrayInputStream(flowFileContent), messageKey, demarcatorBytes, topic); + + final byte[] flowFileContent2 = new byte[0]; + lease.publish(new MockFlowFile(2L), new ByteArrayInputStream(flowFileContent2), messageKey, demarcatorBytes, topic); + + final byte[] flowFileContent3 = "1234567890\n1234567890".getBytes(StandardCharsets.UTF_8); // no trailing new line + lease.publish(new MockFlowFile(3L), new ByteArrayInputStream(flowFileContent3), messageKey, demarcatorBytes, topic); + + final byte[] flowFileContent4 = "\n\n\n".getBytes(StandardCharsets.UTF_8); + lease.publish(new MockFlowFile(4L), new ByteArrayInputStream(flowFileContent4), messageKey, demarcatorBytes, topic); + + assertEquals(0, poisonCount.get()); + + verify(producer, times(0)).flush(); + + final PublishResult result = lease.complete(); + assertTrue(result.isFailure()); + + assertEquals(7, correctMessages.get()); + assertEquals(0, incorrectMessages.get()); + + verify(producer, times(1)).flush(); + } +} 
http://git-wip-us.apache.org/repos/asf/nifi/blob/00b11e82/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/TestPublisherPool.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/TestPublisherPool.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/TestPublisherPool.java new file mode 100644 index 0000000..afaf841 --- /dev/null +++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/TestPublisherPool.java @@ -0,0 +1,69 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.nifi.processors.kafka.pubsub; + +import static org.junit.Assert.assertEquals; + +import java.nio.charset.StandardCharsets; +import java.util.HashMap; +import java.util.Map; + +import org.apache.kafka.common.serialization.ByteArraySerializer; +import org.apache.nifi.logging.ComponentLog; +import org.junit.Test; +import org.mockito.Mockito; + + +public class TestPublisherPool { + + @Test + public void testLeaseCloseReturnsToPool() { + final Map<String, Object> kafkaProperties = new HashMap<>(); + kafkaProperties.put("bootstrap.servers", "localhost:1111"); + kafkaProperties.put("key.serializer", ByteArraySerializer.class.getName()); + kafkaProperties.put("value.serializer", ByteArraySerializer.class.getName()); + + final PublisherPool pool = new PublisherPool(kafkaProperties, Mockito.mock(ComponentLog.class), 1024 * 1024, 1000L, false, null, StandardCharsets.UTF_8); + assertEquals(0, pool.available()); + + final PublisherLease lease = pool.obtainPublisher(); + assertEquals(0, pool.available()); + + lease.close(); + assertEquals(1, pool.available()); + } + + @Test + public void testPoisonedLeaseNotReturnedToPool() { + final Map<String, Object> kafkaProperties = new HashMap<>(); + kafkaProperties.put("bootstrap.servers", "localhost:1111"); + kafkaProperties.put("key.serializer", ByteArraySerializer.class.getName()); + kafkaProperties.put("value.serializer", ByteArraySerializer.class.getName()); + + final PublisherPool pool = new PublisherPool(kafkaProperties, Mockito.mock(ComponentLog.class), 1024 * 1024, 1000L, false, null, StandardCharsets.UTF_8); + assertEquals(0, pool.available()); + + final PublisherLease lease = pool.obtainPublisher(); + assertEquals(0, pool.available()); + + lease.poison(); + lease.close(); + assertEquals(0, pool.available()); + } + +} http://git-wip-us.apache.org/repos/asf/nifi/blob/00b11e82/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/TestUtils.java 
---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/TestUtils.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/TestUtils.java new file mode 100644 index 0000000..819e3b7 --- /dev/null +++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/TestUtils.java @@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.nifi.processors.kafka.pubsub; + +import java.lang.reflect.Field; +import java.lang.reflect.Modifier; + +import sun.misc.Unsafe; + +class TestUtils { + + public static void setFinalField(Field field, Object instance, Object newValue) throws Exception { + field.setAccessible(true); + Field modifiersField = Field.class.getDeclaredField("modifiers"); + modifiersField.setAccessible(true); + modifiersField.setInt(field, field.getModifiers() & ~Modifier.FINAL); + + field.set(instance, newValue); + } + + static Unsafe getUnsafe() { + try { + Field f = Unsafe.class.getDeclaredField("theUnsafe"); + f.setAccessible(true); + return (Unsafe) f.get(null); + } catch (Exception e) { + throw new IllegalStateException(e); + } + } + +} http://git-wip-us.apache.org/repos/asf/nifi/blob/00b11e82/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/util/MockRecordParser.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/util/MockRecordParser.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/util/MockRecordParser.java new file mode 100644 index 0000000..21527be --- /dev/null +++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/util/MockRecordParser.java @@ -0,0 +1,105 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.processors.kafka.pubsub.util; + +import java.io.BufferedReader; +import java.io.IOException; +import java.io.InputStream; +import java.io.InputStreamReader; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import org.apache.nifi.controller.AbstractControllerService; +import org.apache.nifi.logging.ComponentLog; +import org.apache.nifi.schema.access.SchemaNotFoundException; +import org.apache.nifi.serialization.MalformedRecordException; +import org.apache.nifi.serialization.RecordReader; +import org.apache.nifi.serialization.RecordReaderFactory; +import org.apache.nifi.serialization.SchemaValidationException; +import org.apache.nifi.serialization.SimpleRecordSchema; +import org.apache.nifi.serialization.record.MapRecord; +import org.apache.nifi.serialization.record.Record; +import org.apache.nifi.serialization.record.RecordField; +import org.apache.nifi.serialization.record.RecordFieldType; +import org.apache.nifi.serialization.record.RecordSchema; + +public class MockRecordParser extends AbstractControllerService implements RecordReaderFactory { + private final List<Object[]> records = new ArrayList<>(); + private final List<RecordField> fields = new ArrayList<>(); + private final int failAfterN; + + public MockRecordParser() { + this(-1); + } + + public MockRecordParser(final int failAfterN) { + this.failAfterN = failAfterN; + } + + + public void addSchemaField(final String fieldName, final RecordFieldType type) { + fields.add(new RecordField(fieldName, 
type.getDataType())); + } + + public void addRecord(Object... values) { + records.add(values); + } + + @Override + public RecordReader createRecordReader(Map<String, String> variables, InputStream in, ComponentLog logger) throws IOException, SchemaNotFoundException { + final BufferedReader reader = new BufferedReader(new InputStreamReader(in)); + + return new RecordReader() { + private int recordCount = 0; + + @Override + public void close() throws IOException { + } + + @Override + public Record nextRecord(boolean coerceTypes, boolean dropUnknown) throws IOException, MalformedRecordException, SchemaValidationException { + if (failAfterN >= recordCount) { + throw new MalformedRecordException("Intentional Unit Test Exception because " + recordCount + " records have been read"); + } + final String line = reader.readLine(); + if (line == null) { + return null; + } + + recordCount++; + + final String[] values = line.split(","); + final Map<String, Object> valueMap = new HashMap<>(); + int i = 0; + for (final RecordField field : fields) { + final String fieldName = field.getFieldName(); + valueMap.put(fieldName, values[i++].trim()); + } + + return new MapRecord(new SimpleRecordSchema(fields), valueMap); + } + + @Override + public RecordSchema getSchema() { + return new SimpleRecordSchema(fields); + } + }; + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/nifi/blob/00b11e82/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/util/MockRecordWriter.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/util/MockRecordWriter.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/util/MockRecordWriter.java new file mode 100644 index 0000000..90a909d --- /dev/null +++ 
b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/util/MockRecordWriter.java @@ -0,0 +1,128 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +
package org.apache.nifi.processors.kafka.pubsub.util;

import java.io.IOException;
import java.io.OutputStream;
import java.util.Collections;
import java.util.Map;

import org.apache.nifi.controller.AbstractControllerService;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.schema.access.SchemaNotFoundException;
import org.apache.nifi.serialization.RecordSetWriter;
import org.apache.nifi.serialization.RecordSetWriterFactory;
import org.apache.nifi.serialization.WriteResult;
import org.apache.nifi.serialization.record.Record;
import org.apache.nifi.serialization.record.RecordSchema;
import org.apache.nifi.serialization.record.RecordSet;

/**
 * Test-only {@link RecordSetWriterFactory}. Its writers emit a header line
 * followed by one comma-separated line per record, optionally wrapping every
 * value in double quotes, and can be configured to throw an
 * {@link IOException} after a given number of records.
 */
public class MockRecordWriter extends AbstractControllerService implements RecordSetWriterFactory {
    // First line written by write(RecordSet).
    private final String header;
    // Records written before an intentional failure; values below zero disable it.
    private final int failAfterN;
    // Whether each value is wrapped in double quotes.
    private final boolean quoteValues;

    /**
     * Creates a writer factory that quotes values and never fails intentionally.
     *
     * @param header the header line
     */
    public MockRecordWriter(final String header) {
        this(header, true, -1);
    }

    /**
     * Creates a writer factory that never fails intentionally.
     *
     * @param header the header line
     * @param quoteValues whether values are wrapped in double quotes
     */
    public MockRecordWriter(final String header, final boolean quoteValues) {
        this(header, quoteValues, -1);
    }

    /**
     * Creates a fully configured writer factory.
     *
     * @param header the header line
     * @param quoteValues whether values are wrapped in double quotes
     * @param failAfterN how many records may be written before
     *            {@code write(RecordSet)} throws; {@code -1} or less disables
     *            the failure
     */
    public MockRecordWriter(final String header, final boolean quoteValues, final int failAfterN) {
        this.header = header;
        this.quoteValues = quoteValues;
        this.failAfterN = failAfterN;
    }

    /**
     * Always returns {@code null}; this mock does not resolve a schema.
     */
    @Override
    public RecordSchema getSchema(Map<String, String> variables, RecordSchema readSchema) throws SchemaNotFoundException, IOException {
        return null;
    }

    /**
     * Creates a writer that serialises record sets to {@code out}.
     *
     * @param logger not used
     * @param schema not used
     * @param out destination of the written bytes
     * @return the writer
     */
    @Override
    public RecordSetWriter createWriter(final ComponentLog logger, final RecordSchema schema, final OutputStream out) {
        return new RecordSetWriter() {

            @Override
            public void flush() throws IOException {
                out.flush();
            }

            /**
             * Writes the header, then every record of the set as one line.
             *
             * @return a result carrying the number of records pulled from the set
             * @throws IOException intentionally, when more than {@code failAfterN}
             *             records are pulled and {@code failAfterN} is above -1
             */
            @Override
            public WriteResult write(final RecordSet rs) throws IOException {
                out.write(header.getBytes());
                out.write("\n".getBytes());

                // Index of the last column according to the record set's schema;
                // a separator follows every column before it.
                final int lastColumn = rs.getSchema().getFieldCount() - 1;
                int written = 0;
                for (Record current = rs.next(); current != null; current = rs.next()) {
                    written++;
                    if (failAfterN > -1 && written > failAfterN) {
                        throw new IOException("Unit Test intentionally throwing IOException after " + failAfterN + " records were written");
                    }

                    int column = 0;
                    for (final String fieldName : current.getSchema().getFieldNames()) {
                        final String text = current.getAsString(fieldName);
                        if (quoteValues) {
                            out.write("\"".getBytes());
                        }
                        // A null value contributes no bytes (only the quotes, if enabled).
                        if (text != null) {
                            out.write(text.getBytes());
                        }
                        if (quoteValues) {
                            out.write("\"".getBytes());
                        }

                        if (column < lastColumn) {
                            out.write(",".getBytes());
                        }
                        column++;
                    }
                    out.write("\n".getBytes());
                }

                return WriteResult.of(written, Collections.emptyMap());
            }

            @Override
            public String getMimeType() {
                return "text/plain";
            }

            /**
             * Not implemented by this mock; always returns {@code null}.
             */
            @Override
            public WriteResult write(Record record) throws IOException {
                return null;
            }

            @Override
            public void close() throws IOException {
            }

            @Override
            public void beginRecordSet() throws IOException {
            }

            /**
             * Not implemented by this mock; always returns {@code null}.
             */
            @Override
            public WriteResult finishRecordSet() throws IOException {
                return null;
            }
        };
    }
}
\ No newline at end of file http://git-wip-us.apache.org/repos/asf/nifi/blob/00b11e82/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/src/test/java/org/apache/nifi/processors/kafka/test/EmbeddedKafka.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/src/test/java/org/apache/nifi/processors/kafka/test/EmbeddedKafka.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/src/test/java/org/apache/nifi/processors/kafka/test/EmbeddedKafka.java new file mode 100644 index 0000000..a720b11 --- /dev/null +++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/src/test/java/org/apache/nifi/processors/kafka/test/EmbeddedKafka.java @@ -0,0 +1,226 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */
package org.apache.nifi.processors.kafka.test;

import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.net.ServerSocket;
import java.util.Properties;

import org.apache.commons.io.FileUtils;
import org.apache.zookeeper.server.ServerCnxnFactory;
import org.apache.zookeeper.server.ServerConfig;
import org.apache.zookeeper.server.ZooKeeperServer;
import org.apache.zookeeper.server.persistence.FileTxnSnapLog;
import org.apache.zookeeper.server.quorum.QuorumPeerConfig;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import kafka.server.KafkaConfig;
import kafka.server.KafkaServerStartable;

/**
 * Embedded Kafka server, primarily to be used for testing. Each instance
 * picks its own ports for Kafka and Zookeeper at construction time.
 */
public class EmbeddedKafka {

    private final KafkaServerStartable kafkaServer;

    private final Properties zookeeperConfig;

    private final Properties kafkaConfig;

    private final ZooKeeperServer zkServer;

    private final Logger logger = LoggerFactory.getLogger(EmbeddedKafka.class);

    private final int kafkaPort;

    private final int zookeeperPort;

    private boolean started;

    /**
     * Will create instance of the embedded Kafka server. Kafka and Zookeeper
     * configuration properties will be loaded from 'server.properties' and
     * 'zookeeper.properties' located at the root of the classpath.
     *
     * @throws IllegalStateException if either properties file is missing or
     *             cannot be read
     */
    public EmbeddedKafka() {
        this(loadPropertiesFromClasspath("/server.properties"), loadPropertiesFromClasspath("/zookeeper.properties"));
    }

    /**
     * Will create instance of the embedded Kafka server. The supplied
     * {@link Properties} objects are modified in place: {@code port} and
     * {@code zookeeper.connect} are set on the Kafka configuration and
     * {@code clientPort} on the Zookeeper configuration, using ports
     * discovered while constructing this instance.
     *
     * @param kafkaConfig
     *            Kafka configuration properties
     * @param zookeeperConfig
     *            Zookeeper configuration properties
     */
    public EmbeddedKafka(Properties kafkaConfig, Properties zookeeperConfig) {
        // Start from a clean slate in case an earlier run left data behind.
        this.cleanupKafkaWorkDir();
        this.zookeeperConfig = zookeeperConfig;
        this.kafkaConfig = kafkaConfig;
        this.kafkaPort = this.availablePort();
        this.zookeeperPort = this.availablePort();

        this.kafkaConfig.setProperty("port", String.valueOf(this.kafkaPort));
        this.kafkaConfig.setProperty("zookeeper.connect", "localhost:" + this.zookeeperPort);
        this.zookeeperConfig.setProperty("clientPort", String.valueOf(this.zookeeperPort));
        this.zkServer = new ZooKeeperServer();
        this.kafkaServer = new KafkaServerStartable(new KafkaConfig(kafkaConfig));
    }

    /**
     *
     * @return port for Kafka server
     * @throws IllegalStateException if the server has not been started
     */
    public int getKafkaPort() {
        if (!this.started) {
            throw new IllegalStateException("Kafka server is not started. Kafka port can't be determined.");
        }
        return this.kafkaPort;
    }

    /**
     *
     * @return port for Zookeeper server
     * @throws IllegalStateException if the server has not been started
     */
    public int getZookeeperPort() {
        if (!this.started) {
            throw new IllegalStateException("Kafka server is not started. Zookeeper port can't be determined.");
        }
        return this.zookeeperPort;
    }

    /**
     * Will start the Zookeeper server followed by the embedded Kafka server.
     * Does nothing when this instance is already started.
     * <p>
     * The 'target/kafka-tmp' directory is deleted when this instance is
     * constructed and again by {@link #stop()}; presumably the configuration
     * properties place the servers' data directories there - TODO confirm
     * against server.properties / zookeeper.properties.
     */
    public void start() {
        if (!this.started) {
            logger.info("Starting Zookeeper server");
            this.startZookeeper();

            logger.info("Starting Kafka server");
            this.kafkaServer.startup();

            logger.info("Embedded Kafka is started at localhost:{}. Zookeeper connection string: {}",
                    this.kafkaServer.serverConfig().port(), this.kafkaConfig.getProperty("zookeeper.connect"));
            this.started = true;
        }
    }

    /**
     * Will stop embedded Kafka server and then the Zookeeper server, and
     * delete the 'target/kafka-tmp' directory. Does nothing when this instance
     * is not started.
     */
    public void stop() {
        if (this.started) {
            logger.info("Shutting down Kafka server");
            this.kafkaServer.shutdown();
            this.kafkaServer.awaitShutdown();
            logger.info("Shutting down Zookeeper server");
            this.shutdownZookeeper();
            logger.info("Embedded Kafka is shut down.");
            this.cleanupKafkaWorkDir();
            this.started = false;
        }
    }

    /**
     * Will delete the 'target/kafka-tmp' directory (relative to the working
     * directory). Failure to delete is logged, together with its cause, and
     * otherwise ignored.
     */
    private void cleanupKafkaWorkDir() {
        File kafkaTmp = new File("target/kafka-tmp");
        try {
            FileUtils.deleteDirectory(kafkaTmp);
        } catch (Exception e) {
            // Best effort only, but keep the cause so a failed cleanup can be diagnosed.
            logger.warn("Failed to delete " + kafkaTmp.getAbsolutePath(), e);
        }
    }

    /**
     * Will start Zookeeper server via {@link ServerCnxnFactory}
     *
     * @throws IllegalStateException if the server fails to start
     */
    private void startZookeeper() {
        QuorumPeerConfig quorumConfiguration = new QuorumPeerConfig();
        try {
            quorumConfiguration.parseProperties(this.zookeeperConfig);

            ServerConfig configuration = new ServerConfig();
            configuration.readFrom(quorumConfiguration);

            FileTxnSnapLog txnLog = new FileTxnSnapLog(new File(configuration.getDataLogDir()), new File(configuration.getDataDir()));

            zkServer.setTxnLogFactory(txnLog);
            zkServer.setTickTime(configuration.getTickTime());
            zkServer.setMinSessionTimeout(configuration.getMinSessionTimeout());
            zkServer.setMaxSessionTimeout(configuration.getMaxSessionTimeout());
            ServerCnxnFactory zookeeperConnectionFactory = ServerCnxnFactory.createFactory();
            zookeeperConnectionFactory.configure(configuration.getClientPortAddress(),
                    configuration.getMaxClientCnxns());
            zookeeperConnectionFactory.startup(zkServer);
        } catch (InterruptedException e) {
            // Restore the interrupt flag; this method then returns normally, so
            // start() goes on to start Kafka.
            Thread.currentThread().interrupt();
        } catch (Exception e) {
            throw new IllegalStateException("Failed to start Zookeeper server", e);
        }
    }

    /**
     * Will shut down Zookeeper server.
     */
    private void shutdownZookeeper() {
        // NOTE(review): the ServerCnxnFactory created in startZookeeper() is a
        // local variable and is not shut down here - confirm whether that keeps
        // the Zookeeper client port bound after stop().
        zkServer.shutdown();
    }

    /**
     * Will load {@link Properties} from properties file discovered at the
     * provided path relative to the root of the classpath.
     *
     * @param path
     *            absolute classpath location of the properties file
     * @return the loaded properties
     * @throws IllegalStateException if the resource does not exist or cannot
     *             be read
     */
    private static Properties loadPropertiesFromClasspath(String path) {
        // Resolve through this class rather than Class.class so the lookup uses
        // the class loader that loaded the test classes; close the stream when done.
        try (InputStream in = EmbeddedKafka.class.getResourceAsStream(path)) {
            if (in == null) {
                throw new IllegalStateException("Properties file not found on the classpath: " + path);
            }
            Properties kafkaProperties = new Properties();
            kafkaProperties.load(in);
            return kafkaProperties;
        } catch (IOException | IllegalArgumentException e) {
            throw new IllegalStateException("Failed to load properties from classpath resource " + path, e);
        }
    }

    /**
     * Will determine the available port used by Kafka/Zookeeper servers. The
     * probing socket is closed before the port is returned, so another process
     * may claim the port before the server binds to it.
     *
     * @return a port that was free at the time of the call
     * @throws IllegalStateException if no port could be discovered
     */
    private int availablePort() {
        // try-with-resources: no close() on a null socket when creation fails,
        // which previously raised a NullPointerException from the finally block.
        try (ServerSocket s = new ServerSocket()) {
            // SO_REUSEADDR has to be set before the socket is bound to take effect.
            s.setReuseAddress(true);
            // A null endpoint lets the system pick an ephemeral port.
            s.bind(null);
            return s.getLocalPort();
        } catch (IOException | SecurityException e) {
            throw new IllegalStateException("Failed to discover available port.", e);
        }
    }
}
http://git-wip-us.apache.org/repos/asf/nifi/blob/00b11e82/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/src/test/resources/log4j.properties ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/src/test/resources/log4j.properties b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/src/test/resources/log4j.properties new file mode 100644 index 0000000..57cd63f --- /dev/null +++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/src/test/resources/log4j.properties @@ -0,0 +1,21 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License.
You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +log4j.rootCategory=INFO, stdout + +log4j.appender.stdout=org.apache.log4j.ConsoleAppender +log4j.appender.stdout.layout=org.apache.log4j.PatternLayout +log4j.appender.stdout.layout.ConversionPattern=%d{ABSOLUTE} %5p %t %c{2}:%L - %m%n + +#log4j.category.org.apache.nifi.processors.kafka=DEBUG
