Repository: nifi
Updated Branches:
  refs/heads/master a68f87f96 -> f7ecb47e2


http://git-wip-us.apache.org/repos/asf/nifi/blob/e0e00ff2/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/test/java/org/apache/nifi/processors/kafka/KafkaPublisherTest.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/test/java/org/apache/nifi/processors/kafka/KafkaPublisherTest.java
 
b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/test/java/org/apache/nifi/processors/kafka/KafkaPublisherTest.java
new file mode 100644
index 0000000..92a6307
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/test/java/org/apache/nifi/processors/kafka/KafkaPublisherTest.java
@@ -0,0 +1,205 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.kafka;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.fail;
+
+import java.io.ByteArrayInputStream;
+import java.io.InputStream;
+import java.nio.charset.StandardCharsets;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+
+import org.apache.nifi.processors.kafka.test.EmbeddedKafka;
+import org.apache.nifi.processors.kafka.test.EmbeddedKafkaProducerHelper;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import kafka.consumer.Consumer;
+import kafka.consumer.ConsumerConfig;
+import kafka.consumer.ConsumerIterator;
+import kafka.consumer.ConsumerTimeoutException;
+import kafka.consumer.KafkaStream;
+import kafka.javaapi.consumer.ConsumerConnector;
+
+public class KafkaPublisherTest {
+
+    private static final String sampleData = "The true sign of intelligence is 
not knowledge but imagination.\n"
+            + "It's not that I'm so smart, it's just that I stay with problems 
longer.\n"
+            + "The only source of knowledge is experience.\n"
+            + "Only two things are infinite, the universe and human stupidity, 
and I'm not sure about the former.\n";
+
+    private static final String sampleData2 = "foo|bar|baz";
+
+    private static EmbeddedKafka kafkaLocal;
+
+    private static EmbeddedKafkaProducerHelper producerHelper;
+
+    /**
+     * Starts the embedded Kafka instance used by the tests in this class and
+     * creates the producer helper on top of it. Both are kept in static fields
+     * so that {@link #afterClass()} can release them.
+     */
+    @BeforeClass
+    public static void bforeClass() {
+        kafkaLocal = new EmbeddedKafka();
+        kafkaLocal.start();
+        producerHelper = new EmbeddedKafkaProducerHelper(kafkaLocal);
+    }
+
+    /**
+     * Closes the producer helper first and then stops the embedded Kafka
+     * instance that was started for this test class.
+     *
+     * @throws Exception if closing the helper or stopping Kafka fails
+     */
+    @AfterClass
+    public static void afterClass() throws Exception {
+        producerHelper.close();
+        kafkaLocal.stop();
+    }
+
+    String test = "Khalid El Bakraoui rented an apartment in Brussels that was 
raided last week and both are suspected of having ties to "
+            + "the terror attacks in Paris in November, the source said. While 
Belgian officials say both brothers were suicide bombers, a U.S. "
+            + "official briefed earlier on preliminary evidence from the 
investigation says authorities are looking at the possibility that one of "
+            + "the airport explosions may have been caused by a bomb inside a 
suitcase and the other was a suicide bombing. But identifying the brothers "
+            + "should help spring the investigation forward, says Cedric 
Leighton, a CNN military analyst and the former deputy director for the Joint 
Chiefs of Staff.";
+
+    /**
+     * Publishes the sample text with a {@code null} third context argument (where
+     * the sibling tests pass a delimiter) and verifies that exactly one message
+     * can be consumed from the topic.
+     *
+     * @throws Exception if publishing or consuming fails unexpectedly
+     */
+    @Test
+    public void validateSuccessfulSendAsWhole() throws Exception {
+        InputStream fis = new ByteArrayInputStream(sampleData.getBytes(StandardCharsets.UTF_8));
+        String topicName = "validateSuccessfulSendAsWhole";
+
+        Properties kafkaProperties = this.buildProducerProperties();
+        KafkaPublisher publisher = new KafkaPublisher(kafkaProperties);
+
+        SplittableMessageContext messageContext = new SplittableMessageContext(topicName, null, null);
+
+        publisher.publish(messageContext, fis, null);
+
+        fis.close();
+        publisher.close();
+
+        ConsumerIterator<byte[], byte[]> iter = this.buildConsumer(topicName);
+        assertNotNull(iter.next());
+        try {
+            iter.next();
+            // Reaching this line means a second message was available, i.e. the
+            // content was not published as a single message.
+            fail();
+        } catch (ConsumerTimeoutException e) {
+            // that's OK since this is the Kafka mechanism to unblock
+        }
+    }
+
+    /**
+     * Publishes the sample text with {@code "\n"} as the third context argument
+     * and checks that four messages, and no fifth one, can be consumed.
+     *
+     * @throws Exception if publishing or consuming fails unexpectedly
+     */
+    @Test
+    public void validateSuccessfulSendAsDelimited() throws Exception {
+        final InputStream content = new ByteArrayInputStream(sampleData.getBytes(StandardCharsets.UTF_8));
+        final String topicName = "validateSuccessfulSendAsDelimited";
+
+        final KafkaPublisher publisher = new KafkaPublisher(this.buildProducerProperties());
+        publisher.publish(new SplittableMessageContext(topicName, null, "\n"), content, null);
+        publisher.close();
+
+        final ConsumerIterator<byte[], byte[]> messages = this.buildConsumer(topicName);
+        for (int received = 0; received < 4; received++) {
+            assertNotNull(messages.next());
+        }
+        try {
+            messages.next();
+            fail();
+        } catch (ConsumerTimeoutException expected) {
+            // that's OK since this is the Kafka mechanism to unblock
+        }
+    }
+
+    /**
+     * Publishes {@code "foo|bar|baz"} with {@code "|"} as the third context
+     * argument and checks that three messages, and no fourth one, can be consumed.
+     *
+     * @throws Exception if publishing or consuming fails unexpectedly
+     */
+    @Test
+    public void validateSuccessfulSendAsDelimited2() throws Exception {
+        final InputStream content = new ByteArrayInputStream(sampleData2.getBytes(StandardCharsets.UTF_8));
+        final String topicName = "validateSuccessfulSendAsDelimited2";
+
+        final KafkaPublisher publisher = new KafkaPublisher(this.buildProducerProperties());
+        publisher.publish(new SplittableMessageContext(topicName, null, "|"), content, null);
+        publisher.close();
+
+        final ConsumerIterator<byte[], byte[]> messages = this.buildConsumer(topicName);
+        for (int received = 0; received < 3; received++) {
+            assertNotNull(messages.next());
+        }
+        try {
+            messages.next();
+            fail();
+        } catch (ConsumerTimeoutException expected) {
+            // that's OK since this is the Kafka mechanism to unblock
+        }
+    }
+
+    /**
+     * Marks segments 1 and 3 of the newline-delimited sample text as failed,
+     * publishes it, and verifies that only the second and fourth sentences
+     * arrive on the topic, in that order, with nothing after them.
+     *
+     * @throws Exception if publishing or consuming fails unexpectedly
+     */
+    @Test
+    public void validateSuccessfulReSendOfFailedSegments() throws Exception {
+        InputStream fis = new ByteArrayInputStream(sampleData.getBytes(StandardCharsets.UTF_8));
+        String topicName = "validateSuccessfulReSendOfFailedSegments";
+
+        Properties kafkaProperties = this.buildProducerProperties();
+
+        KafkaPublisher publisher = new KafkaPublisher(kafkaProperties);
+
+        SplittableMessageContext messageContext = new SplittableMessageContext(topicName, null, "\n");
+        messageContext.setFailedSegments(1, 3);
+
+        publisher.publish(messageContext, fis, null);
+        publisher.close();
+
+        ConsumerIterator<byte[], byte[]> iter = this.buildConsumer(topicName);
+        // Decode with the same charset the payload was encoded with instead of
+        // relying on the platform default.
+        String m1 = new String(iter.next().message(), StandardCharsets.UTF_8);
+        String m2 = new String(iter.next().message(), StandardCharsets.UTF_8);
+        assertEquals("It's not that I'm so smart, it's just that I stay with problems longer.", m1);
+        assertEquals("Only two things are infinite, the universe and human stupidity, and I'm not sure about the former.", m2);
+        try {
+            iter.next();
+            fail();
+        } catch (ConsumerTimeoutException e) {
+            // that's OK since this is the Kafka mechanism to unblock
+        }
+    }
+
+    /**
+     * Assembles the producer configuration used by the tests, with
+     * {@code bootstrap.servers} pointing at the port of the embedded Kafka instance.
+     *
+     * @return a fresh {@link Properties} object on every call
+     */
+    private Properties buildProducerProperties() {
+        final String brokerAddress = "0.0.0.0:" + kafkaLocal.getKafkaPort();
+
+        final Properties config = new Properties();
+        config.setProperty("bootstrap.servers", brokerAddress);
+        config.setProperty("serializer.class", "kafka.serializer.DefaultEncoder");
+        config.setProperty("acks", "1");
+        config.setProperty("auto.create.topics.enable", "true");
+        config.setProperty("partitioner.class", "org.apache.nifi.processors.kafka.Partitioners$RoundRobinPartitioner");
+        config.setProperty("timeout.ms", "5000");
+        return config;
+    }
+
+    /**
+     * Creates a consumer connector against the embedded ZooKeeper port, requests
+     * one stream for the given topic and returns that stream's iterator.
+     * The consumer is configured with a 5000 ms {@code consumer.timeout.ms}.
+     * Note that the connector created here is not shut down by this method.
+     *
+     * @param topic name of the topic to read from
+     * @return iterator over the first (and only requested) stream of the topic
+     */
+    private ConsumerIterator<byte[], byte[]> buildConsumer(String topic) {
+        final Properties consumerProps = new Properties();
+        consumerProps.put("zookeeper.connect", "localhost:" + kafkaLocal.getZookeeperPort());
+        consumerProps.put("group.id", "test");
+        consumerProps.put("consumer.timeout.ms", "5000");
+        consumerProps.put("auto.offset.reset", "smallest");
+
+        final ConsumerConnector connector = Consumer.createJavaConsumerConnector(new ConsumerConfig(consumerProps));
+
+        final Map<String, Integer> streamsPerTopic = new HashMap<>(1);
+        streamsPerTopic.put(topic, 1);
+
+        final List<KafkaStream<byte[], byte[]>> topicStreams = connector.createMessageStreams(streamsPerTopic).get(topic);
+        return topicStreams.get(0).iterator();
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/e0e00ff2/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/test/java/org/apache/nifi/processors/kafka/TestPutKafka.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/test/java/org/apache/nifi/processors/kafka/TestPutKafka.java
 
b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/test/java/org/apache/nifi/processors/kafka/TestPutKafka.java
index 2f5da5c..3ed0549 100644
--- 
a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/test/java/org/apache/nifi/processors/kafka/TestPutKafka.java
+++ 
b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/test/java/org/apache/nifi/processors/kafka/TestPutKafka.java
@@ -17,462 +17,189 @@
 package org.apache.nifi.processors.kafka;
 
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
 
 import java.nio.charset.StandardCharsets;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collections;
+import java.util.BitSet;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.concurrent.Future;
+import java.util.Properties;
 
-import org.apache.kafka.clients.producer.BufferExhaustedException;
-import org.apache.kafka.clients.producer.Callback;
-import org.apache.kafka.clients.producer.Producer;
-import org.apache.kafka.clients.producer.ProducerRecord;
-import org.apache.kafka.clients.producer.RecordMetadata;
-import org.apache.kafka.common.Metric;
-import org.apache.kafka.common.MetricName;
-import org.apache.kafka.common.Node;
-import org.apache.kafka.common.PartitionInfo;
-import org.apache.kafka.common.TopicPartition;
-import org.apache.kafka.common.serialization.ByteArraySerializer;
-import org.apache.nifi.provenance.ProvenanceEventRecord;
-import org.apache.nifi.provenance.ProvenanceEventType;
+import org.apache.nifi.processors.kafka.test.EmbeddedKafka;
+import org.apache.nifi.processors.kafka.test.EmbeddedKafkaProducerHelper;
 import org.apache.nifi.util.MockFlowFile;
 import org.apache.nifi.util.TestRunner;
 import org.apache.nifi.util.TestRunners;
-import org.junit.Assert;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Ignore;
 import org.junit.Test;
 
-import kafka.common.FailedToSendMessageException;
+import kafka.consumer.Consumer;
+import kafka.consumer.ConsumerConfig;
+import kafka.consumer.ConsumerIterator;
+import kafka.consumer.KafkaStream;
+import kafka.javaapi.consumer.ConsumerConnector;
 
 
 public class TestPutKafka {
 
-    @Test
-    public void testMultipleKeyValuePerFlowFile() {
-        final TestableProcessor proc = new TestableProcessor();
-        final TestRunner runner = TestRunners.newTestRunner(proc);
-        runner.setProperty(PutKafka.TOPIC, "topic1");
-        runner.setProperty(PutKafka.KEY, "key1");
-        runner.setProperty(PutKafka.SEED_BROKERS, "localhost:1234");
-        runner.setProperty(PutKafka.MESSAGE_DELIMITER, "\\n");
-
-        runner.enqueue("Hello 
World\nGoodbye\n1\n2\n3\n4\n5\n6\n7\n8\n9".getBytes());
-        runner.run(2); // we have to run twice because the first iteration 
will result in data being added to a queue in the processor; the second 
onTrigger call will transfer FlowFiles.
-
-        runner.assertAllFlowFilesTransferred(PutKafka.REL_SUCCESS, 1);
-
-        final List<ProducerRecord<byte[], byte[]>> messages = ((MockProducer) 
proc.getProducer()).getMessages();
-        assertEquals(11, messages.size());
-
-        assertTrue(Arrays.equals("Hello 
World".getBytes(StandardCharsets.UTF_8), messages.get(0).value()));
-        assertTrue(Arrays.equals("Goodbye".getBytes(StandardCharsets.UTF_8), 
messages.get(1).value()));
-
-        for (int i = 1; i <= 9; i++) {
-            
assertTrue(Arrays.equals(String.valueOf(i).getBytes(StandardCharsets.UTF_8), 
messages.get(i + 1).value()));
-        }
-    }
-
-    @Test
-    public void testWithImmediateFailure() {
-        final TestableProcessor proc = new TestableProcessor(0);
-        final TestRunner runner = TestRunners.newTestRunner(proc);
-        runner.setProperty(PutKafka.TOPIC, "topic1");
-        runner.setProperty(PutKafka.KEY, "key1");
-        runner.setProperty(PutKafka.SEED_BROKERS, "localhost:1234");
-        runner.setProperty(PutKafka.MESSAGE_DELIMITER, "\\n");
-
-        final String text = "Hello World\nGoodbye\n1\n2\n3\n4\n5\n6\n7\n8\n9";
-        runner.enqueue(text.getBytes());
-        runner.run(2);
-
-        runner.assertAllFlowFilesTransferred(PutKafka.REL_FAILURE, 1);
-        final MockFlowFile mff = 
runner.getFlowFilesForRelationship(PutKafka.REL_FAILURE).get(0);
-        mff.assertContentEquals(text);
-    }
-
-    @Test
-    public void testPartialFailure() {
-        final TestableProcessor proc = new TestableProcessor(2); // fail after 
sending 2 messages.
-        final TestRunner runner = TestRunners.newTestRunner(proc);
-        runner.setProperty(PutKafka.TOPIC, "topic1");
-        runner.setProperty(PutKafka.KEY, "key1");
-        runner.setProperty(PutKafka.SEED_BROKERS, "localhost:1234");
-        runner.setProperty(PutKafka.MESSAGE_DELIMITER, "\\n");
-        runner.setProperty(PutKafka.MAX_BUFFER_SIZE, "1 B");
-
-        final byte[] bytes = "1\n2\n3\n4".getBytes();
-        runner.enqueue(bytes);
-        runner.run(2);
-
-        runner.assertTransferCount(PutKafka.REL_SUCCESS, 1);
-        runner.assertTransferCount(PutKafka.REL_FAILURE, 1);
-
-        final MockFlowFile successFF = 
runner.getFlowFilesForRelationship(PutKafka.REL_SUCCESS).get(0);
-        successFF.assertContentEquals("1\n2\n");
-
-        final MockFlowFile failureFF = 
runner.getFlowFilesForRelationship(PutKafka.REL_FAILURE).get(0);
-        failureFF.assertContentEquals("3\n4");
-    }
-
-    @Test
-    public void testPartialFailureWithSuccessBeforeAndAfter() {
-        final TestableProcessor proc = new TestableProcessor(2, 4); // fail 
after sending 2 messages, then stop failing after 4
-        final TestRunner runner = TestRunners.newTestRunner(proc);
-        runner.setProperty(PutKafka.TOPIC, "topic1");
-        runner.setProperty(PutKafka.KEY, "key1");
-        runner.setProperty(PutKafka.SEED_BROKERS, "localhost:1234");
-        runner.setProperty(PutKafka.MESSAGE_DELIMITER, "\\n");
-        runner.setProperty(PutKafka.MAX_BUFFER_SIZE, "1 B");
-
-        final byte[] bytes = "1\n2\n3\n4\n5\n6".getBytes();
-        runner.enqueue(bytes);
-        runner.run(2);
-
-        runner.assertTransferCount(PutKafka.REL_SUCCESS, 2);
-        runner.assertTransferCount(PutKafka.REL_FAILURE, 1);
-
-        final List<MockFlowFile> success = 
runner.getFlowFilesForRelationship(PutKafka.REL_SUCCESS);
-        for (final MockFlowFile successFF : success) {
-            if ('1' == successFF.toByteArray()[0]) {
-                successFF.assertContentEquals("1\n2\n");
-            } else if ('5' == successFF.toByteArray()[0]) {
-                successFF.assertContentEquals("5\n6");
-            } else {
-                Assert.fail("Wrong content for FlowFile; contained " + new 
String(successFF.toByteArray()));
-            }
-        }
-
-        final MockFlowFile failureFF = 
runner.getFlowFilesForRelationship(PutKafka.REL_FAILURE).get(0);
-        failureFF.assertContentEquals("3\n4\n");
-    }
+    private static EmbeddedKafka kafkaLocal;
 
+    private static EmbeddedKafkaProducerHelper producerHelper;
 
-    @Test
-    public void testWithEmptyMessages() {
-        final TestableProcessor proc = new TestableProcessor();
-        final TestRunner runner = TestRunners.newTestRunner(proc);
-        runner.setProperty(PutKafka.TOPIC, "topic1");
-        runner.setProperty(PutKafka.KEY, "key1");
-        runner.setProperty(PutKafka.SEED_BROKERS, "localhost:1234");
-        runner.setProperty(PutKafka.MESSAGE_DELIMITER, "\\n");
-
-        final byte[] bytes = "\n\n\n1\n2\n\n\n\n3\n4\n\n\n".getBytes();
-        runner.enqueue(bytes);
-        runner.run(2);
-
-        runner.assertAllFlowFilesTransferred(PutKafka.REL_SUCCESS, 1);
-
-        final List<ProducerRecord<byte[], byte[]>> msgs = ((MockProducer) 
proc.getProducer()).getMessages();
-        assertEquals(4, msgs.size());
-
-        for (int i = 1; i <= 4; i++) {
-            assertTrue(Arrays.equals(String.valueOf(i).getBytes(), msgs.get(i 
- 1).value()));
-        }
+    @BeforeClass
+    public static void bforeClass() {
+        kafkaLocal = new EmbeddedKafka();
+        kafkaLocal.start();
+        producerHelper = new EmbeddedKafkaProducerHelper(kafkaLocal);
     }
 
-    @Test
-    public void testProvenanceReporterMessagesCount() {
-        final TestableProcessor processor = new TestableProcessor();
-
-        final TestRunner runner = TestRunners.newTestRunner(processor);
-
-        runner.setProperty(PutKafka.TOPIC, "topic1");
-        runner.setProperty(PutKafka.KEY, "key1");
-        runner.setProperty(PutKafka.SEED_BROKERS, "localhost:1234");
-        runner.setProperty(PutKafka.MESSAGE_DELIMITER, "\\n");
-
-        final byte[] bytes = "\n\n\n1\n2\n\n\n\n3\n4\n\n\n".getBytes();
-        runner.enqueue(bytes);
-        runner.run(2);
-
-        final List<ProvenanceEventRecord> events = 
runner.getProvenanceEvents();
-        assertEquals(1, events.size());
-        final ProvenanceEventRecord event = events.get(0);
-        assertEquals(ProvenanceEventType.SEND, event.getEventType());
-        assertEquals("kafka://localhost:1111/topics/topic1", 
event.getTransitUri());
-        assertTrue(event.getDetails().startsWith("Sent 4 messages"));
-    }
-
-    @Test
-    public void testProvenanceReporterWithoutDelimiterMessagesCount() {
-        final TestableProcessor processor = new TestableProcessor();
-
-        final TestRunner runner = TestRunners.newTestRunner(processor);
-        runner.setProperty(PutKafka.TOPIC, "topic1");
-        runner.setProperty(PutKafka.KEY, "key1");
-        runner.setProperty(PutKafka.SEED_BROKERS, "localhost:1234");
-
-        final byte[] bytes = "\n\n\n1\n2\n\n\n\n3\n4\n\n\n".getBytes();
-        runner.enqueue(bytes);
-        runner.run(2);
-
-        final List<ProvenanceEventRecord> events = 
runner.getProvenanceEvents();
-        assertEquals(1, events.size());
-        final ProvenanceEventRecord event = events.get(0);
-        assertEquals(ProvenanceEventType.SEND, event.getEventType());
-        assertEquals("kafka://localhost:1111/topics/topic1", 
event.getTransitUri());
-    }
-
-    @Test
-    public void testRoundRobinAcrossMultipleMessages() {
-        final TestableProcessor proc = new TestableProcessor();
-
-        final TestRunner runner = TestRunners.newTestRunner(proc);
-        runner.setProperty(PutKafka.TOPIC, "topic1");
-        runner.setProperty(PutKafka.KEY, "key1");
-        runner.setProperty(PutKafka.SEED_BROKERS, "localhost:1234");
-        runner.setProperty(PutKafka.PARTITION_STRATEGY, 
PutKafka.ROUND_ROBIN_PARTITIONING);
-
-        runner.enqueue("hello".getBytes());
-        runner.enqueue("there".getBytes());
-        runner.enqueue("how are you".getBytes());
-        runner.enqueue("today".getBytes());
-
-        runner.run(5);
-
-        runner.assertAllFlowFilesTransferred(PutKafka.REL_SUCCESS, 4);
-
-        final List<ProducerRecord<byte[], byte[]>> records = ((MockProducer) 
proc.getProducer()).getMessages();
-        for (int i = 0; i < 3; i++) {
-            assertEquals(i + 1, records.get(i).partition().intValue());
-        }
-
-        assertEquals(1, records.get(3).partition().intValue());
+    @AfterClass
+    public static void afterClass() throws Exception {
+        producerHelper.close();
+        kafkaLocal.stop();
     }
 
     @Test
-    public void testRoundRobinAcrossMultipleMessagesInSameFlowFile() {
-        final TestableProcessor proc = new TestableProcessor();
-
-        final TestRunner runner = TestRunners.newTestRunner(proc);
-        runner.setProperty(PutKafka.TOPIC, "topic1");
+    public void testDelimitedMessagesWithKey() {
+        String topicName = "testDelimitedMessagesWithKey";
+        PutKafka putKafka = new PutKafka();
+        TestRunner runner = TestRunners.newTestRunner(putKafka);
+        runner.setProperty(PutKafka.TOPIC, topicName);
+        runner.setProperty(PutKafka.CLIENT_NAME, "foo");
         runner.setProperty(PutKafka.KEY, "key1");
-        runner.setProperty(PutKafka.SEED_BROKERS, "localhost:1234");
-        runner.setProperty(PutKafka.PARTITION_STRATEGY, 
PutKafka.ROUND_ROBIN_PARTITIONING);
+        runner.setProperty(PutKafka.SEED_BROKERS, "localhost:" + 
kafkaLocal.getKafkaPort());
         runner.setProperty(PutKafka.MESSAGE_DELIMITER, "\n");
 
-        runner.enqueue("hello\nthere\nhow are you\ntoday".getBytes());
-
-        runner.run(2);
+        runner.enqueue("Hello World\nGoodbye\n1\n2\n3\n4\n5".getBytes());
+        runner.run(1, false);
 
         runner.assertAllFlowFilesTransferred(PutKafka.REL_SUCCESS, 1);
-
-        final List<ProducerRecord<byte[], byte[]>> records = ((MockProducer) 
proc.getProducer()).getMessages();
-        for (int i = 0; i < 3; i++) {
-            assertEquals(i + 1, records.get(i).partition().intValue());
-        }
-
-        assertEquals(1, records.get(3).partition().intValue());
+        ConsumerIterator<byte[], byte[]> consumer = 
this.buildConsumer(topicName);
+        assertEquals("Hello World", new String(consumer.next().message()));
+        assertEquals("Goodbye", new String(consumer.next().message()));
+        assertEquals("1", new String(consumer.next().message()));
+        assertEquals("2", new String(consumer.next().message()));
+        assertEquals("3", new String(consumer.next().message()));
+        assertEquals("4", new String(consumer.next().message()));
+        assertEquals("5", new String(consumer.next().message()));
+
+        runner.shutdown();
     }
 
-
     @Test
-    public void testUserDefinedPartition() {
-        final TestableProcessor proc = new TestableProcessor();
-
-        final TestRunner runner = TestRunners.newTestRunner(proc);
-        runner.setProperty(PutKafka.TOPIC, "topic1");
+    @Ignore
+    public void testWithFailureAndPartialResend() throws Exception {
+        String topicName = "testWithImmediateFailure";
+        PutKafka putKafka = new PutKafka();
+        final TestRunner runner = TestRunners.newTestRunner(putKafka);
+        runner.setProperty(PutKafka.TOPIC, topicName);
+        runner.setProperty(PutKafka.CLIENT_NAME, "foo");
         runner.setProperty(PutKafka.KEY, "key1");
-        runner.setProperty(PutKafka.SEED_BROKERS, "localhost:1234");
-        runner.setProperty(PutKafka.PARTITION_STRATEGY, 
PutKafka.USER_DEFINED_PARTITIONING);
-        runner.setProperty(PutKafka.PARTITION, "${part}");
+        runner.setProperty(PutKafka.SEED_BROKERS, "0.0.0.0:" + 
kafkaLocal.getKafkaPort());
         runner.setProperty(PutKafka.MESSAGE_DELIMITER, "\n");
 
-        final Map<String, String> attrs = new HashMap<>();
-        attrs.put("part", "3");
-        runner.enqueue("hello\nthere\nhow are you\ntoday".getBytes(), attrs);
-
-        runner.run(2);
-
-        runner.assertAllFlowFilesTransferred(PutKafka.REL_SUCCESS, 1);
+        final String text = "Hello World\nGoodbye\n1\n2";
+        runner.enqueue(text.getBytes());
+        afterClass(); // kill Kafka right before send to ensure producer fails
+        runner.run(1, false);
 
-        final List<ProducerRecord<byte[], byte[]>> records = ((MockProducer) 
proc.getProducer()).getMessages();
-        for (int i = 0; i < 4; i++) {
-            assertEquals(3, records.get(i).partition().intValue());
+        runner.assertAllFlowFilesTransferred(PutKafka.REL_FAILURE, 1);
+        MockFlowFile ff = 
runner.getFlowFilesForRelationship(PutKafka.REL_FAILURE).get(0);
+        String failedSegmentsStr = 
ff.getAttribute(PutKafka.ATTR_FAILED_SEGMENTS);
+        BitSet fs = BitSet.valueOf(failedSegmentsStr.getBytes());
+        assertTrue(fs.get(0));
+        assertTrue(fs.get(1));
+        assertTrue(fs.get(2));
+        assertTrue(fs.get(3));
+        String delimiter = ff.getAttribute(PutKafka.ATTR_DELIMITER);
+        assertEquals("\n", delimiter);
+        String key = ff.getAttribute(PutKafka.ATTR_KEY);
+        assertEquals("key1", key);
+        String topic = ff.getAttribute(PutKafka.ATTR_TOPIC);
+        assertEquals(topicName, topic);
+
+        bforeClass();
+        runner.setProperty(PutKafka.SEED_BROKERS, "localhost:" + 
kafkaLocal.getKafkaPort());
+        Map<String, String> attr = new HashMap<>(ff.getAttributes());
+        /*
+         * Here we are emulating partial success. Even though all 4 messages
+         * failed to be sent, by changing the ATTR_FAILED_SEGMENTS value we are
+         * essentially saying that only two failed and need to be resent.
+         */
+        BitSet _fs = new BitSet();
+        _fs.set(1);
+        _fs.set(3);
+        attr.put(PutKafka.ATTR_FAILED_SEGMENTS, new String(_fs.toByteArray(), 
StandardCharsets.UTF_8));
+        ff.putAttributes(attr);
+        runner.enqueue(ff);
+        runner.run(1, false);
+        MockFlowFile sff = 
runner.getFlowFilesForRelationship(PutKafka.REL_SUCCESS).get(0);
+        assertNull(sff.getAttribute(PutKafka.ATTR_FAILED_SEGMENTS));
+        assertNull(sff.getAttribute(PutKafka.ATTR_TOPIC));
+        assertNull(sff.getAttribute(PutKafka.ATTR_KEY));
+        assertNull(sff.getAttribute(PutKafka.ATTR_DELIMITER));
+
+        ConsumerIterator<byte[], byte[]> consumer = 
this.buildConsumer(topicName);
+
+        assertEquals("Goodbye", new String(consumer.next().message()));
+        assertEquals("2", new String(consumer.next().message()));
+        try {
+            consumer.next();
+            fail();
+        } catch (Exception e) {
+            // ignore
         }
     }
 
-
-
     @Test
-    public void testUserDefinedPartitionWithInvalidValue() {
-        final TestableProcessor proc = new TestableProcessor();
-
-        final TestRunner runner = TestRunners.newTestRunner(proc);
-        runner.setProperty(PutKafka.TOPIC, "topic1");
+    public void testWithEmptyMessages() {
+        String topicName = "testWithEmptyMessages";
+        PutKafka putKafka = new PutKafka();
+        final TestRunner runner = TestRunners.newTestRunner(putKafka);
+        runner.setProperty(PutKafka.TOPIC, topicName);
         runner.setProperty(PutKafka.KEY, "key1");
-        runner.setProperty(PutKafka.SEED_BROKERS, "localhost:1234");
-        runner.setProperty(PutKafka.PARTITION_STRATEGY, 
PutKafka.USER_DEFINED_PARTITIONING);
-        runner.setProperty(PutKafka.PARTITION, "${part}");
+        runner.setProperty(PutKafka.CLIENT_NAME, "foo");
+        runner.setProperty(PutKafka.SEED_BROKERS, "localhost:" + 
kafkaLocal.getKafkaPort());
         runner.setProperty(PutKafka.MESSAGE_DELIMITER, "\n");
 
-        final Map<String, String> attrs = new HashMap<>();
-        attrs.put("part", "bogus");
-        runner.enqueue("hello\nthere\nhow are you\ntoday".getBytes(), attrs);
-
-        runner.run(2);
+        final byte[] bytes = "\n\n\n1\n2\n\n\n\n3\n4\n\n\n".getBytes();
+        runner.enqueue(bytes);
+        runner.run(1);
 
         runner.assertAllFlowFilesTransferred(PutKafka.REL_SUCCESS, 1);
 
-        final List<ProducerRecord<byte[], byte[]>> records = ((MockProducer) 
proc.getProducer()).getMessages();
-        // should all be the same partition, regardless of what partition it 
is.
-        final int partition = records.get(0).partition().intValue();
-
-        for (int i = 0; i < 4; i++) {
-            assertEquals(partition, records.get(i).partition().intValue());
-        }
-    }
-
-
-    @Test
-    public void testFullBuffer() {
-        final TestableProcessor proc = new TestableProcessor();
-
-        final TestRunner runner = TestRunners.newTestRunner(proc);
-        runner.setProperty(PutKafka.TOPIC, "topic1");
-        runner.setProperty(PutKafka.KEY, "key1");
-        runner.setProperty(PutKafka.SEED_BROKERS, "localhost:1234");
-        runner.setProperty(PutKafka.MESSAGE_DELIMITER, "\n");
-        runner.setProperty(PutKafka.MAX_BUFFER_SIZE, "5 B");
-        proc.setMaxQueueSize(10L); // will take 4 bytes for key and 1 byte for 
value.
-
-        runner.enqueue("1\n2\n3\n4\n".getBytes());
-        runner.run(2);
-
-        runner.assertTransferCount(PutKafka.REL_SUCCESS, 1);
-        runner.assertTransferCount(PutKafka.REL_FAILURE, 1);
-
-        
runner.getFlowFilesForRelationship(PutKafka.REL_SUCCESS).get(0).assertContentEquals("1\n2\n");
-        
runner.getFlowFilesForRelationship(PutKafka.REL_FAILURE).get(0).assertContentEquals("3\n4\n");
-    }
-
-
-    /**
-     * Used to override the {@link #getProducer()} method so that we can 
enforce that our MockProducer is used
-     */
-    private static class TestableProcessor extends PutKafka {
-        private final MockProducer producer;
-
-        public TestableProcessor() {
-            this(null);
-        }
-
-        public TestableProcessor(final Integer failAfter) {
-            this(failAfter, null);
-        }
-
-        public TestableProcessor(final Integer failAfter, final Integer 
stopFailingAfter) {
-            producer = new MockProducer();
-            producer.setFailAfter(failAfter);
-            producer.setStopFailingAfter(stopFailingAfter);
-        }
-
-        @Override
-        protected Producer<byte[], byte[]> getProducer() {
-            return producer;
-        }
-
-        public void setMaxQueueSize(final long bytes) {
-            producer.setMaxQueueSize(bytes);
+        ConsumerIterator<byte[], byte[]> consumer = 
this.buildConsumer(topicName);
+        assertNotNull(consumer.next());
+        assertNotNull(consumer.next());
+        assertNotNull(consumer.next());
+        assertNotNull(consumer.next());
+        try {
+            consumer.next();
+            fail();
+        } catch (Exception e) {
+            // ignore
         }
     }
 
-
-    /**
-     * We have our own Mock Producer, which is very similar to the Kafka-supplied one. However, with the Kafka-supplied
-     * Producer, we don't have the ability to tell it to fail after X number of messages; rather, we can only tell it
-     * to fail on the next message. Since we are sending multiple messages in a single onTrigger call for the Processor,
-     * this doesn't allow us to test failure conditions adequately.
-     */
-    private static class MockProducer implements Producer<byte[], byte[]> {
-
-        private int sendCount = 0;
-        private Integer failAfter;
-        private Integer stopFailingAfter;
-        private long queueSize = 0L;
-        private long maxQueueSize = Long.MAX_VALUE;
-
-        private final List<ProducerRecord<byte[], byte[]>> messages = new ArrayList<>();
-
-        public MockProducer() {
-        }
-
-        public void setMaxQueueSize(final long bytes) {
-            this.maxQueueSize = bytes;
-        }
-
-        public List<ProducerRecord<byte[], byte[]>> getMessages() {
-            return messages;
-        }
-
-        public void setFailAfter(final Integer successCount) {
-            failAfter = successCount;
-        }
-
-        public void setStopFailingAfter(final Integer stopFailingAfter) {
-            this.stopFailingAfter = stopFailingAfter;
-        }
-
-        @Override
-        public Future<RecordMetadata> send(final ProducerRecord<byte[], byte[]> record) {
-            return send(record, null);
-        }
-
-        @Override
-        public Future<RecordMetadata> send(final ProducerRecord<byte[], byte[]> record, final Callback callback) {
-            sendCount++;
-
-            final ByteArraySerializer serializer = new ByteArraySerializer();
-            final int keyBytes = serializer.serialize(record.topic(), record.key()).length;
-            final int valueBytes = serializer.serialize(record.topic(), record.value()).length;
-            if (maxQueueSize - queueSize < keyBytes + valueBytes) {
-                throw new BufferExhaustedException("Queue size is " + queueSize + " but serialized message is " + (keyBytes + valueBytes));
-            }
-
-            queueSize += keyBytes + valueBytes;
-
-            if (failAfter != null && sendCount > failAfter && ((stopFailingAfter == null) || (sendCount < stopFailingAfter + 1))) {
-                final Exception e = new FailedToSendMessageException("Failed to send message", new RuntimeException("Unit test told to fail after " + failAfter + " successful messages"));
-                callback.onCompletion(null, e);
-            } else {
-                messages.add(record);
-                final RecordMetadata meta = new RecordMetadata(new TopicPartition(record.topic(), record.partition() == null ? 1 : record.partition()), 0L, 0L);
-                callback.onCompletion(meta, null);
-            }
-
-            // we don't actually look at the Future in the processor, so we can just return null
-            return null;
-        }
-
-        @Override
-        public List<PartitionInfo> partitionsFor(String topic) {
-            final Node leader = new Node(1, "localhost", 1111);
-            final Node node2 = new Node(2, "localhost-2", 2222);
-            final Node node3 = new Node(3, "localhost-3", 3333);
-
-            final PartitionInfo partInfo1 = new PartitionInfo(topic, 1, leader, new Node[] {node2, node3}, new Node[0]);
-            final PartitionInfo partInfo2 = new PartitionInfo(topic, 2, leader, new Node[] {node2, node3}, new Node[0]);
-            final PartitionInfo partInfo3 = new PartitionInfo(topic, 3, leader, new Node[] {node2, node3}, new Node[0]);
-
-            final List<PartitionInfo> infos = new ArrayList<>(3);
-            infos.add(partInfo1);
-            infos.add(partInfo2);
-            infos.add(partInfo3);
-            return infos;
-        }
-
-        @Override
-        public Map<MetricName, ? extends Metric> metrics() {
-            return Collections.emptyMap();
-        }
-
-        @Override
-        public void close() {
-        }
+    private ConsumerIterator<byte[], byte[]> buildConsumer(String topic) {
+        Properties props = new Properties();
+        props.put("zookeeper.connect", "0.0.0.0:" + kafkaLocal.getZookeeperPort());
+        props.put("group.id", "test");
+        props.put("consumer.timeout.ms", "5000");
+        props.put("auto.offset.reset", "smallest");
+        ConsumerConfig consumerConfig = new ConsumerConfig(props);
+        ConsumerConnector consumer = Consumer.createJavaConsumerConnector(consumerConfig);
+        Map<String, Integer> topicCountMap = new HashMap<>(1);
+        topicCountMap.put(topic, 1);
+        Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumer.createMessageStreams(topicCountMap);
+        List<KafkaStream<byte[], byte[]>> streams = consumerMap.get(topic);
+        ConsumerIterator<byte[], byte[]> iter = streams.get(0).iterator();
+        return iter;
     }
 }

http://git-wip-us.apache.org/repos/asf/nifi/blob/e0e00ff2/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/test/resources/log4j.properties
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/test/resources/log4j.properties b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/test/resources/log4j.properties
index 35778d8..8e37bb9 100644
--- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/test/resources/log4j.properties
+++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/test/resources/log4j.properties
@@ -12,12 +12,10 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
-log4j.rootCategory=WARN, stdout
+log4j.rootCategory=INFO, stdout
 
 log4j.appender.stdout=org.apache.log4j.ConsoleAppender
 log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
 log4j.appender.stdout.layout.ConversionPattern=%d{ABSOLUTE} %5p %t %c{2}:%L - %m%n
 
-log4j.category.org.apache.nifi.processors.kafka=INFO
-log4j.category.kafka=ERROR
-#log4j.category.org.apache.nifi.startup=INFO
+log4j.category.org.apache.nifi.processors.kafka=DEBUG

Reply via email to