Repository: nifi Updated Branches: refs/heads/0.x 468b0fccc -> 7a9cbe13a
NIFI-2444 NIFI-2445 fixed PublishKafka - fixed the logging issue NIFI-2444 by ensuring the ProcessLog is added to KafkaPublisher - fixed KafkaPublisher's isAllAcked operation to ensure that it properly reports that the flow file has failed. - added additional test Project: http://git-wip-us.apache.org/repos/asf/nifi/repo Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/7040b4b1 Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/7040b4b1 Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/7040b4b1 Branch: refs/heads/0.x Commit: 7040b4b1d1c4f1b8e61cc5c8205e09876c0386c0 Parents: 468b0fc Author: Oleg Zhurakousky <[email protected]> Authored: Mon Aug 1 10:56:46 2016 -0400 Committer: Oleg Zhurakousky <[email protected]> Committed: Mon Aug 1 13:24:16 2016 -0400 ---------------------------------------------------------------------- .../nifi/processors/kafka/KafkaPublisher.java | 9 ++--- .../processors/kafka/pubsub/KafkaPublisher.java | 12 ++----- .../processors/kafka/pubsub/PublishKafka.java | 4 ++- .../kafka/pubsub/PublishKafkaTest.java | 32 ++++++++++++++++- .../kafka/pubsub/StubPublishKafka.java | 38 +++++++++++++++----- 5 files changed, 68 insertions(+), 27 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/nifi/blob/7040b4b1/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/KafkaPublisher.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/KafkaPublisher.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/KafkaPublisher.java index 6acdd62..2934799 100644 --- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/KafkaPublisher.java +++ 
b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/KafkaPublisher.java @@ -33,10 +33,9 @@ import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.clients.producer.RecordMetadata; import org.apache.kafka.common.serialization.ByteArraySerializer; import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.logging.ComponentLog; import org.apache.nifi.logging.ProcessorLog; import org.apache.nifi.stream.io.util.StreamDemarcator; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; import kafka.producer.Partitioner; @@ -46,8 +45,6 @@ import kafka.producer.Partitioner; */ class KafkaPublisher implements Closeable { - private static final Logger logger = LoggerFactory.getLogger(KafkaPublisher.class); - private final Producer<byte[], byte[]> kafkaProducer; private long ackWaitTime = 30000; @@ -233,12 +230,10 @@ class KafkaPublisher implements Closeable { */ private void warnOrError(String message, Exception e) { if (e == null) { - logger.warn(message); if (this.processLog != null) { this.processLog.warn(message); } } else { - logger.error(message, e); if (this.processLog != null) { this.processLog.error(message, e); } @@ -262,7 +257,7 @@ class KafkaPublisher implements Closeable { } public boolean isAllAcked() { - return this.messagesSent - 1 == this.lastMessageAcked; + return this.lastMessageAcked > -1 && this.messagesSent - 1 == this.lastMessageAcked; } @Override http://git-wip-us.apache.org/repos/asf/nifi/blob/7040b4b1/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-pubsub-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/KafkaPublisher.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-pubsub-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/KafkaPublisher.java 
b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-pubsub-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/KafkaPublisher.java index ed7e21d..3c9d278 100644 --- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-pubsub-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/KafkaPublisher.java +++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-pubsub-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/KafkaPublisher.java @@ -31,24 +31,20 @@ import org.apache.kafka.clients.producer.Producer; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.clients.producer.RecordMetadata; import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.logging.ComponentLog; import org.apache.nifi.logging.ProcessorLog; import org.apache.nifi.stream.io.util.StreamDemarcator; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; /** * Wrapper over {@link KafkaProducer} to assist {@link PublishKafka} processor * with sending contents of the {@link FlowFile}s to Kafka. 
*/ class KafkaPublisher implements Closeable { - - private static final Logger logger = LoggerFactory.getLogger(KafkaPublisher.class); - private final Producer<byte[], byte[]> kafkaProducer; private volatile long ackWaitTime = 30000; - private volatile ProcessorLog processLog; + private volatile ComponentLog processLog; private final int ackCheckSize; @@ -212,12 +208,10 @@ class KafkaPublisher implements Closeable { */ private void warnOrError(String message, Exception e) { if (e == null) { - logger.warn(message); if (this.processLog != null) { this.processLog.warn(message); } } else { - logger.error(message, e); if (this.processLog != null) { this.processLog.error(message, e); } @@ -244,7 +238,7 @@ class KafkaPublisher implements Closeable { } public boolean isAllAcked() { - return this.messagesSent - 1 == this.lastMessageAcked; + return this.lastMessageAcked > -1 && this.messagesSent - 1 == this.lastMessageAcked; } @Override http://git-wip-us.apache.org/repos/asf/nifi/blob/7040b4b1/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-pubsub-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublishKafka.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-pubsub-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublishKafka.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-pubsub-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublishKafka.java index 6235f0b..ddd6d3f 100644 --- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-pubsub-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublishKafka.java +++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-pubsub-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublishKafka.java @@ -228,7 +228,9 @@ public class PublishKafka extends AbstractKafkaProcessor<KafkaPublisher> { kafkaProperties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, 
ByteArraySerializer.class.getName()); kafkaProperties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName()); this.brokers = context.getProperty(BOOTSTRAP_SERVERS).evaluateAttributeExpressions().getValue(); - return new KafkaPublisher(kafkaProperties); + KafkaPublisher publisher = new KafkaPublisher(kafkaProperties); + publisher.setProcessLog(this.getLogger()); + return publisher; } /** http://git-wip-us.apache.org/repos/asf/nifi/blob/7040b4b1/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-pubsub-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/PublishKafkaTest.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-pubsub-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/PublishKafkaTest.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-pubsub-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/PublishKafkaTest.java index af550b4..be97578 100644 --- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-pubsub-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/PublishKafkaTest.java +++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-pubsub-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/PublishKafkaTest.java @@ -153,7 +153,7 @@ public class PublishKafkaTest { runner.setProperty(PublishKafka.KEY, "key1"); runner.setProperty(PublishKafka.BOOTSTRAP_SERVERS, "localhost:1234"); runner.setProperty(PublishKafka.MESSAGE_DEMARCATOR, "\n"); - runner.setProperty(PublishKafka.META_WAIT_TIME, "500 millis"); + runner.setProperty(PublishKafka.META_WAIT_TIME, "3000 millis"); final String text = "Hello World\nGoodbye\nfail\n2"; runner.enqueue(text.getBytes(StandardCharsets.UTF_8)); @@ -164,6 +164,7 @@ public class PublishKafkaTest { Producer<byte[], byte[]> producer = putKafka.getProducer(); verify(producer, times(4)).send(Mockito.any(ProducerRecord.class)); runner.shutdown(); + putKafka.destroy(); } 
@SuppressWarnings("unchecked") @@ -193,6 +194,35 @@ public class PublishKafkaTest { @SuppressWarnings("unchecked") @Test + public void validateOnFutureGetFailureAndThenResendSuccessFirstMessageFail() throws Exception { + String topicName = "validateSendFailureAndThenResendSuccess"; + StubPublishKafka putKafka = new StubPublishKafka(100); + + TestRunner runner = TestRunners.newTestRunner(putKafka); + runner.setProperty(PublishKafka.TOPIC, topicName); + runner.setProperty(PublishKafka.CLIENT_ID, "foo"); + runner.setProperty(PublishKafka.KEY, "key1"); + runner.setProperty(PublishKafka.BOOTSTRAP_SERVERS, "localhost:1234"); + runner.setProperty(PublishKafka.MESSAGE_DEMARCATOR, "\n"); + runner.setProperty(PublishKafka.META_WAIT_TIME, "500 millis"); + + final String text = "futurefail\nHello World\nGoodbye\n2"; + runner.enqueue(text.getBytes(StandardCharsets.UTF_8)); + runner.run(1, false); + MockFlowFile ff = runner.getFlowFilesForRelationship(PublishKafka.REL_FAILURE).get(0); + assertNotNull(ff); + runner.enqueue(ff); + + runner.run(1, false); + assertEquals(0, runner.getQueueSize().getObjectCount()); + Producer<byte[], byte[]> producer = putKafka.getProducer(); + // 5 sends due to duplication (resend of the failed first message) + verify(producer, times(5)).send(Mockito.any(ProducerRecord.class)); + runner.shutdown(); + } + + @SuppressWarnings("unchecked") + @Test public void validateOnFutureGetFailureAndThenResendSuccess() throws Exception { String topicName = "validateSendFailureAndThenResendSuccess"; StubPublishKafka putKafka = new StubPublishKafka(100); http://git-wip-us.apache.org/repos/asf/nifi/blob/7040b4b1/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-pubsub-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/StubPublishKafka.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-pubsub-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/StubPublishKafka.java 
b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-pubsub-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/StubPublishKafka.java index df38aee..0893614 100644 --- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-pubsub-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/StubPublishKafka.java +++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-pubsub-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/StubPublishKafka.java @@ -22,14 +22,17 @@ import static org.mockito.Mockito.when; import java.lang.reflect.Field; import java.nio.charset.StandardCharsets; import java.util.Properties; -import java.util.concurrent.ExecutionException; +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; import java.util.concurrent.Future; -import java.util.concurrent.TimeUnit; import org.apache.kafka.clients.producer.Producer; import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.clients.producer.RecordMetadata; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.errors.TopicAuthorizationException; import org.apache.kafka.common.serialization.ByteArraySerializer; import org.apache.nifi.processor.ProcessContext; import org.apache.nifi.processor.ProcessSession; @@ -46,6 +49,8 @@ public class StubPublishKafka extends PublishKafka { private final int ackCheckSize; + private final ExecutorService executor = Executors.newCachedThreadPool(); + StubPublishKafka(int ackCheckSize) { this.ackCheckSize = ackCheckSize; } @@ -54,6 +59,10 @@ public class StubPublishKafka extends PublishKafka { return producer; } + public void destroy() { + this.executor.shutdownNow(); + } + @SuppressWarnings("unchecked") @Override protected KafkaPublisher buildKafkaResource(ProcessContext context, ProcessSession session) @@ -67,6 +76,7 @@ public class StubPublishKafka extends PublishKafka { 
f.setAccessible(true); f.set(this, context.getProperty(BOOTSTRAP_SERVERS).evaluateAttributeExpressions().getValue()); publisher = (KafkaPublisher) TestUtils.getUnsafe().allocateInstance(KafkaPublisher.class); + publisher.setAckWaitTime(15000); producer = mock(Producer.class); this.instrumentProducer(producer, false); Field kf = KafkaPublisher.class.getDeclaredField("kafkaProducer"); @@ -85,21 +95,31 @@ public class StubPublishKafka extends PublishKafka { @SuppressWarnings("unchecked") private void instrumentProducer(Producer<byte[], byte[]> producer, boolean failRandomly) { + when(producer.send(Mockito.any(ProducerRecord.class))).then(new Answer<Future<RecordMetadata>>() { - @SuppressWarnings("rawtypes") @Override public Future<RecordMetadata> answer(InvocationOnMock invocation) throws Throwable { ProducerRecord<byte[], byte[]> record = (ProducerRecord<byte[], byte[]>) invocation.getArguments()[0]; - String value = new String(record.value(), StandardCharsets.UTF_8); + final String value = new String(record.value(), StandardCharsets.UTF_8); if ("fail".equals(value) && !StubPublishKafka.this.failed) { StubPublishKafka.this.failed = true; throw new RuntimeException("intentional"); } - Future future = mock(Future.class); - if ("futurefail".equals(value) && !StubPublishKafka.this.failed) { - StubPublishKafka.this.failed = true; - when(future.get(Mockito.anyLong(), Mockito.any(TimeUnit.class))).thenThrow(ExecutionException.class); - } + + Future<RecordMetadata> future = executor.submit(new Callable<RecordMetadata>() { + @Override + public RecordMetadata call() throws Exception { + if ("futurefail".equals(value) && !StubPublishKafka.this.failed) { + // System.out.println("FAIL"); + StubPublishKafka.this.failed = true; + throw new TopicAuthorizationException("Unauthorized"); + } else { + TopicPartition partition = new TopicPartition("foo", 0); + RecordMetadata meta = new RecordMetadata(partition, 0, 0); + return meta; + } + } + }); return future; } });
