Repository: nifi Updated Branches: refs/heads/master a68f87f96 -> f7ecb47e2
http://git-wip-us.apache.org/repos/asf/nifi/blob/e0e00ff2/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/test/java/org/apache/nifi/processors/kafka/KafkaPublisherTest.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/test/java/org/apache/nifi/processors/kafka/KafkaPublisherTest.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/test/java/org/apache/nifi/processors/kafka/KafkaPublisherTest.java new file mode 100644 index 0000000..92a6307 --- /dev/null +++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/test/java/org/apache/nifi/processors/kafka/KafkaPublisherTest.java @@ -0,0 +1,205 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.kafka; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.fail; + +import java.io.ByteArrayInputStream; +import java.io.InputStream; +import java.nio.charset.StandardCharsets; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Properties; + +import org.apache.nifi.processors.kafka.test.EmbeddedKafka; +import org.apache.nifi.processors.kafka.test.EmbeddedKafkaProducerHelper; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Test; + +import kafka.consumer.Consumer; +import kafka.consumer.ConsumerConfig; +import kafka.consumer.ConsumerIterator; +import kafka.consumer.ConsumerTimeoutException; +import kafka.consumer.KafkaStream; +import kafka.javaapi.consumer.ConsumerConnector; + +public class KafkaPublisherTest { + + private static final String sampleData = "The true sign of intelligence is not knowledge but imagination.\n" + + "It's not that I'm so smart, it's just that I stay with problems longer.\n" + + "The only source of knowledge is experience.\n" + + "Only two things are infinite, the universe and human stupidity, and I'm not sure about the former.\n"; + + private static final String sampleData2 = "foo|bar|baz"; + + private static EmbeddedKafka kafkaLocal; + + private static EmbeddedKafkaProducerHelper producerHelper; + + @BeforeClass + public static void bforeClass() { + kafkaLocal = new EmbeddedKafka(); + kafkaLocal.start(); + producerHelper = new EmbeddedKafkaProducerHelper(kafkaLocal); + } + + @AfterClass + public static void afterClass() throws Exception { + producerHelper.close(); + kafkaLocal.stop(); + } + + String test = "Khalid El Bakraoui rented an apartment in Brussels that was raided last week and both are suspected of having ties to " + + "the terror attacks in Paris in November, the source said. While Belgian officials say both brothers were suicide bombers, a U.S. " + + "official briefed earlier on preliminary evidence from the investigation says authorities are looking at the possibility that one of " + + "the airport explosions may have been caused by a bomb inside a suitcase and the other was a suicide bombing. But identifying the brothers " + + "should help spring the investigation forward, says Cedric Leighton, a CNN military analyst and the former deputy director for the Joint Chiefs of Staff."; + + @Test + public void validateSuccessfulSendAsWhole() throws Exception { + InputStream fis = new ByteArrayInputStream(sampleData.getBytes(StandardCharsets.UTF_8)); + String topicName = "validateSuccessfulSendAsWhole"; + + Properties kafkaProperties = this.buildProducerProperties(); + KafkaPublisher publisher = new KafkaPublisher(kafkaProperties); + + SplittableMessageContext messageContext = new SplittableMessageContext(topicName, null, null); + + publisher.publish(messageContext, fis, null); + + fis.close(); + publisher.close(); + + ConsumerIterator<byte[], byte[]> iter = this.buildConsumer(topicName); + assertNotNull(iter.next()); + try { + iter.next(); + } catch (ConsumerTimeoutException e) { + // that's OK since this is the Kafka mechanism to unblock + } + } + + @Test + public void validateSuccessfulSendAsDelimited() throws Exception { + InputStream fis = new ByteArrayInputStream(sampleData.getBytes(StandardCharsets.UTF_8)); + String topicName = "validateSuccessfulSendAsDelimited"; + + Properties kafkaProperties = this.buildProducerProperties(); + KafkaPublisher publisher = new KafkaPublisher(kafkaProperties); + + SplittableMessageContext messageContext = new SplittableMessageContext(topicName, null, "\n"); + + publisher.publish(messageContext, fis, null); + publisher.close(); + + ConsumerIterator<byte[], byte[]> iter = this.buildConsumer(topicName); + assertNotNull(iter.next()); + assertNotNull(iter.next()); + assertNotNull(iter.next()); + assertNotNull(iter.next()); + try { + iter.next(); + fail(); + } catch (ConsumerTimeoutException e) { + // that's OK since this is the Kafka mechanism to unblock + } + } + + @Test + public void validateSuccessfulSendAsDelimited2() throws Exception { + InputStream fis = new ByteArrayInputStream(sampleData2.getBytes(StandardCharsets.UTF_8)); + String topicName = "validateSuccessfulSendAsDelimited2"; + + Properties kafkaProperties = this.buildProducerProperties(); + KafkaPublisher publisher = new KafkaPublisher(kafkaProperties); + + SplittableMessageContext messageContext = new SplittableMessageContext(topicName, null, "|"); + + publisher.publish(messageContext, fis, null); + publisher.close(); + + ConsumerIterator<byte[], byte[]> iter = this.buildConsumer(topicName); + assertNotNull(iter.next()); + assertNotNull(iter.next()); + assertNotNull(iter.next()); + try { + iter.next(); + fail(); + } catch (ConsumerTimeoutException e) { + // that's OK since this is the Kafka mechanism to unblock + } + } + + @Test + public void validateSuccessfulReSendOfFailedSegments() throws Exception { + InputStream fis = new ByteArrayInputStream(sampleData.getBytes(StandardCharsets.UTF_8)); + String topicName = "validateSuccessfulReSendOfFailedSegments"; + + Properties kafkaProperties = this.buildProducerProperties(); + + KafkaPublisher publisher = new KafkaPublisher(kafkaProperties); + + SplittableMessageContext messageContext = new SplittableMessageContext(topicName, null, "\n"); + messageContext.setFailedSegments(1, 3); + + publisher.publish(messageContext, fis, null); + publisher.close(); + + ConsumerIterator<byte[], byte[]> iter = this.buildConsumer(topicName); + String m1 = new String(iter.next().message()); + String m2 = new String(iter.next().message()); + assertEquals("It's not that I'm so smart, it's just that I stay with problems longer.", m1); + assertEquals("Only two things are infinite, the universe and human stupidity, and I'm not sure about the former.", m2); + try { + iter.next(); + fail(); + } catch (ConsumerTimeoutException e) { + // that's OK since this is the Kafka mechanism to unblock + } + } + + private Properties buildProducerProperties() { + Properties kafkaProperties = new Properties(); + kafkaProperties.setProperty("bootstrap.servers", "0.0.0.0:" + kafkaLocal.getKafkaPort()); + kafkaProperties.setProperty("serializer.class", "kafka.serializer.DefaultEncoder"); + kafkaProperties.setProperty("acks", "1"); + kafkaProperties.put("auto.create.topics.enable", "true"); + kafkaProperties.setProperty("partitioner.class", "org.apache.nifi.processors.kafka.Partitioners$RoundRobinPartitioner"); + kafkaProperties.setProperty("timeout.ms", "5000"); + return kafkaProperties; + } + + private ConsumerIterator<byte[], byte[]> buildConsumer(String topic) { + Properties props = new Properties(); + props.put("zookeeper.connect", "localhost:" + kafkaLocal.getZookeeperPort()); + props.put("group.id", "test"); + props.put("consumer.timeout.ms", "5000"); + props.put("auto.offset.reset", "smallest"); + ConsumerConfig consumerConfig = new ConsumerConfig(props); + ConsumerConnector consumer = Consumer.createJavaConsumerConnector(consumerConfig); + Map<String, Integer> topicCountMap = new HashMap<>(1); + topicCountMap.put(topic, 1); + Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumer.createMessageStreams(topicCountMap); + List<KafkaStream<byte[], byte[]>> streams = consumerMap.get(topic); + ConsumerIterator<byte[], byte[]> iter = streams.get(0).iterator(); + return iter; + } +} http://git-wip-us.apache.org/repos/asf/nifi/blob/e0e00ff2/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/test/java/org/apache/nifi/processors/kafka/TestPutKafka.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/test/java/org/apache/nifi/processors/kafka/TestPutKafka.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/test/java/org/apache/nifi/processors/kafka/TestPutKafka.java index 2f5da5c..3ed0549 100644 --- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/test/java/org/apache/nifi/processors/kafka/TestPutKafka.java +++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/test/java/org/apache/nifi/processors/kafka/TestPutKafka.java @@ -17,462 +17,189 @@ package org.apache.nifi.processors.kafka; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNull; import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; import java.nio.charset.StandardCharsets; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Collections; +import java.util.BitSet; import java.util.HashMap; import java.util.List; import java.util.Map; -import java.util.concurrent.Future; +import java.util.Properties; -import org.apache.kafka.clients.producer.BufferExhaustedException; -import org.apache.kafka.clients.producer.Callback; -import org.apache.kafka.clients.producer.Producer; -import org.apache.kafka.clients.producer.ProducerRecord; -import org.apache.kafka.clients.producer.RecordMetadata; -import org.apache.kafka.common.Metric; -import org.apache.kafka.common.MetricName; -import org.apache.kafka.common.Node; -import org.apache.kafka.common.PartitionInfo; -import org.apache.kafka.common.TopicPartition; -import org.apache.kafka.common.serialization.ByteArraySerializer; -import org.apache.nifi.provenance.ProvenanceEventRecord; -import org.apache.nifi.provenance.ProvenanceEventType; +import org.apache.nifi.processors.kafka.test.EmbeddedKafka; +import org.apache.nifi.processors.kafka.test.EmbeddedKafkaProducerHelper; import org.apache.nifi.util.MockFlowFile; import org.apache.nifi.util.TestRunner; import org.apache.nifi.util.TestRunners; -import org.junit.Assert; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Ignore; import org.junit.Test; -import kafka.common.FailedToSendMessageException; +import kafka.consumer.Consumer; +import kafka.consumer.ConsumerConfig; +import kafka.consumer.ConsumerIterator; +import kafka.consumer.KafkaStream; +import kafka.javaapi.consumer.ConsumerConnector; public class TestPutKafka { - @Test - public void testMultipleKeyValuePerFlowFile() { - final TestableProcessor proc = new TestableProcessor(); - final TestRunner runner = TestRunners.newTestRunner(proc); - runner.setProperty(PutKafka.TOPIC, "topic1"); - runner.setProperty(PutKafka.KEY, "key1"); - runner.setProperty(PutKafka.SEED_BROKERS, "localhost:1234"); - runner.setProperty(PutKafka.MESSAGE_DELIMITER, "\\n"); - - runner.enqueue("Hello World\nGoodbye\n1\n2\n3\n4\n5\n6\n7\n8\n9".getBytes()); - runner.run(2); // we have to run twice because the first iteration will result in data being added to a queue in the processor; the second onTrigger call will transfer FlowFiles. - - runner.assertAllFlowFilesTransferred(PutKafka.REL_SUCCESS, 1); - - final List<ProducerRecord<byte[], byte[]>> messages = ((MockProducer) proc.getProducer()).getMessages(); - assertEquals(11, messages.size()); - - assertTrue(Arrays.equals("Hello World".getBytes(StandardCharsets.UTF_8), messages.get(0).value())); - assertTrue(Arrays.equals("Goodbye".getBytes(StandardCharsets.UTF_8), messages.get(1).value())); - - for (int i = 1; i <= 9; i++) { - assertTrue(Arrays.equals(String.valueOf(i).getBytes(StandardCharsets.UTF_8), messages.get(i + 1).value())); - } - } - - @Test - public void testWithImmediateFailure() { - final TestableProcessor proc = new TestableProcessor(0); - final TestRunner runner = TestRunners.newTestRunner(proc); - runner.setProperty(PutKafka.TOPIC, "topic1"); - runner.setProperty(PutKafka.KEY, "key1"); - runner.setProperty(PutKafka.SEED_BROKERS, "localhost:1234"); - runner.setProperty(PutKafka.MESSAGE_DELIMITER, "\\n"); - - final String text = "Hello World\nGoodbye\n1\n2\n3\n4\n5\n6\n7\n8\n9"; - runner.enqueue(text.getBytes()); - runner.run(2); - - runner.assertAllFlowFilesTransferred(PutKafka.REL_FAILURE, 1); - final MockFlowFile mff = runner.getFlowFilesForRelationship(PutKafka.REL_FAILURE).get(0); - mff.assertContentEquals(text); - } - - @Test - public void testPartialFailure() { - final TestableProcessor proc = new TestableProcessor(2); // fail after sending 2 messages. - final TestRunner runner = TestRunners.newTestRunner(proc); - runner.setProperty(PutKafka.TOPIC, "topic1"); - runner.setProperty(PutKafka.KEY, "key1"); - runner.setProperty(PutKafka.SEED_BROKERS, "localhost:1234"); - runner.setProperty(PutKafka.MESSAGE_DELIMITER, "\\n"); - runner.setProperty(PutKafka.MAX_BUFFER_SIZE, "1 B"); - - final byte[] bytes = "1\n2\n3\n4".getBytes(); - runner.enqueue(bytes); - runner.run(2); - - runner.assertTransferCount(PutKafka.REL_SUCCESS, 1); - runner.assertTransferCount(PutKafka.REL_FAILURE, 1); - - final MockFlowFile successFF = runner.getFlowFilesForRelationship(PutKafka.REL_SUCCESS).get(0); - successFF.assertContentEquals("1\n2\n"); - - final MockFlowFile failureFF = runner.getFlowFilesForRelationship(PutKafka.REL_FAILURE).get(0); - failureFF.assertContentEquals("3\n4"); - } - - @Test - public void testPartialFailureWithSuccessBeforeAndAfter() { - final TestableProcessor proc = new TestableProcessor(2, 4); // fail after sending 2 messages, then stop failing after 4 - final TestRunner runner = TestRunners.newTestRunner(proc); - runner.setProperty(PutKafka.TOPIC, "topic1"); - runner.setProperty(PutKafka.KEY, "key1"); - runner.setProperty(PutKafka.SEED_BROKERS, "localhost:1234"); - runner.setProperty(PutKafka.MESSAGE_DELIMITER, "\\n"); - runner.setProperty(PutKafka.MAX_BUFFER_SIZE, "1 B"); - - final byte[] bytes = "1\n2\n3\n4\n5\n6".getBytes(); - runner.enqueue(bytes); - runner.run(2); - - runner.assertTransferCount(PutKafka.REL_SUCCESS, 2); - runner.assertTransferCount(PutKafka.REL_FAILURE, 1); - - final List<MockFlowFile> success = runner.getFlowFilesForRelationship(PutKafka.REL_SUCCESS); - for (final MockFlowFile successFF : success) { - if ('1' == successFF.toByteArray()[0]) { - successFF.assertContentEquals("1\n2\n"); - } else if ('5' == successFF.toByteArray()[0]) { - successFF.assertContentEquals("5\n6"); - } else { - Assert.fail("Wrong content for FlowFile; contained " + new String(successFF.toByteArray())); - } - } - - final MockFlowFile failureFF = runner.getFlowFilesForRelationship(PutKafka.REL_FAILURE).get(0); - failureFF.assertContentEquals("3\n4\n"); - } + private static EmbeddedKafka kafkaLocal; + private static EmbeddedKafkaProducerHelper producerHelper; - @Test - public void testWithEmptyMessages() { - final TestableProcessor proc = new TestableProcessor(); - final TestRunner runner = TestRunners.newTestRunner(proc); - runner.setProperty(PutKafka.TOPIC, "topic1"); - runner.setProperty(PutKafka.KEY, "key1"); - runner.setProperty(PutKafka.SEED_BROKERS, "localhost:1234"); - runner.setProperty(PutKafka.MESSAGE_DELIMITER, "\\n"); - - final byte[] bytes = "\n\n\n1\n2\n\n\n\n3\n4\n\n\n".getBytes(); - runner.enqueue(bytes); - runner.run(2); - - runner.assertAllFlowFilesTransferred(PutKafka.REL_SUCCESS, 1); - - final List<ProducerRecord<byte[], byte[]>> msgs = ((MockProducer) proc.getProducer()).getMessages(); - assertEquals(4, msgs.size()); - - for (int i = 1; i <= 4; i++) { - assertTrue(Arrays.equals(String.valueOf(i).getBytes(), msgs.get(i - 1).value())); - } + @BeforeClass + public static void bforeClass() { + kafkaLocal = new EmbeddedKafka(); + kafkaLocal.start(); + producerHelper = new EmbeddedKafkaProducerHelper(kafkaLocal); } - @Test - public void testProvenanceReporterMessagesCount() { - final TestableProcessor processor = new TestableProcessor(); - - final TestRunner runner = TestRunners.newTestRunner(processor); - - runner.setProperty(PutKafka.TOPIC, "topic1"); - runner.setProperty(PutKafka.KEY, "key1"); - runner.setProperty(PutKafka.SEED_BROKERS, "localhost:1234"); - runner.setProperty(PutKafka.MESSAGE_DELIMITER, "\\n"); - - final byte[] bytes = "\n\n\n1\n2\n\n\n\n3\n4\n\n\n".getBytes(); - runner.enqueue(bytes); - runner.run(2); - - final List<ProvenanceEventRecord> events = runner.getProvenanceEvents(); - assertEquals(1, events.size()); - final ProvenanceEventRecord event = events.get(0); - assertEquals(ProvenanceEventType.SEND, event.getEventType()); - assertEquals("kafka://localhost:1111/topics/topic1", event.getTransitUri()); - assertTrue(event.getDetails().startsWith("Sent 4 messages")); - } - - @Test - public void testProvenanceReporterWithoutDelimiterMessagesCount() { - final TestableProcessor processor = new TestableProcessor(); - - final TestRunner runner = TestRunners.newTestRunner(processor); - runner.setProperty(PutKafka.TOPIC, "topic1"); - runner.setProperty(PutKafka.KEY, "key1"); - runner.setProperty(PutKafka.SEED_BROKERS, "localhost:1234"); - - final byte[] bytes = "\n\n\n1\n2\n\n\n\n3\n4\n\n\n".getBytes(); - runner.enqueue(bytes); - runner.run(2); - - final List<ProvenanceEventRecord> events = runner.getProvenanceEvents(); - assertEquals(1, events.size()); - final ProvenanceEventRecord event = events.get(0); - assertEquals(ProvenanceEventType.SEND, event.getEventType()); - assertEquals("kafka://localhost:1111/topics/topic1", event.getTransitUri()); - } - - @Test - public void testRoundRobinAcrossMultipleMessages() { - final TestableProcessor proc = new TestableProcessor(); - - final TestRunner runner = TestRunners.newTestRunner(proc); - runner.setProperty(PutKafka.TOPIC, "topic1"); - runner.setProperty(PutKafka.KEY, "key1"); - runner.setProperty(PutKafka.SEED_BROKERS, "localhost:1234"); - runner.setProperty(PutKafka.PARTITION_STRATEGY, PutKafka.ROUND_ROBIN_PARTITIONING); - - runner.enqueue("hello".getBytes()); - runner.enqueue("there".getBytes()); - runner.enqueue("how are you".getBytes()); - runner.enqueue("today".getBytes()); - - runner.run(5); - - runner.assertAllFlowFilesTransferred(PutKafka.REL_SUCCESS, 4); - - final List<ProducerRecord<byte[], byte[]>> records = ((MockProducer) proc.getProducer()).getMessages(); - for (int i = 0; i < 3; i++) { - assertEquals(i + 1, records.get(i).partition().intValue()); - } - - assertEquals(1, records.get(3).partition().intValue()); + @AfterClass + public static void afterClass() throws Exception { + producerHelper.close(); + kafkaLocal.stop(); } @Test - public void testRoundRobinAcrossMultipleMessagesInSameFlowFile() { - final TestableProcessor proc = new TestableProcessor(); - - final TestRunner runner = TestRunners.newTestRunner(proc); - runner.setProperty(PutKafka.TOPIC, "topic1"); + public void testDelimitedMessagesWithKey() { + String topicName = "testDelimitedMessagesWithKey"; + PutKafka putKafka = new PutKafka(); + TestRunner runner = TestRunners.newTestRunner(putKafka); + runner.setProperty(PutKafka.TOPIC, topicName); + runner.setProperty(PutKafka.CLIENT_NAME, "foo"); runner.setProperty(PutKafka.KEY, "key1"); - runner.setProperty(PutKafka.SEED_BROKERS, "localhost:1234"); - runner.setProperty(PutKafka.PARTITION_STRATEGY, PutKafka.ROUND_ROBIN_PARTITIONING); + runner.setProperty(PutKafka.SEED_BROKERS, "localhost:" + kafkaLocal.getKafkaPort()); runner.setProperty(PutKafka.MESSAGE_DELIMITER, "\n"); - runner.enqueue("hello\nthere\nhow are you\ntoday".getBytes()); - - runner.run(2); + runner.enqueue("Hello World\nGoodbye\n1\n2\n3\n4\n5".getBytes()); + runner.run(1, false); runner.assertAllFlowFilesTransferred(PutKafka.REL_SUCCESS, 1); - - final List<ProducerRecord<byte[], byte[]>> records = ((MockProducer) proc.getProducer()).getMessages(); - for (int i = 0; i < 3; i++) { - assertEquals(i + 1, records.get(i).partition().intValue()); - } - - assertEquals(1, records.get(3).partition().intValue()); + ConsumerIterator<byte[], byte[]> consumer = this.buildConsumer(topicName); + assertEquals("Hello World", new String(consumer.next().message())); + assertEquals("Goodbye", new String(consumer.next().message())); + assertEquals("1", new String(consumer.next().message())); + assertEquals("2", new String(consumer.next().message())); + assertEquals("3", new String(consumer.next().message())); + assertEquals("4", new String(consumer.next().message())); + assertEquals("5", new String(consumer.next().message())); + + runner.shutdown(); } - @Test - public void testUserDefinedPartition() { - final TestableProcessor proc = new TestableProcessor(); - - final TestRunner runner = TestRunners.newTestRunner(proc); - runner.setProperty(PutKafka.TOPIC, "topic1"); + @Ignore + public void testWithFailureAndPartialResend() throws Exception { + String topicName = "testWithImmediateFailure"; + PutKafka putKafka = new PutKafka(); + final TestRunner runner = TestRunners.newTestRunner(putKafka); + runner.setProperty(PutKafka.TOPIC, topicName); + runner.setProperty(PutKafka.CLIENT_NAME, "foo"); runner.setProperty(PutKafka.KEY, "key1"); - runner.setProperty(PutKafka.SEED_BROKERS, "localhost:1234"); - runner.setProperty(PutKafka.PARTITION_STRATEGY, PutKafka.USER_DEFINED_PARTITIONING); - runner.setProperty(PutKafka.PARTITION, "${part}"); + runner.setProperty(PutKafka.SEED_BROKERS, "0.0.0.0:" + kafkaLocal.getKafkaPort()); runner.setProperty(PutKafka.MESSAGE_DELIMITER, "\n"); - final Map<String, String> attrs = new HashMap<>(); - attrs.put("part", "3"); - runner.enqueue("hello\nthere\nhow are you\ntoday".getBytes(), attrs); - - runner.run(2); - - runner.assertAllFlowFilesTransferred(PutKafka.REL_SUCCESS, 1); + final String text = "Hello World\nGoodbye\n1\n2"; + runner.enqueue(text.getBytes()); + afterClass(); // kill Kafka right before send to ensure producer fails + runner.run(1, false); - final List<ProducerRecord<byte[], byte[]>> records = ((MockProducer) proc.getProducer()).getMessages(); - for (int i = 0; i < 4; i++) { - assertEquals(3, records.get(i).partition().intValue()); + runner.assertAllFlowFilesTransferred(PutKafka.REL_FAILURE, 1); + MockFlowFile ff = runner.getFlowFilesForRelationship(PutKafka.REL_FAILURE).get(0); + String failedSegmentsStr = ff.getAttribute(PutKafka.ATTR_FAILED_SEGMENTS); + BitSet fs = BitSet.valueOf(failedSegmentsStr.getBytes()); + assertTrue(fs.get(0)); + assertTrue(fs.get(1)); + assertTrue(fs.get(2)); + assertTrue(fs.get(3)); + String delimiter = ff.getAttribute(PutKafka.ATTR_DELIMITER); + assertEquals("\n", delimiter); + String key = ff.getAttribute(PutKafka.ATTR_KEY); + assertEquals("key1", key); + String topic = ff.getAttribute(PutKafka.ATTR_TOPIC); + assertEquals(topicName, topic); + + bforeClass(); + runner.setProperty(PutKafka.SEED_BROKERS, "localhost:" + kafkaLocal.getKafkaPort()); + Map<String, String> attr = new HashMap<>(ff.getAttributes()); + /* + * So here we are emulating partial success. Basically even though all 4 + * messages failed to be sent by changing the ATTR_FAILED_SEGMENTS value + * we essentially saying that only two failed and need to be resent. + */ + BitSet _fs = new BitSet(); + _fs.set(1); + _fs.set(3); + attr.put(PutKafka.ATTR_FAILED_SEGMENTS, new String(_fs.toByteArray(), StandardCharsets.UTF_8)); + ff.putAttributes(attr); + runner.enqueue(ff); + runner.run(1, false); + MockFlowFile sff = runner.getFlowFilesForRelationship(PutKafka.REL_SUCCESS).get(0); + assertNull(sff.getAttribute(PutKafka.ATTR_FAILED_SEGMENTS)); + assertNull(sff.getAttribute(PutKafka.ATTR_TOPIC)); + assertNull(sff.getAttribute(PutKafka.ATTR_KEY)); + assertNull(sff.getAttribute(PutKafka.ATTR_DELIMITER)); + + ConsumerIterator<byte[], byte[]> consumer = this.buildConsumer(topicName); + + assertEquals("Goodbye", new String(consumer.next().message())); + assertEquals("2", new String(consumer.next().message())); + try { + consumer.next(); + fail(); + } catch (Exception e) { + // ignore } } - - @Test - public void testUserDefinedPartitionWithInvalidValue() { - final TestableProcessor proc = new TestableProcessor(); - - final TestRunner runner = TestRunners.newTestRunner(proc); - runner.setProperty(PutKafka.TOPIC, "topic1"); + public void testWithEmptyMessages() { + String topicName = "testWithEmptyMessages"; + PutKafka putKafka = new PutKafka(); + final TestRunner runner = TestRunners.newTestRunner(putKafka); + runner.setProperty(PutKafka.TOPIC, topicName); runner.setProperty(PutKafka.KEY, "key1"); - runner.setProperty(PutKafka.SEED_BROKERS, "localhost:1234"); - runner.setProperty(PutKafka.PARTITION_STRATEGY, PutKafka.USER_DEFINED_PARTITIONING); - runner.setProperty(PutKafka.PARTITION, "${part}"); + runner.setProperty(PutKafka.CLIENT_NAME, "foo"); + runner.setProperty(PutKafka.SEED_BROKERS, "localhost:" + kafkaLocal.getKafkaPort()); runner.setProperty(PutKafka.MESSAGE_DELIMITER, "\n"); - final Map<String, String> attrs = new HashMap<>(); - attrs.put("part", "bogus"); - runner.enqueue("hello\nthere\nhow are you\ntoday".getBytes(), attrs); - - runner.run(2); + final byte[] bytes = "\n\n\n1\n2\n\n\n\n3\n4\n\n\n".getBytes(); + runner.enqueue(bytes); + runner.run(1); runner.assertAllFlowFilesTransferred(PutKafka.REL_SUCCESS, 1); - final List<ProducerRecord<byte[], byte[]>> records = ((MockProducer) proc.getProducer()).getMessages(); - // should all be the same partition, regardless of what partition it is. - final int partition = records.get(0).partition().intValue(); - - for (int i = 0; i < 4; i++) { - assertEquals(partition, records.get(i).partition().intValue()); - } - } - - - @Test - public void testFullBuffer() { - final TestableProcessor proc = new TestableProcessor(); - - final TestRunner runner = TestRunners.newTestRunner(proc); - runner.setProperty(PutKafka.TOPIC, "topic1"); - runner.setProperty(PutKafka.KEY, "key1"); - runner.setProperty(PutKafka.SEED_BROKERS, "localhost:1234"); - runner.setProperty(PutKafka.MESSAGE_DELIMITER, "\n"); - runner.setProperty(PutKafka.MAX_BUFFER_SIZE, "5 B"); - proc.setMaxQueueSize(10L); // will take 4 bytes for key and 1 byte for value. - - runner.enqueue("1\n2\n3\n4\n".getBytes()); - runner.run(2); - - runner.assertTransferCount(PutKafka.REL_SUCCESS, 1); - runner.assertTransferCount(PutKafka.REL_FAILURE, 1); - - runner.getFlowFilesForRelationship(PutKafka.REL_SUCCESS).get(0).assertContentEquals("1\n2\n"); - runner.getFlowFilesForRelationship(PutKafka.REL_FAILURE).get(0).assertContentEquals("3\n4\n"); - } - - - /** - * Used to override the {@link #getProducer()} method so that we can enforce that our MockProducer is used - */ - private static class TestableProcessor extends PutKafka { - private final MockProducer producer; - - public TestableProcessor() { - this(null); - } - - public TestableProcessor(final Integer failAfter) { - this(failAfter, null); - } - - public TestableProcessor(final Integer failAfter, final Integer stopFailingAfter) { - producer = new MockProducer(); - producer.setFailAfter(failAfter); - producer.setStopFailingAfter(stopFailingAfter); - } - - @Override - protected Producer<byte[], byte[]> getProducer() { - return producer; - } - - public void setMaxQueueSize(final long bytes) { - producer.setMaxQueueSize(bytes); + ConsumerIterator<byte[], byte[]> consumer = this.buildConsumer(topicName); + assertNotNull(consumer.next()); + assertNotNull(consumer.next()); + assertNotNull(consumer.next()); + assertNotNull(consumer.next()); + try { + consumer.next(); + fail(); + } catch (Exception e) { + // ignore } } - - /** - * We have our own Mock Producer, which is very similar to the Kafka-supplied one. However, with the Kafka-supplied - * Producer, we don't have the ability to tell it to fail after X number of messages; rather, we can only tell it - * to fail on the next message. Since we are sending multiple messages in a single onTrigger call for the Processor, - * this doesn't allow us to test failure conditions adequately. - */ - private static class MockProducer implements Producer<byte[], byte[]> { - - private int sendCount = 0; - private Integer failAfter; - private Integer stopFailingAfter; - private long queueSize = 0L; - private long maxQueueSize = Long.MAX_VALUE; - - private final List<ProducerRecord<byte[], byte[]>> messages = new ArrayList<>(); - - public MockProducer() { - } - - public void setMaxQueueSize(final long bytes) { - this.maxQueueSize = bytes; - } - - public List<ProducerRecord<byte[], byte[]>> getMessages() { - return messages; - } - - public void setFailAfter(final Integer successCount) { - failAfter = successCount; - } - - public void setStopFailingAfter(final Integer stopFailingAfter) { - this.stopFailingAfter = stopFailingAfter; - } - - @Override - public Future<RecordMetadata> send(final ProducerRecord<byte[], byte[]> record) { - return send(record, null); - } - - @Override - public Future<RecordMetadata> send(final ProducerRecord<byte[], byte[]> record, final Callback callback) { - sendCount++; - - final ByteArraySerializer serializer = new ByteArraySerializer(); - final int keyBytes = serializer.serialize(record.topic(), record.key()).length; - final int valueBytes = serializer.serialize(record.topic(), record.value()).length; - if (maxQueueSize - queueSize < keyBytes + valueBytes) { - throw new BufferExhaustedException("Queue size is " + queueSize + " but serialized message is " + (keyBytes + valueBytes)); - } - - queueSize += keyBytes + valueBytes; - - if (failAfter != null && sendCount > failAfter && ((stopFailingAfter == null) || (sendCount < stopFailingAfter + 1))) { - final Exception e = new FailedToSendMessageException("Failed to send message", new RuntimeException("Unit test told to fail after " + failAfter + " successful messages")); - callback.onCompletion(null, e); - } else { - messages.add(record); - final RecordMetadata meta = new RecordMetadata(new TopicPartition(record.topic(), record.partition() == null ? 1 : record.partition()), 0L, 0L); - callback.onCompletion(meta, null); - } - - // we don't actually look at the Future in the processor, so we can just return null - return null; - } - - @Override - public List<PartitionInfo> partitionsFor(String topic) { - final Node leader = new Node(1, "localhost", 1111); - final Node node2 = new Node(2, "localhost-2", 2222); - final Node node3 = new Node(3, "localhost-3", 3333); - - final PartitionInfo partInfo1 = new PartitionInfo(topic, 1, leader, new Node[] {node2, node3}, new Node[0]); - final PartitionInfo partInfo2 = new PartitionInfo(topic, 2, leader, new Node[] {node2, node3}, new Node[0]); - final PartitionInfo partInfo3 = new PartitionInfo(topic, 3, leader, new Node[] {node2, node3}, new Node[0]); - - final List<PartitionInfo> infos = new ArrayList<>(3); - infos.add(partInfo1); - infos.add(partInfo2); - infos.add(partInfo3); - return infos; - } - - @Override - public Map<MetricName, ? extends Metric> metrics() { - return Collections.emptyMap(); - } - - @Override - public void close() { - } + private ConsumerIterator<byte[], byte[]> buildConsumer(String topic) { + Properties props = new Properties(); + props.put("zookeeper.connect", "0.0.0.0:" + kafkaLocal.getZookeeperPort()); + props.put("group.id", "test"); + props.put("consumer.timeout.ms", "5000"); + props.put("auto.offset.reset", "smallest"); + ConsumerConfig consumerConfig = new ConsumerConfig(props); + ConsumerConnector consumer = Consumer.createJavaConsumerConnector(consumerConfig); + Map<String, Integer> topicCountMap = new HashMap<>(1); + topicCountMap.put(topic, 1); + Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumer.createMessageStreams(topicCountMap); + List<KafkaStream<byte[], byte[]>> streams = consumerMap.get(topic); + ConsumerIterator<byte[], byte[]> iter = streams.get(0).iterator(); + return iter; } } http://git-wip-us.apache.org/repos/asf/nifi/blob/e0e00ff2/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/test/resources/log4j.properties ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/test/resources/log4j.properties b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/test/resources/log4j.properties index 35778d8..8e37bb9 100644 --- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/test/resources/log4j.properties +++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/test/resources/log4j.properties @@ -12,12 +12,10 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. -log4j.rootCategory=WARN, stdout +log4j.rootCategory=INFO, stdout log4j.appender.stdout=org.apache.log4j.ConsoleAppender log4j.appender.stdout.layout=org.apache.log4j.PatternLayout log4j.appender.stdout.layout.ConversionPattern=%d{ABSOLUTE} %5p %t %c{2}:%L - %m%n -log4j.category.org.apache.nifi.processors.kafka=INFO -log4j.category.kafka=ERROR -#log4j.category.org.apache.nifi.startup=INFO +log4j.category.org.apache.nifi.processors.kafka=DEBUG
