Repository: nifi
Updated Branches:
  refs/heads/0.x 468b0fccc -> 7a9cbe13a


NIFI-2444 NIFI-2445 fixed PublishKafka
- fixed the logging issue NIFI-2444 by ensuring the ProcessLog is added to 
KafkaPublisher
- fixed KafkaPublisher's isAllAcked operation to ensure that it properly 
reports that the flow file has failed (e.g. when the first message fails 
and nothing has been acked).
- added an additional test


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/7040b4b1
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/7040b4b1
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/7040b4b1

Branch: refs/heads/0.x
Commit: 7040b4b1d1c4f1b8e61cc5c8205e09876c0386c0
Parents: 468b0fc
Author: Oleg Zhurakousky <[email protected]>
Authored: Mon Aug 1 10:56:46 2016 -0400
Committer: Oleg Zhurakousky <[email protected]>
Committed: Mon Aug 1 13:24:16 2016 -0400

----------------------------------------------------------------------
 .../nifi/processors/kafka/KafkaPublisher.java   |  9 ++---
 .../processors/kafka/pubsub/KafkaPublisher.java | 12 ++-----
 .../processors/kafka/pubsub/PublishKafka.java   |  4 ++-
 .../kafka/pubsub/PublishKafkaTest.java          | 32 ++++++++++++++++-
 .../kafka/pubsub/StubPublishKafka.java          | 38 +++++++++++++++-----
 5 files changed, 68 insertions(+), 27 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/7040b4b1/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/KafkaPublisher.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/KafkaPublisher.java
 
b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/KafkaPublisher.java
index 6acdd62..2934799 100644
--- 
a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/KafkaPublisher.java
+++ 
b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/KafkaPublisher.java
@@ -33,10 +33,9 @@ import org.apache.kafka.clients.producer.ProducerRecord;
 import org.apache.kafka.clients.producer.RecordMetadata;
 import org.apache.kafka.common.serialization.ByteArraySerializer;
 import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.logging.ComponentLog;
 import org.apache.nifi.logging.ProcessorLog;
 import org.apache.nifi.stream.io.util.StreamDemarcator;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 import kafka.producer.Partitioner;
 
