Repository: incubator-nifi
Updated Branches:
  refs/heads/develop dea9e2247 -> eb5ec703b


Fixes the incorrect message count in the Provenance reporter. Adds unit tests to 
verify the fix.


Project: http://git-wip-us.apache.org/repos/asf/incubator-nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-nifi/commit/eb5ec703
Tree: http://git-wip-us.apache.org/repos/asf/incubator-nifi/tree/eb5ec703
Diff: http://git-wip-us.apache.org/repos/asf/incubator-nifi/diff/eb5ec703

Branch: refs/heads/develop
Commit: eb5ec703ba0d5c188822a37f6d7eed14af56a594
Parents: dea9e22
Author: Oscar de la Pena <[email protected]>
Authored: Thu Mar 19 10:10:09 2015 +0800
Committer: Oscar de la Pena <[email protected]>
Committed: Thu Mar 19 10:10:27 2015 +0800

----------------------------------------------------------------------
 .../apache/nifi/processors/kafka/PutKafka.java  | 90 ++++++++++----------
 .../nifi/processors/kafka/TestPutKafka.java     | 76 +++++++++++++++--
 2 files changed, 114 insertions(+), 52 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/eb5ec703/nifi/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/PutKafka.java
----------------------------------------------------------------------
diff --git 
a/nifi/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/PutKafka.java
 
b/nifi/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/PutKafka.java
index 4df4e08..e0b7588 100644
--- 
a/nifi/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/PutKafka.java
+++ 
b/nifi/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/PutKafka.java
@@ -309,64 +309,61 @@ public class PutKafka extends AbstractProcessor {
                                     data = 
Arrays.copyOfRange(baos.getUnderlyingBuffer(), 0, baos.size() - 
delimiterBytes.length);
                                 }
                                 
-                                createMessage: if ( data != null ) {
+                                if ( data != null ) {
                                     // If the message has no data, ignore it.
-                                    if ( data.length == 0 ) {
-                                        data = null;
-                                        baos.reset();
-                                        break createMessage;
-                                    }
-                                    
-                                    // either we ran out of data or we reached 
the end of the message. 
-                                    // Either way, create the message because 
it's ready to send.
-                                    final KeyedMessage<byte[], byte[]> message;
-                                    if ( key == null ) {
-                                        message = new KeyedMessage<>(topic, 
data);
-                                    } else {
-                                        message = new KeyedMessage<>(topic, 
keyBytes, data);
-                                    }
-                                    
-                                    // Add the message to the list of messages 
ready to send. If we've reached our
-                                    // threshold of how many we're willing to 
send (or if we're out of data), go ahead
-                                    // and send the whole List.
-                                    messages.add(message);
-                                    messageBytes += data.length;
-                                    if ( messageBytes >= maxBufferSize || 
streamFinished ) {
-                                        // send the messages, then reset our 
state.
-                                        try {
-                                            producer.send(messages);
-                                        } catch (final Exception e) {
-                                            // we wrap the general exception 
in ProcessException because we want to separate
-                                            // failures in sending messages 
from general Exceptions that would indicate bugs
-                                            // in the Processor. Failure to 
send a message should be handled appropriately, but
-                                            // we don't want to catch the 
general Exception or RuntimeException in order to catch
-                                            // failures from Kafka's Producer.
-                                            throw new ProcessException("Failed 
to send messages to Kafka", e);
+                                    if ( data.length != 0 ) {
+                                        // either we ran out of data or we 
reached the end of the message.
+                                        // Either way, create the message 
because it's ready to send.
+                                        final KeyedMessage<byte[], byte[]> 
message;
+                                        if (key == null) {
+                                            message = new 
KeyedMessage<>(topic, data);
+                                        } else {
+                                            message = new 
KeyedMessage<>(topic, keyBytes, data);
+                                        }
+
+                                        // Add the message to the list of 
messages ready to send. If we've reached our
+                                        // threshold of how many we're willing 
to send (or if we're out of data), go ahead
+                                        // and send the whole List.
+                                        messages.add(message);
+                                        messageBytes += data.length;
+                                        if (messageBytes >= maxBufferSize || 
streamFinished) {
+                                            // send the messages, then reset 
our state.
+                                            try {
+                                                producer.send(messages);
+                                            } catch (final Exception e) {
+                                                // we wrap the general 
exception in ProcessException because we want to separate
+                                                // failures in sending 
messages from general Exceptions that would indicate bugs
+                                                // in the Processor. Failure 
to send a message should be handled appropriately, but
+                                                // we don't want to catch the 
general Exception or RuntimeException in order to catch
+                                                // failures from Kafka's 
Producer.
+                                                throw new 
ProcessException("Failed to send messages to Kafka", e);
+                                            }
+
+                                            
messagesSent.addAndGet(messages.size());    // count number of messages sent
+
+                                            // reset state
+                                            messages.clear();
+                                            messageBytes = 0;
+
+                                            // We've successfully sent a batch 
of messages. Keep track of the byte offset in the
+                                            // FlowFile of the last 
successfully sent message. This way, if the messages cannot
+                                            // all be successfully sent, we 
know where to split off the data. This allows us to then
+                                            // split off the first X number of 
bytes and send to 'success' and then split off the rest
+                                            // and send them to 'failure'.
+                                            
lastMessageOffset.set(in.getBytesConsumed());
                                         }
-                                        
-                                        
messagesSent.addAndGet(messages.size());    // count number of messages sent
-                                        
-                                        // reset state
-                                        messages.clear();
-                                        messageBytes = 0;
-                                        
-                                        // We've successfully sent a batch of 
messages. Keep track of the byte offset in the
-                                        // FlowFile of the last successfully 
sent message. This way, if the messages cannot
-                                        // all be successfully sent, we know 
where to split off the data. This allows us to then
-                                        // split off the first X number of 
bytes and send to 'success' and then split off the rest
-                                        // and send them to 'failure'.
-                                        
lastMessageOffset.set(in.getBytesConsumed());
                                     }
-                                    
                                     // reset BAOS so that we can start a new 
message.
                                     baos.reset();
                                     data = null;
+
                                 }
                             }
 
                             // If there are messages left, send them
                             if ( !messages.isEmpty() ) {
                                 try {
+                                    messagesSent.addAndGet(messages.size());   
 // add count of messages
                                     producer.send(messages);
                                 } catch (final Exception e) {
                                     throw new ProcessException("Failed to send 
messages to Kafka", e);
@@ -377,7 +374,6 @@ public class PutKafka extends AbstractProcessor {
                 });
                 
                 final long nanos = System.nanoTime() - start;
-                
                 session.getProvenanceReporter().send(flowFile, "kafka://" + 
topic, "Sent " + messagesSent.get() + " messages");
                 session.transfer(flowFile, REL_SUCCESS);
                 getLogger().info("Successfully sent {} messages to Kafka for 
{} in {} millis", new Object[] {messagesSent.get(), flowFile, 
TimeUnit.NANOSECONDS.toMillis(nanos)});

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/eb5ec703/nifi/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/test/java/org/apache/nifi/processors/kafka/TestPutKafka.java
----------------------------------------------------------------------
diff --git 
a/nifi/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/test/java/org/apache/nifi/processors/kafka/TestPutKafka.java
 
b/nifi/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/test/java/org/apache/nifi/processors/kafka/TestPutKafka.java
index b0f2394..56a5c4b 100644
--- 
a/nifi/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/test/java/org/apache/nifi/processors/kafka/TestPutKafka.java
+++ 
b/nifi/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/test/java/org/apache/nifi/processors/kafka/TestPutKafka.java
@@ -25,6 +25,7 @@ import java.util.Arrays;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.atomic.AtomicLong;
 
 import kafka.common.FailedToSendMessageException;
 import kafka.javaapi.producer.Producer;
@@ -32,12 +33,14 @@ import kafka.producer.KeyedMessage;
 import kafka.producer.ProducerConfig;
 
 import org.apache.nifi.annotation.lifecycle.OnScheduled;
+
 import org.apache.nifi.processor.ProcessContext;
-import org.apache.nifi.util.MockFlowFile;
-import org.apache.nifi.util.TestRunner;
-import org.apache.nifi.util.TestRunners;
+import org.apache.nifi.provenance.ProvenanceReporter;
+import org.apache.nifi.util.*;
 import org.junit.Ignore;
 import org.junit.Test;
+import org.mockito.Mockito;
+import org.mockito.internal.util.reflection.Whitebox;
 
 
 public class TestPutKafka {
@@ -139,8 +142,70 @@ public class TestPutKafka {
         assertTrue(Arrays.equals("3".getBytes(), msgs.get(2)));
         assertTrue(Arrays.equals("4".getBytes(), msgs.get(3)));
     }
-    
-    
+
+    @Test
+    public void testProvenanceReporterMessagesCount(){
+        final TestableProcessor processor = new TestableProcessor();
+
+        ProvenanceReporter spyProvenanceReporter = Mockito.spy(new 
MockProvenanceReporter());
+
+        AtomicLong idGenerator = new AtomicLong(0L);
+        SharedSessionState sharedState = new SharedSessionState(processor, 
idGenerator);
+        Whitebox.setInternalState(sharedState, "provenanceReporter", 
spyProvenanceReporter);
+        MockFlowFileQueue flowFileQueue = sharedState.getFlowFileQueue();
+        MockSessionFactory sessionFactory = 
Mockito.mock(MockSessionFactory.class);
+        MockProcessSession mockProcessSession = new 
MockProcessSession(sharedState);
+        
Mockito.when(sessionFactory.createSession()).thenReturn(mockProcessSession);
+
+
+        final TestRunner runner = TestRunners.newTestRunner(processor);
+        Whitebox.setInternalState(runner, "flowFileQueue", flowFileQueue);
+        Whitebox.setInternalState(runner, "sessionFactory", sessionFactory);
+
+        runner.setProperty(PutKafka.TOPIC, "topic1");
+        runner.setProperty(PutKafka.KEY, "key1");
+        runner.setProperty(PutKafka.SEED_BROKERS, "localhost:1234");
+        runner.setProperty(PutKafka.MESSAGE_DELIMITER, "\\n");
+
+        final byte[] bytes = "\n\n\n1\n2\n\n\n\n3\n4\n\n\n".getBytes();
+        runner.enqueue(bytes);
+        runner.run();
+
+        MockFlowFile mockFlowFile = 
mockProcessSession.getFlowFilesForRelationship(PutKafka.REL_SUCCESS).get(0);
+        Mockito.verify(spyProvenanceReporter, 
Mockito.atLeastOnce()).send(mockFlowFile, "kafka://topic1", "Sent 4 messages");
+    }
+
+    @Test
+    public void testProvenanceReporterWithoutDelimiterMessagesCount(){
+        final TestableProcessor processor = new TestableProcessor();
+
+        ProvenanceReporter spyProvenanceReporter = Mockito.spy(new 
MockProvenanceReporter());
+
+        AtomicLong idGenerator = new AtomicLong(0L);
+        SharedSessionState sharedState = new SharedSessionState(processor, 
idGenerator);
+        Whitebox.setInternalState(sharedState, "provenanceReporter", 
spyProvenanceReporter);
+        MockFlowFileQueue flowFileQueue = sharedState.getFlowFileQueue();
+        MockSessionFactory sessionFactory = 
Mockito.mock(MockSessionFactory.class);
+        MockProcessSession mockProcessSession = new 
MockProcessSession(sharedState);
+        
Mockito.when(sessionFactory.createSession()).thenReturn(mockProcessSession);
+
+
+        final TestRunner runner = TestRunners.newTestRunner(processor);
+        Whitebox.setInternalState(runner, "flowFileQueue", flowFileQueue);
+        Whitebox.setInternalState(runner, "sessionFactory", sessionFactory);
+
+        runner.setProperty(PutKafka.TOPIC, "topic1");
+        runner.setProperty(PutKafka.KEY, "key1");
+        runner.setProperty(PutKafka.SEED_BROKERS, "localhost:1234");
+
+        final byte[] bytes = "\n\n\n1\n2\n\n\n\n3\n4\n\n\n".getBytes();
+        runner.enqueue(bytes);
+        runner.run();
+
+        MockFlowFile mockFlowFile = 
mockProcessSession.getFlowFilesForRelationship(PutKafka.REL_SUCCESS).get(0);
+        Mockito.verify(spyProvenanceReporter, 
Mockito.atLeastOnce()).send(mockFlowFile, "kafka://topic1");
+    }
+
        @Test
        @Ignore("Intended only for local testing; requires an actual running 
instance of Kafka & ZooKeeper...")
        public void testKeyValuePut() {
@@ -233,4 +298,5 @@ public class TestPutKafka {
             failAfter = successCount;
         }
        }
+
 }

Reply via email to