@@ -46,8 +45,6 @@ import kafka.producer.Partitioner;
  */
 class KafkaPublisher implements Closeable {
 
-    private static final Logger logger = 
LoggerFactory.getLogger(KafkaPublisher.class);
-
     private final Producer<byte[], byte[]> kafkaProducer;
 
     private long ackWaitTime = 30000;
@@ -233,12 +230,10 @@ class KafkaPublisher implements Closeable {
      */
     private void warnOrError(String message, Exception e) {
         if (e == null) {
-            logger.warn(message);
             if (this.processLog != null) {
                 this.processLog.warn(message);
             }
         } else {
-            logger.error(message, e);
             if (this.processLog != null) {
                 this.processLog.error(message, e);
             }
@@ -262,7 +257,7 @@ class KafkaPublisher implements Closeable {
         }
 
         public boolean isAllAcked() {
-            return this.messagesSent - 1 == this.lastMessageAcked;
+            return this.lastMessageAcked > -1 && this.messagesSent - 1 == 
this.lastMessageAcked;
         }
 
         @Override

http://git-wip-us.apache.org/repos/asf/nifi/blob/7040b4b1/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-pubsub-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/KafkaPublisher.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-pubsub-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/KafkaPublisher.java
 
b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-pubsub-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/KafkaPublisher.java
index ed7e21d..3c9d278 100644
--- 
a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-pubsub-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/KafkaPublisher.java
+++ 
b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-pubsub-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/KafkaPublisher.java
@@ -31,24 +31,20 @@ import org.apache.kafka.clients.producer.Producer;
 import org.apache.kafka.clients.producer.ProducerRecord;
 import org.apache.kafka.clients.producer.RecordMetadata;
 import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.logging.ComponentLog;
 import org.apache.nifi.logging.ProcessorLog;
 import org.apache.nifi.stream.io.util.StreamDemarcator;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 /**
  * Wrapper over {@link KafkaProducer} to assist {@link PublishKafka} processor
  * with sending contents of the {@link FlowFile}s to Kafka.
  */
 class KafkaPublisher implements Closeable {
-
-    private static final Logger logger = 
LoggerFactory.getLogger(KafkaPublisher.class);
-
     private final Producer<byte[], byte[]> kafkaProducer;
 
     private volatile long ackWaitTime = 30000;
 
-    private volatile ProcessorLog processLog;
+    private volatile ComponentLog processLog;
 
     private final int ackCheckSize;
 
@@ -212,12 +208,10 @@ class KafkaPublisher implements Closeable {
      */
     private void warnOrError(String message, Exception e) {
         if (e == null) {
-            logger.warn(message);
             if (this.processLog != null) {
                 this.processLog.warn(message);
             }
         } else {
-            logger.error(message, e);
             if (this.processLog != null) {
                 this.processLog.error(message, e);
             }
@@ -244,7 +238,7 @@ class KafkaPublisher implements Closeable {
         }
 
         public boolean isAllAcked() {
-            return this.messagesSent - 1 == this.lastMessageAcked;
+            return this.lastMessageAcked > -1 && this.messagesSent - 1 == 
this.lastMessageAcked;
         }
 
         @Override

http://git-wip-us.apache.org/repos/asf/nifi/blob/7040b4b1/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-pubsub-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublishKafka.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-pubsub-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublishKafka.java
 
b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-pubsub-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublishKafka.java
index 6235f0b..ddd6d3f 100644
--- 
a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-pubsub-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublishKafka.java
+++ 
b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-pubsub-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublishKafka.java
@@ -228,7 +228,9 @@ public class PublishKafka extends 
AbstractKafkaProcessor<KafkaPublisher> {
         kafkaProperties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, 
ByteArraySerializer.class.getName());
         kafkaProperties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, 
ByteArraySerializer.class.getName());
         this.brokers = 
context.getProperty(BOOTSTRAP_SERVERS).evaluateAttributeExpressions().getValue();
-        return new KafkaPublisher(kafkaProperties);
+        KafkaPublisher publisher = new KafkaPublisher(kafkaProperties);
+        publisher.setProcessLog(this.getLogger());
+        return publisher;
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/nifi/blob/7040b4b1/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-pubsub-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/PublishKafkaTest.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-pubsub-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/PublishKafkaTest.java
 
b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-pubsub-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/PublishKafkaTest.java
index af550b4..be97578 100644
--- 
a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-pubsub-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/PublishKafkaTest.java
+++ 
b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-pubsub-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/PublishKafkaTest.java
@@ -153,7 +153,7 @@ public class PublishKafkaTest {
         runner.setProperty(PublishKafka.KEY, "key1");
         runner.setProperty(PublishKafka.BOOTSTRAP_SERVERS, "localhost:1234");
         runner.setProperty(PublishKafka.MESSAGE_DEMARCATOR, "\n");
-        runner.setProperty(PublishKafka.META_WAIT_TIME, "500 millis");
+        runner.setProperty(PublishKafka.META_WAIT_TIME, "3000 millis");
 
         final String text = "Hello World\nGoodbye\nfail\n2";
         runner.enqueue(text.getBytes(StandardCharsets.UTF_8));
@@ -164,6 +164,7 @@ public class PublishKafkaTest {
         Producer<byte[], byte[]> producer = putKafka.getProducer();
         verify(producer, times(4)).send(Mockito.any(ProducerRecord.class));
         runner.shutdown();
+        putKafka.destroy();
     }
 
     @SuppressWarnings("unchecked")
@@ -193,6 +194,35 @@ public class PublishKafkaTest {
 
     @SuppressWarnings("unchecked")
     @Test
+    public void 
validateOnFutureGetFailureAndThenResendSuccessFirstMessageFail() throws 
Exception {
+        String topicName = "validateSendFailureAndThenResendSuccess";
+        StubPublishKafka putKafka = new StubPublishKafka(100);
+
+        TestRunner runner = TestRunners.newTestRunner(putKafka);
+        runner.setProperty(PublishKafka.TOPIC, topicName);
+        runner.setProperty(PublishKafka.CLIENT_ID, "foo");
+        runner.setProperty(PublishKafka.KEY, "key1");
+        runner.setProperty(PublishKafka.BOOTSTRAP_SERVERS, "localhost:1234");
+        runner.setProperty(PublishKafka.MESSAGE_DEMARCATOR, "\n");
+        runner.setProperty(PublishKafka.META_WAIT_TIME, "500 millis");
+
+        final String text = "futurefail\nHello World\nGoodbye\n2";
+        runner.enqueue(text.getBytes(StandardCharsets.UTF_8));
+        runner.run(1, false);
+        MockFlowFile ff = 
runner.getFlowFilesForRelationship(PublishKafka.REL_FAILURE).get(0);
+        assertNotNull(ff);
+        runner.enqueue(ff);
+
+        runner.run(1, false);
+        assertEquals(0, runner.getQueueSize().getObjectCount());
+        Producer<byte[], byte[]> producer = putKafka.getProducer();
+        // 5 sends due to duplication
+        verify(producer, times(5)).send(Mockito.any(ProducerRecord.class));
+        runner.shutdown();
+    }
+
+    @SuppressWarnings("unchecked")
+    @Test
     public void validateOnFutureGetFailureAndThenResendSuccess() throws 
Exception {
         String topicName = "validateSendFailureAndThenResendSuccess";
         StubPublishKafka putKafka = new StubPublishKafka(100);

http://git-wip-us.apache.org/repos/asf/nifi/blob/7040b4b1/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-pubsub-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/StubPublishKafka.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-pubsub-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/StubPublishKafka.java
 
b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-pubsub-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/StubPublishKafka.java
index df38aee..0893614 100644
--- 
a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-pubsub-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/StubPublishKafka.java
+++ 
b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-pubsub-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/StubPublishKafka.java
@@ -22,14 +22,17 @@ import static org.mockito.Mockito.when;
 import java.lang.reflect.Field;
 import java.nio.charset.StandardCharsets;
 import java.util.Properties;
-import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
-import java.util.concurrent.TimeUnit;
 
 import org.apache.kafka.clients.producer.Producer;
 import org.apache.kafka.clients.producer.ProducerConfig;
 import org.apache.kafka.clients.producer.ProducerRecord;
 import org.apache.kafka.clients.producer.RecordMetadata;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.errors.TopicAuthorizationException;
 import org.apache.kafka.common.serialization.ByteArraySerializer;
 import org.apache.nifi.processor.ProcessContext;
 import org.apache.nifi.processor.ProcessSession;
@@ -46,6 +49,8 @@ public class StubPublishKafka extends PublishKafka {
 
     private final int ackCheckSize;
 
+    private final ExecutorService executor = Executors.newCachedThreadPool();
+
     StubPublishKafka(int ackCheckSize) {
         this.ackCheckSize = ackCheckSize;
     }
@@ -54,6 +59,10 @@ public class StubPublishKafka extends PublishKafka {
         return producer;
     }
 
+    public void destroy() {
+        this.executor.shutdownNow();
+    }
+
     @SuppressWarnings("unchecked")
     @Override
     protected KafkaPublisher buildKafkaResource(ProcessContext context, 
ProcessSession session)
@@ -67,6 +76,7 @@ public class StubPublishKafka extends PublishKafka {
             f.setAccessible(true);
             f.set(this, 
context.getProperty(BOOTSTRAP_SERVERS).evaluateAttributeExpressions().getValue());
             publisher = (KafkaPublisher) 
TestUtils.getUnsafe().allocateInstance(KafkaPublisher.class);
+            publisher.setAckWaitTime(15000);
             producer = mock(Producer.class);
             this.instrumentProducer(producer, false);
             Field kf = KafkaPublisher.class.getDeclaredField("kafkaProducer");
@@ -85,21 +95,31 @@ public class StubPublishKafka extends PublishKafka {
 
     @SuppressWarnings("unchecked")
     private void instrumentProducer(Producer<byte[], byte[]> producer, boolean 
failRandomly) {
+
         when(producer.send(Mockito.any(ProducerRecord.class))).then(new 
Answer<Future<RecordMetadata>>() {
-            @SuppressWarnings("rawtypes")
             @Override
             public Future<RecordMetadata> answer(InvocationOnMock invocation) 
throws Throwable {
                 ProducerRecord<byte[], byte[]> record = 
(ProducerRecord<byte[], byte[]>) invocation.getArguments()[0];
-                String value = new String(record.value(), 
StandardCharsets.UTF_8);
+                final String value = new String(record.value(), 
StandardCharsets.UTF_8);
                 if ("fail".equals(value) && !StubPublishKafka.this.failed) {
                     StubPublishKafka.this.failed = true;
                     throw new RuntimeException("intentional");
                 }
-                Future future = mock(Future.class);
-                if ("futurefail".equals(value) && 
!StubPublishKafka.this.failed) {
-                    StubPublishKafka.this.failed = true;
-                    when(future.get(Mockito.anyLong(), 
Mockito.any(TimeUnit.class))).thenThrow(ExecutionException.class);
-                }
+
+                Future<RecordMetadata> future = executor.submit(new 
Callable<RecordMetadata>() {
+                    @Override
+                    public RecordMetadata call() throws Exception {
+                        if ("futurefail".equals(value) && 
!StubPublishKafka.this.failed) {
+                            // System.out.println("FAIL");
+                            StubPublishKafka.this.failed = true;
+                            throw new 
TopicAuthorizationException("Unauthorized");
+                        } else {
+                            TopicPartition partition = new 
TopicPartition("foo", 0);
+                            RecordMetadata meta = new 
RecordMetadata(partition, 0, 0);
+                            return meta;
+                        }
+                    }
+                });
                 return future;
             }
         });

Reply via email to