http://git-wip-us.apache.org/repos/asf/nifi/blob/ed17df50/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/InFlightMessageTracker.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/InFlightMessageTracker.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/InFlightMessageTracker.java new file mode 100644 index 0000000..e7d5cb7 --- /dev/null +++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/InFlightMessageTracker.java @@ -0,0 +1,178 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.processors.kafka.pubsub; + +import java.util.Collection; +import java.util.HashSet; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import java.util.concurrent.atomic.AtomicInteger; + +import org.apache.nifi.flowfile.FlowFile; + +public class InFlightMessageTracker { + private final ConcurrentMap<FlowFile, Counts> messageCountsByFlowFile = new ConcurrentHashMap<>(); + private final ConcurrentMap<FlowFile, Exception> failures = new ConcurrentHashMap<>(); + private final Object progressMutex = new Object(); + + public void incrementAcknowledgedCount(final FlowFile flowFile) { + final Counts counter = messageCountsByFlowFile.computeIfAbsent(flowFile, ff -> new Counts()); + counter.incrementAcknowledgedCount(); + + synchronized (progressMutex) { + progressMutex.notify(); + } + } + + public int getAcknowledgedCount(final FlowFile flowFile) { + final Counts counter = messageCountsByFlowFile.get(flowFile); + return (counter == null) ? 0 : counter.getAcknowledgedCount(); + } + + public void incrementSentCount(final FlowFile flowFile) { + final Counts counter = messageCountsByFlowFile.computeIfAbsent(flowFile, ff -> new Counts()); + counter.incrementSentCount(); + } + + public int getSentCount(final FlowFile flowFile) { + final Counts counter = messageCountsByFlowFile.get(flowFile); + return (counter == null) ? 
0 : counter.getSentCount(); + } + + public void fail(final FlowFile flowFile, final Exception exception) { + failures.putIfAbsent(flowFile, exception); + + synchronized (progressMutex) { + progressMutex.notify(); + } + } + + public Exception getFailure(final FlowFile flowFile) { + return failures.get(flowFile); + } + + public boolean isFailed(final FlowFile flowFile) { + return getFailure(flowFile) != null; + } + + public void reset() { + messageCountsByFlowFile.clear(); + failures.clear(); + } + + public PublishResult failOutstanding(final Exception exception) { + messageCountsByFlowFile.keySet().stream() + .filter(ff -> !isComplete(ff)) + .filter(ff -> !failures.containsKey(ff)) + .forEach(ff -> failures.put(ff, exception)); + + return createPublishResult(); + } + + private boolean isComplete(final FlowFile flowFile) { + final Counts counts = messageCountsByFlowFile.get(flowFile); + if (counts.getAcknowledgedCount() == counts.getSentCount()) { + // all messages received successfully. + return true; + } + + if (failures.containsKey(flowFile)) { + // FlowFile failed so is complete + return true; + } + + return false; + } + + private boolean isComplete() { + return messageCountsByFlowFile.keySet().stream() + .allMatch(flowFile -> isComplete(flowFile)); + } + + void awaitCompletion(final long millis) throws InterruptedException, TimeoutException { + final long startTime = System.nanoTime(); + final long maxTime = startTime + TimeUnit.MILLISECONDS.toNanos(millis); + + while (System.nanoTime() < maxTime) { + synchronized (progressMutex) { + if (isComplete()) { + return; + } + + progressMutex.wait(millis); + } + } + + throw new TimeoutException(); + } + + + PublishResult createPublishResult() { + return new PublishResult() { + @Override + public Collection<FlowFile> getSuccessfulFlowFiles() { + if (failures.isEmpty()) { + return messageCountsByFlowFile.keySet(); + } + + final Set<FlowFile> flowFiles = new HashSet<>(messageCountsByFlowFile.keySet()); + flowFiles.removeAll(failures.keySet()); + return flowFiles; + } + + @Override + public Collection<FlowFile> getFailedFlowFiles() { + return failures.keySet(); + } + + @Override + public int getSuccessfulMessageCount(final FlowFile flowFile) { + return getAcknowledgedCount(flowFile); + } + + @Override + public Exception getReasonForFailure(final FlowFile flowFile) { + return getFailure(flowFile); + } + }; + } + + public static class Counts { + private final AtomicInteger sentCount = new AtomicInteger(0); + private final AtomicInteger acknowledgedCount = new AtomicInteger(0); + + public void incrementSentCount() { + sentCount.incrementAndGet(); + } + + public void incrementAcknowledgedCount() { + acknowledgedCount.incrementAndGet(); + } + + public int getAcknowledgedCount() { + return acknowledgedCount.get(); + } + + public int getSentCount() { + return sentCount.get(); + } + } +}
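The tracker above is the rendezvous point between the thread that sends records and the Kafka client thread that acknowledges them: every incrementAcknowledgedCount() or fail() call notifies the progressMutex that awaitCompletion() waits on, and failOutstanding() sweeps anything still unacknowledged into the failure map. A minimal sketch of how a caller in the same package might drive it (the Producer wiring, class name, and variable names here are illustrative assumptions, not part of this commit):

    package org.apache.nifi.processors.kafka.pubsub;

    import java.util.List;
    import java.util.concurrent.TimeoutException;

    import org.apache.kafka.clients.producer.Producer;
    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.apache.nifi.flowfile.FlowFile;

    class TrackerExample {  // hypothetical caller, for illustration only
        void publishAndWait(final Producer<byte[], byte[]> producer, final FlowFile flowFile,
                final List<byte[]> messages, final String topic, final byte[] key) throws InterruptedException {
            final InFlightMessageTracker tracker = new InFlightMessageTracker();

            for (final byte[] message : messages) {          // e.g. demarcated FlowFile content
                tracker.incrementSentCount(flowFile);
                producer.send(new ProducerRecord<>(topic, key, message), (metadata, exception) -> {
                    if (exception == null) {
                        tracker.incrementAcknowledgedCount(flowFile);  // wakes awaitCompletion()
                    } else {
                        tracker.fail(flowFile, exception);             // first failure wins (putIfAbsent)
                    }
                });
            }

            try {
                tracker.awaitCompletion(30_000L);            // returns once every send is acked or failed
            } catch (final TimeoutException e) {
                tracker.failOutstanding(e);                  // mark still-unacked FlowFiles as failed
            }
        }
    }

Note that the per-FlowFile sent/acknowledged counts make partial failures visible: createPublishResult() can report exactly how many messages of a demarcated FlowFile were acknowledged before an error.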
http://git-wip-us.apache.org/repos/asf/nifi/blob/ed17df50/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/KafkaProcessorUtils.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/KafkaProcessorUtils.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/KafkaProcessorUtils.java index 3ae7495..3d09f2d 100644 --- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/KafkaProcessorUtils.java +++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/KafkaProcessorUtils.java @@ -27,8 +27,9 @@ import java.util.Map; import java.util.Set; import java.util.concurrent.TimeUnit; import java.util.regex.Pattern; -import org.apache.kafka.clients.CommonClientConfigs; +import org.apache.commons.lang3.StringUtils; +import org.apache.kafka.clients.CommonClientConfigs; import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.common.config.SaslConfigs; @@ -55,6 +56,10 @@ final class KafkaProcessorUtils { private static final String BROKER_REGEX = SINGLE_BROKER_REGEX + "(?:,\\s*" + SINGLE_BROKER_REGEX + ")*"; + static final AllowableValue UTF8_ENCODING = new AllowableValue("utf-8", "UTF-8 Encoded", "The key is interpreted as a UTF-8 Encoded string."); + static final AllowableValue HEX_ENCODING = new AllowableValue("hex", "Hex Encoded", + "The key is interpreted as arbitrary binary data and is encoded using hexadecimal characters with uppercase letters"); + static final Pattern HEX_KEY_PATTERN = Pattern.compile("(?:[0123456789abcdefABCDEF]{2})+"); static final String KAFKA_KEY = "kafka.key"; @@ -182,7 +187,7 @@ final class KafkaProcessorUtils { final Class<?> classType; - public KafkaConfigValidator(final Class classType) { + public KafkaConfigValidator(final Class<?> classType) { this.classType = classType; } @@ -207,7 +212,8 @@ final class KafkaProcessorUtils { return builder.toString(); } - static void buildCommonKafkaProperties(final ProcessContext context, final Class kafkaConfigClass, final Map<String, String> mapToPopulate) { + + static void buildCommonKafkaProperties(final ProcessContext context, final Class<?> kafkaConfigClass, final Map<String, Object> mapToPopulate) { for (PropertyDescriptor propertyDescriptor : context.getProperties().keySet()) { if (propertyDescriptor.equals(SSL_CONTEXT_SERVICE)) { // Translate SSLContext Service configuration into Kafka properties @@ -226,28 +232,33 @@ final class KafkaProcessorUtils { mapToPopulate.put(SslConfigs.SSL_TRUSTSTORE_TYPE_CONFIG, sslContextService.getTrustStoreType()); } } - String pName = propertyDescriptor.getName(); - String pValue = propertyDescriptor.isExpressionLanguageSupported() + + String propertyName = propertyDescriptor.getName(); + String propertyValue = propertyDescriptor.isExpressionLanguageSupported() ? 
context.getProperty(propertyDescriptor).evaluateAttributeExpressions().getValue() : context.getProperty(propertyDescriptor).getValue(); - if (pValue != null) { - if (pName.endsWith(".ms")) { // kafka standard time notation - pValue = String.valueOf(FormatUtils.getTimeDuration(pValue.trim(), TimeUnit.MILLISECONDS)); + + if (propertyValue != null) { + // If the property name ends in ".ms" then it is a time period. We want to accept either an integer as number of milliseconds + // or the standard NiFi time period such as "5 secs" + if (propertyName.endsWith(".ms") && !StringUtils.isNumeric(propertyValue.trim())) { // kafka standard time notation + propertyValue = String.valueOf(FormatUtils.getTimeDuration(propertyValue.trim(), TimeUnit.MILLISECONDS)); } - if (isStaticStringFieldNamePresent(pName, kafkaConfigClass, CommonClientConfigs.class, SslConfigs.class, SaslConfigs.class)) { - mapToPopulate.put(pName, pValue); + + if (isStaticStringFieldNamePresent(propertyName, kafkaConfigClass, CommonClientConfigs.class, SslConfigs.class, SaslConfigs.class)) { + mapToPopulate.put(propertyName, propertyValue); } } } } - private static boolean isStaticStringFieldNamePresent(final String name, final Class... classes) { + private static boolean isStaticStringFieldNamePresent(final String name, final Class<?>... classes) { return KafkaProcessorUtils.getPublicStaticStringFieldValues(classes).contains(name); } - private static Set<String> getPublicStaticStringFieldValues(final Class... classes) { + private static Set<String> getPublicStaticStringFieldValues(final Class<?>... classes) { final Set<String> strings = new HashSet<>(); - for (final Class classType : classes) { + for (final Class<?> classType : classes) { for (final Field field : classType.getDeclaredFields()) { if (Modifier.isPublic(field.getModifiers()) && Modifier.isStatic(field.getModifiers()) && field.getType().equals(String.class)) { try { http://git-wip-us.apache.org/repos/asf/nifi/blob/ed17df50/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/KafkaPublisher.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/KafkaPublisher.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/KafkaPublisher.java deleted file mode 100644 index 31a084f..0000000 --- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/KafkaPublisher.java +++ /dev/null @@ -1,236 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.nifi.processors.kafka.pubsub; - -import java.io.Closeable; -import java.io.InputStream; -import java.util.ArrayList; -import java.util.List; -import java.util.Properties; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.Future; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.TimeoutException; - -import org.apache.kafka.clients.producer.KafkaProducer; -import org.apache.kafka.clients.producer.Producer; -import org.apache.kafka.clients.producer.ProducerRecord; -import org.apache.kafka.clients.producer.RecordMetadata; -import org.apache.nifi.flowfile.FlowFile; -import org.apache.nifi.logging.ComponentLog; -import org.apache.nifi.stream.io.util.StreamDemarcator; - -/** - * Wrapper over {@link KafkaProducer} to assist {@link PublishKafka} processor - * with sending contents of the {@link FlowFile}s to Kafka. - */ -class KafkaPublisher implements Closeable { - - private final Producer<byte[], byte[]> kafkaProducer; - - private volatile long ackWaitTime = 30000; - - private final ComponentLog componentLog; - - private final int ackCheckSize; - - KafkaPublisher(Properties kafkaProperties, ComponentLog componentLog) { - this(kafkaProperties, 100, componentLog); - } - - /** - * Creates an instance of this class as well as the instance of the - * corresponding Kafka {@link KafkaProducer} using provided Kafka - * configuration properties. - * - * @param kafkaProperties instance of {@link Properties} used to bootstrap - * {@link KafkaProducer} - */ - KafkaPublisher(Properties kafkaProperties, int ackCheckSize, ComponentLog componentLog) { - this.kafkaProducer = new KafkaProducer<>(kafkaProperties); - this.ackCheckSize = ackCheckSize; - this.componentLog = componentLog; - } - - /** - * Publishes messages to Kafka topic. It uses {@link StreamDemarcator} to - * determine how many messages to Kafka will be sent from a provided - * {@link InputStream} (see {@link PublishingContext#getContentStream()}). - * It supports two publishing modes: - * <ul> - * <li>Sending all messages constructed from - * {@link StreamDemarcator#nextToken()} operation.</li> - * <li>Sending only unacknowledged messages constructed from - * {@link StreamDemarcator#nextToken()} operation.</li> - * </ul> - * The unacknowledged messages are determined from the value of - * {@link PublishingContext#getLastAckedMessageIndex()}. - * <br> - * This method assumes content stream affinity where it is expected that the - * content stream that represents the same Kafka message(s) will remain the - * same across possible retries. This is required specifically for cases - * where delimiter is used and a single content stream may represent - * multiple Kafka messages. The - * {@link PublishingContext#getLastAckedMessageIndex()} will provide the - * index of the last ACKed message, so upon retry only messages with the - * higher index are sent. - * - * @param publishingContext instance of {@link PublishingContext} which hold - * context information about the message(s) to be sent. - * @return The index of the last successful offset. 
- */ - KafkaPublisherResult publish(PublishingContext publishingContext) { - StreamDemarcator streamTokenizer = new StreamDemarcator(publishingContext.getContentStream(), - publishingContext.getDelimiterBytes(), publishingContext.getMaxRequestSize()); - - int prevLastAckedMessageIndex = publishingContext.getLastAckedMessageIndex(); - List<Future<RecordMetadata>> resultFutures = new ArrayList<>(); - - byte[] messageBytes; - int tokenCounter = 0; - boolean continueSending = true; - KafkaPublisherResult result = null; - for (; continueSending && (messageBytes = streamTokenizer.nextToken()) != null; tokenCounter++) { - if (prevLastAckedMessageIndex < tokenCounter) { - ProducerRecord<byte[], byte[]> message = new ProducerRecord<>(publishingContext.getTopic(), publishingContext.getKeyBytes(), messageBytes); - resultFutures.add(this.kafkaProducer.send(message)); - - if (tokenCounter % this.ackCheckSize == 0) { - int lastAckedMessageIndex = this.processAcks(resultFutures, prevLastAckedMessageIndex); - resultFutures.clear(); - if (lastAckedMessageIndex % this.ackCheckSize != 0) { - continueSending = false; - result = new KafkaPublisherResult(tokenCounter, lastAckedMessageIndex); - } - prevLastAckedMessageIndex = lastAckedMessageIndex; - } - } - } - - if (result == null) { - int lastAckedMessageIndex = this.processAcks(resultFutures, prevLastAckedMessageIndex); - resultFutures.clear(); - result = new KafkaPublisherResult(tokenCounter, lastAckedMessageIndex); - } - return result; - } - - /** - * Sets the time this publisher will wait for the {@link Future#get()} - * operation (the Future returned by - * {@link KafkaProducer#send(ProducerRecord)}) to complete before timing - * out. - * - * This value will also be used as a timeout when closing the underlying - * {@link KafkaProducer}. See {@link #close()}. - */ - void setAckWaitTime(long ackWaitTime) { - this.ackWaitTime = ackWaitTime; - } - - /** - * This operation will process ACKs from Kafka in the order in which - * {@link KafkaProducer#send(ProducerRecord)} invocation were made returning - * the index of the last ACKed message. Within this operation processing ACK - * simply means successful invocation of 'get()' operation on the - * {@link Future} returned by {@link KafkaProducer#send(ProducerRecord)} - * operation. Upon encountering any type of error while interrogating such - * {@link Future} the ACK loop will end. Messages that were not ACKed would - * be considered non-delivered and therefore could be resent at the later - * time. - * - * @param sendFutures list of {@link Future}s representing results of - * publishing to Kafka - * - * @param lastAckMessageIndex the index of the last ACKed message. It is - * important to provide the last ACKed message especially while re-trying so - * the proper index is maintained. 
- */ - private int processAcks(List<Future<RecordMetadata>> sendFutures, int lastAckMessageIndex) { - boolean exceptionThrown = false; - for (int segmentCounter = 0; segmentCounter < sendFutures.size() && !exceptionThrown; segmentCounter++) { - Future<RecordMetadata> future = sendFutures.get(segmentCounter); - try { - future.get(this.ackWaitTime, TimeUnit.MILLISECONDS); - lastAckMessageIndex++; - } catch (InterruptedException e) { - exceptionThrown = true; - Thread.currentThread().interrupt(); - this.warnOrError("Interrupted while waiting for acks from Kafka", null); - } catch (ExecutionException e) { - exceptionThrown = true; - this.warnOrError("Failed while waiting for acks from Kafka", e); - } catch (TimeoutException e) { - exceptionThrown = true; - this.warnOrError("Timed out while waiting for acks from Kafka", null); - } - } - - return lastAckMessageIndex; - } - - /** - * Will close the underlying {@link KafkaProducer} waiting if necessary for - * the same duration as supplied {@link #setAckWaitTime(long)} - */ - @Override - public void close() { - this.kafkaProducer.close(this.ackWaitTime, TimeUnit.MILLISECONDS); - } - - /** - * - */ - private void warnOrError(String message, Exception e) { - if (e == null) { - this.componentLog.warn(message); - } else { - this.componentLog.error(message, e); - } - } - - /** - * Encapsulates the result received from publishing messages to Kafka - */ - static class KafkaPublisherResult { - - private final int messagesSent; - private final int lastMessageAcked; - - KafkaPublisherResult(int messagesSent, int lastMessageAcked) { - this.messagesSent = messagesSent; - this.lastMessageAcked = lastMessageAcked; - } - - public int getMessagesSent() { - return this.messagesSent; - } - - public int getLastMessageAcked() { - return this.lastMessageAcked; - } - - public boolean isAllAcked() { - return this.lastMessageAcked > -1 && this.messagesSent - 1 == this.lastMessageAcked; - } - - @Override - public String toString() { - return "Sent:" + this.messagesSent + "; Last ACK:" + this.lastMessageAcked; - } - } -} http://git-wip-us.apache.org/repos/asf/nifi/blob/ed17df50/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublishKafka_0_10.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublishKafka_0_10.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublishKafka_0_10.java index 18f3018..bb0bed2 100644 --- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublishKafka_0_10.java +++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublishKafka_0_10.java @@ -14,9 +14,10 @@ * See the License for the specific language governing permissions and * limitations under the License. 
*/ + package org.apache.nifi.processors.kafka.pubsub; -import java.io.Closeable; +import java.io.BufferedInputStream; import java.io.IOException; import java.io.InputStream; import java.nio.charset.StandardCharsets; @@ -27,17 +28,16 @@ import java.util.HashMap; import java.util.HashSet; import java.util.List; import java.util.Map; -import java.util.Properties; import java.util.Set; import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicInteger; -import java.util.concurrent.atomic.AtomicReference; + import javax.xml.bind.DatatypeConverter; -import org.apache.kafka.clients.consumer.KafkaConsumer; + import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.common.serialization.ByteArraySerializer; import org.apache.nifi.annotation.behavior.DynamicProperty; import org.apache.nifi.annotation.behavior.InputRequirement; +import org.apache.nifi.annotation.behavior.WritesAttribute; import org.apache.nifi.annotation.documentation.CapabilityDescription; import org.apache.nifi.annotation.documentation.Tags; import org.apache.nifi.annotation.lifecycle.OnStopped; @@ -46,202 +46,192 @@ import org.apache.nifi.components.PropertyDescriptor; import org.apache.nifi.components.ValidationContext; import org.apache.nifi.components.ValidationResult; import org.apache.nifi.flowfile.FlowFile; -import org.apache.nifi.processor.AbstractSessionFactoryProcessor; +import org.apache.nifi.processor.AbstractProcessor; import org.apache.nifi.processor.DataUnit; import org.apache.nifi.processor.ProcessContext; import org.apache.nifi.processor.ProcessSession; -import org.apache.nifi.processor.ProcessSessionFactory; import org.apache.nifi.processor.Relationship; import org.apache.nifi.processor.exception.ProcessException; import org.apache.nifi.processor.io.InputStreamCallback; +import org.apache.nifi.processor.util.FlowFileFilters; import org.apache.nifi.processor.util.StandardValidators; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -@Tags({"Apache", "Kafka", "Put", "Send", "Message", "PubSub", "0.10"}) -@CapabilityDescription("Sends the contents of a FlowFile as a message to Apache Kafka using the Kafka 0.10 producer. " - + "The messages to send may be individual FlowFiles or may be delimited, using a " - + "user-specified delimiter, such as a new-line. " - + " Please note there are cases where the publisher can get into an indefinite stuck state. We are closely monitoring" - + " how this evolves in the Kafka community and will take advantage of those fixes as soon as we can. In the mean time" - + " it is possible to enter states where the only resolution will be to restart the JVM NiFi runs on.") + +@Tags({"Apache", "Kafka", "Put", "Send", "Message", "PubSub", "0.10.x"}) +@CapabilityDescription("Sends the contents of a FlowFile as a message to Apache Kafka using the Kafka 0.10.x Producer API." + + "The messages to send may be individual FlowFiles or may be delimited, using a " + + "user-specified delimiter, such as a new-line. " + + " Please note there are cases where the publisher can get into an indefinite stuck state. We are closely monitoring" + + " how this evolves in the Kafka community and will take advantage of those fixes as soon as we can. In the meantime" + + " it is possible to enter states where the only resolution will be to restart the JVM NiFi runs on. 
The complementary NiFi processor for fetching messages is ConsumeKafka_0_10.") @InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED) @DynamicProperty(name = "The name of a Kafka configuration property.", value = "The value of a given Kafka configuration property.", - description = "These properties will be added on the Kafka configuration after loading any provided configuration properties." + description = "These properties will be added on the Kafka configuration after loading any provided configuration properties." + " In the event a dynamic property represents a property that was already set, its value will be ignored and WARN message logged." + " For the list of available Kafka properties please refer to: http://kafka.apache.org/documentation.html#configuration. ") -public class PublishKafka_0_10 extends AbstractSessionFactoryProcessor { - - private final Logger logger = LoggerFactory.getLogger(this.getClass()); - - protected static final String FAILED_PROC_ID_ATTR = "failed.proc.id"; - - protected static final String FAILED_LAST_ACK_IDX = "failed.last.idx"; - - protected static final String FAILED_TOPIC_ATTR = "failed.topic"; - - protected static final String FAILED_KEY_ATTR = "failed.key"; - - protected static final String FAILED_DELIMITER_ATTR = "failed.delimiter"; - +@WritesAttribute(attribute = "msg.count", description = "The number of messages that were sent to Kafka for this FlowFile. This attribute is added only to " + + "FlowFiles that are routed to success. If the <Message Demarcator> Property is not set, this will always be 1, but if the Property is set, it may " + + "be greater than 1.") +public class PublishKafka_0_10 extends AbstractProcessor { protected static final String MSG_COUNT = "msg.count"; static final AllowableValue DELIVERY_REPLICATED = new AllowableValue("all", "Guarantee Replicated Delivery", - "FlowFile will be routed to failure unless the message is replicated to the appropriate " + "FlowFile will be routed to failure unless the message is replicated to the appropriate " + "number of Kafka Nodes according to the Topic configuration"); static final AllowableValue DELIVERY_ONE_NODE = new AllowableValue("1", "Guarantee Single Node Delivery", - "FlowFile will be routed to success if the message is received by a single Kafka node, " + "FlowFile will be routed to success if the message is received by a single Kafka node, " + "whether or not it is replicated. This is faster than <Guarantee Replicated Delivery> " + "but can result in data loss if a Kafka node crashes"); static final AllowableValue DELIVERY_BEST_EFFORT = new AllowableValue("0", "Best Effort", - "FlowFile will be routed to success after successfully writing the content to a Kafka node, " + "FlowFile will be routed to success after successfully writing the content to a Kafka node, " + "without waiting for a response. 
This provides the best performance but may result in data loss."); static final AllowableValue ROUND_ROBIN_PARTITIONING = new AllowableValue(Partitioners.RoundRobinPartitioner.class.getName(), - Partitioners.RoundRobinPartitioner.class.getSimpleName(), - "Messages will be assigned partitions in a round-robin fashion, sending the first message to Partition 1, " + Partitioners.RoundRobinPartitioner.class.getSimpleName(), + "Messages will be assigned partitions in a round-robin fashion, sending the first message to Partition 1, " + "the next Partition to Partition 2, and so on, wrapping as necessary."); static final AllowableValue RANDOM_PARTITIONING = new AllowableValue("org.apache.kafka.clients.producer.internals.DefaultPartitioner", - "DefaultPartitioner", "Messages will be assigned to random partitions."); + "DefaultPartitioner", "Messages will be assigned to random partitions."); static final AllowableValue UTF8_ENCODING = new AllowableValue("utf-8", "UTF-8 Encoded", "The key is interpreted as a UTF-8 Encoded string."); static final AllowableValue HEX_ENCODING = new AllowableValue("hex", "Hex Encoded", "The key is interpreted as arbitrary binary data that is encoded using hexadecimal characters with uppercase letters."); static final PropertyDescriptor TOPIC = new PropertyDescriptor.Builder() - .name("topic") - .displayName("Topic Name") - .description("The name of the Kafka Topic to publish to.") - .required(true) - .addValidator(StandardValidators.NON_BLANK_VALIDATOR) - .expressionLanguageSupported(true) - .build(); + .name("topic") + .displayName("Topic Name") + .description("The name of the Kafka Topic to publish to.") + .required(true) + .addValidator(StandardValidators.NON_BLANK_VALIDATOR) + .expressionLanguageSupported(true) + .build(); static final PropertyDescriptor DELIVERY_GUARANTEE = new PropertyDescriptor.Builder() - .name(ProducerConfig.ACKS_CONFIG) - .displayName("Delivery Guarantee") - .description("Specifies the requirement for guaranteeing that a message is sent to Kafka. Corresponds to Kafka's 'acks' property.") - .required(true) - .expressionLanguageSupported(false) - .allowableValues(DELIVERY_BEST_EFFORT, DELIVERY_ONE_NODE, DELIVERY_REPLICATED) - .defaultValue(DELIVERY_BEST_EFFORT.getValue()) - .build(); - - static final PropertyDescriptor META_WAIT_TIME = new PropertyDescriptor.Builder() - .name(ProducerConfig.MAX_BLOCK_MS_CONFIG) - .displayName("Meta Data Wait Time") - .description("The amount of time KafkaConsumer will wait to obtain metadata during the 'send' call before failing the " - + "entire 'send' call. Corresponds to Kafka's 'max.block.ms' property") - .required(true) - .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR) - .expressionLanguageSupported(true) - .defaultValue("30 sec") - .build(); + .name(ProducerConfig.ACKS_CONFIG) + .displayName("Delivery Guarantee") + .description("Specifies the requirement for guaranteeing that a message is sent to Kafka. Corresponds to Kafka's 'acks' property.") + .required(true) + .expressionLanguageSupported(false) + .allowableValues(DELIVERY_BEST_EFFORT, DELIVERY_ONE_NODE, DELIVERY_REPLICATED) + .defaultValue(DELIVERY_BEST_EFFORT.getValue()) + .build(); + + static final PropertyDescriptor METADATA_WAIT_TIME = new PropertyDescriptor.Builder() + .name(ProducerConfig.MAX_BLOCK_MS_CONFIG) + .displayName("Max Metadata Wait Time") + .description("The amount of time publisher will wait to obtain metadata or wait for the buffer to flush during the 'send' call before failing the " + + "entire 'send' call. 
Corresponds to Kafka's 'max.block.ms' property") + .required(true) + .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR) + .expressionLanguageSupported(true) + .defaultValue("5 sec") + .build(); + + static final PropertyDescriptor ACK_WAIT_TIME = new PropertyDescriptor.Builder() + .name("ack.wait.time") + .displayName("Acknowledgment Wait Time") + .description("After sending a message to Kafka, this indicates the amount of time that we are willing to wait for a response from Kafka. " + + "If Kafka does not acknowledge the message within this time period, the FlowFile will be routed to 'failure'.") + .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR) + .expressionLanguageSupported(false) + .required(true) + .defaultValue("5 secs") + .build(); static final PropertyDescriptor MAX_REQUEST_SIZE = new PropertyDescriptor.Builder() - .name("max.request.size") - .displayName("Max Request Size") - .description("The maximum size of a request in bytes. Corresponds to Kafka's 'max.request.size' property and defaults to 1 MB (1048576).") - .required(true) - .addValidator(StandardValidators.DATA_SIZE_VALIDATOR) - .defaultValue("1 MB") - .build(); + .name("max.request.size") + .displayName("Max Request Size") + .description("The maximum size of a request in bytes. Corresponds to Kafka's 'max.request.size' property and defaults to 1 MB (1048576).") + .required(true) + .addValidator(StandardValidators.DATA_SIZE_VALIDATOR) + .defaultValue("1 MB") + .build(); static final PropertyDescriptor KEY = new PropertyDescriptor.Builder() - .name("kafka-key") - .displayName("Kafka Key") - .description("The Key to use for the Message. It will be serialized as UTF-8 bytes. " - + "If not specified then the flow file attribute kafka.key is used if present " - + "and we're not demarcating. In that case the hex string is coverted to its byte" - + "form and written as a byte[] key.") - .required(false) - .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) - .expressionLanguageSupported(true) - .build(); + .name("kafka-key") + .displayName("Kafka Key") + .description("The Key to use for the Message. " + + "If not specified, the flow file attribute 'kafka.key' is used as the message key, if it is present " + + "and we're not demarcating.") + .required(false) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .expressionLanguageSupported(true) + .build(); static final PropertyDescriptor KEY_ATTRIBUTE_ENCODING = new PropertyDescriptor.Builder() - .name("key-attribute-encoding") - .displayName("Key Attribute Encoding") - .description("FlowFiles that are emitted have an attribute named '" + KafkaProcessorUtils.KAFKA_KEY + "'. This property dictates how the value of the attribute should be encoded.") - .required(true) - .defaultValue(UTF8_ENCODING.getValue()) - .allowableValues(UTF8_ENCODING, HEX_ENCODING) - .build(); + .name("key-attribute-encoding") + .displayName("Key Attribute Encoding") + .description("FlowFiles that are emitted have an attribute named '" + KafkaProcessorUtils.KAFKA_KEY + "'. 
This property dictates how the value of the attribute should be encoded.") + .required(true) + .defaultValue(UTF8_ENCODING.getValue()) + .allowableValues(UTF8_ENCODING, HEX_ENCODING) + .build(); static final PropertyDescriptor MESSAGE_DEMARCATOR = new PropertyDescriptor.Builder() - .name("message-demarcator") - .displayName("Message Demarcator") - .required(false) - .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) - .expressionLanguageSupported(true) - .description("Specifies the string (interpreted as UTF-8) to use for demarcating multiple messages within " - + "a single FlowFile. If not specified, the entire content of the FlowFile will be used as a single message. If specified, the " - + "contents of the FlowFile will be split on this delimiter and each section sent as a separate Kafka message. " - + "To enter special character such as 'new line' use CTRL+Enter or Shift+Enter depending on your OS.") - .build(); + .name("message-demarcator") + .displayName("Message Demarcator") + .required(false) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .expressionLanguageSupported(true) + .description("Specifies the string (interpreted as UTF-8) to use for demarcating multiple messages within " + + "a single FlowFile. If not specified, the entire content of the FlowFile will be used as a single message. If specified, the " + + "contents of the FlowFile will be split on this delimiter and each section sent as a separate Kafka message. " + + "To enter special character such as 'new line' use CTRL+Enter or Shift+Enter, depending on your OS.") + .build(); static final PropertyDescriptor PARTITION_CLASS = new PropertyDescriptor.Builder() - .name(ProducerConfig.PARTITIONER_CLASS_CONFIG) - .displayName("Partitioner class") - .description("Specifies which class to use to compute a partition id for a message. Corresponds to Kafka's 'partitioner.class' property.") - .allowableValues(ROUND_ROBIN_PARTITIONING, RANDOM_PARTITIONING) - .defaultValue(RANDOM_PARTITIONING.getValue()) - .required(false) - .build(); + .name(ProducerConfig.PARTITIONER_CLASS_CONFIG) + .displayName("Partitioner class") + .description("Specifies which class to use to compute a partition id for a message. 
Corresponds to Kafka's 'partitioner.class' property.") + .allowableValues(ROUND_ROBIN_PARTITIONING, RANDOM_PARTITIONING) + .defaultValue(RANDOM_PARTITIONING.getValue()) + .required(false) + .build(); static final PropertyDescriptor COMPRESSION_CODEC = new PropertyDescriptor.Builder() - .name(ProducerConfig.COMPRESSION_TYPE_CONFIG) - .displayName("Compression Type") - .description("This parameter allows you to specify the compression codec for all data generated by this producer.") - .required(true) - .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) - .allowableValues("none", "gzip", "snappy", "lz4") - .defaultValue("none") - .build(); + .name(ProducerConfig.COMPRESSION_TYPE_CONFIG) + .displayName("Compression Type") + .description("This parameter allows you to specify the compression codec for all data generated by this producer.") + .required(true) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .allowableValues("none", "gzip", "snappy", "lz4") + .defaultValue("none") + .build(); static final Relationship REL_SUCCESS = new Relationship.Builder() - .name("success") - .description("FlowFiles for which all content was sent to Kafka.") - .build(); + .name("success") + .description("FlowFiles for which all content was sent to Kafka.") + .build(); static final Relationship REL_FAILURE = new Relationship.Builder() - .name("failure") - .description("Any FlowFile that cannot be sent to Kafka will be routed to this Relationship") - .build(); - - static final List<PropertyDescriptor> DESCRIPTORS; - - static final Set<Relationship> RELATIONSHIPS; + .name("failure") + .description("Any FlowFile that cannot be sent to Kafka will be routed to this Relationship") + .build(); - private volatile String brokers; + private static final List<PropertyDescriptor> PROPERTIES; + private static final Set<Relationship> RELATIONSHIPS; - private final AtomicInteger taskCounter = new AtomicInteger(); + private volatile PublisherPool publisherPool = null; - private volatile boolean acceptTask = true; - - /* - * Will ensure that list of PropertyDescriptors is build only once, since - * all other lifecycle methods are invoked multiple times. 
- */ static { - final List<PropertyDescriptor> _descriptors = new ArrayList<>(); - _descriptors.addAll(KafkaProcessorUtils.getCommonPropertyDescriptors()); - _descriptors.add(TOPIC); - _descriptors.add(DELIVERY_GUARANTEE); - _descriptors.add(KEY); - _descriptors.add(KEY_ATTRIBUTE_ENCODING); - _descriptors.add(MESSAGE_DEMARCATOR); - _descriptors.add(MAX_REQUEST_SIZE); - _descriptors.add(META_WAIT_TIME); - _descriptors.add(PARTITION_CLASS); - _descriptors.add(COMPRESSION_CODEC); - - DESCRIPTORS = Collections.unmodifiableList(_descriptors); - - final Set<Relationship> _relationships = new HashSet<>(); - _relationships.add(REL_SUCCESS); - _relationships.add(REL_FAILURE); - RELATIONSHIPS = Collections.unmodifiableSet(_relationships); + final List<PropertyDescriptor> properties = new ArrayList<>(); + properties.addAll(KafkaProcessorUtils.getCommonPropertyDescriptors()); + properties.add(TOPIC); + properties.add(DELIVERY_GUARANTEE); + properties.add(KEY); + properties.add(KEY_ATTRIBUTE_ENCODING); + properties.add(MESSAGE_DEMARCATOR); + properties.add(MAX_REQUEST_SIZE); + properties.add(ACK_WAIT_TIME); + properties.add(METADATA_WAIT_TIME); + properties.add(PARTITION_CLASS); + properties.add(COMPRESSION_CODEC); + + PROPERTIES = Collections.unmodifiableList(properties); + + final Set<Relationship> relationships = new HashSet<>(); + relationships.add(REL_SUCCESS); + relationships.add(REL_FAILURE); + RELATIONSHIPS = Collections.unmodifiableSet(relationships); } @Override @@ -251,15 +241,17 @@ public class PublishKafka_0_10 extends AbstractSessionFactoryProcessor { @Override protected List<PropertyDescriptor> getSupportedPropertyDescriptors() { - return DESCRIPTORS; + return PROPERTIES; } @Override protected PropertyDescriptor getSupportedDynamicPropertyDescriptor(final String propertyDescriptorName) { return new PropertyDescriptor.Builder() - .description("Specifies the value for '" + propertyDescriptorName + "' Kafka Configuration.") - .name(propertyDescriptorName).addValidator(new KafkaProcessorUtils.KafkaConfigValidator(ProducerConfig.class)).dynamic(true) - .build(); + .description("Specifies the value for '" + propertyDescriptorName + "' Kafka Configuration.") + .name(propertyDescriptorName) + .addValidator(new KafkaProcessorUtils.KafkaConfigValidator(ProducerConfig.class)) + .dynamic(true) + .build(); } @Override @@ -267,226 +259,123 @@ public class PublishKafka_0_10 extends AbstractSessionFactoryProcessor { return KafkaProcessorUtils.validateCommonProperties(validationContext); } - volatile KafkaPublisher kafkaPublisher; - - /** - * This thread-safe operation will delegate to - * {@link #rendezvousWithKafka(ProcessContext, ProcessSession)} after first - * checking and creating (if necessary) Kafka resource which could be either - * {@link KafkaPublisher} or {@link KafkaConsumer}. It will also close and - * destroy the underlying Kafka resource upon catching an {@link Exception} - * raised by {@link #rendezvousWithKafka(ProcessContext, ProcessSession)}. - * After Kafka resource is destroyed it will be re-created upon the next - * invocation of this operation essentially providing a self healing - * mechanism to deal with potentially corrupted resource. - * <p> - * Keep in mind that upon catching an exception the state of this processor - * will be set to no longer accept any more tasks, until Kafka resource is - * reset. This means that in a multi-threaded situation currently executing - * tasks will be given a chance to complete while no new tasks will be - * accepted. 
- * - * @param context context - * @param sessionFactory factory - */ - @Override - public final void onTrigger(final ProcessContext context, final ProcessSessionFactory sessionFactory) throws ProcessException { - if (this.acceptTask) { // acts as a circuit breaker to allow existing tasks to wind down so 'kafkaPublisher' can be reset before new tasks are accepted. - this.taskCounter.incrementAndGet(); - final ProcessSession session = sessionFactory.createSession(); - try { - /* - * We can't be doing double null check here since as a pattern - * it only works for lazy init but not reset, which is what we - * are doing here. In fact the first null check is dangerous - * since 'kafkaPublisher' can become null right after its null - * check passed causing subsequent NPE. - */ - synchronized (this) { - if (this.kafkaPublisher == null) { - this.kafkaPublisher = this.buildKafkaResource(context, session); - } - } - - /* - * The 'processed' boolean flag does not imply any failure or success. It simply states that: - * - ConsumeKafka - some messages were received form Kafka and 1_ FlowFile were generated - * - PublishKafka0_10 - some messages were sent to Kafka based on existence of the input FlowFile - */ - boolean processed = this.rendezvousWithKafka(context, session); - session.commit(); - if (!processed) { - context.yield(); - } - } catch (Throwable e) { - this.acceptTask = false; - session.rollback(true); - this.getLogger().error("{} failed to process due to {}; rolling back session", new Object[]{this, e}); - } finally { - synchronized (this) { - if (this.taskCounter.decrementAndGet() == 0 && !this.acceptTask) { - this.close(); - this.acceptTask = true; - } - } - } - } else { - this.logger.debug("Task was not accepted due to the processor being in 'reset' state. It will be re-submitted upon completion of the reset."); - this.getLogger().debug("Task was not accepted due to the processor being in 'reset' state. It will be re-submitted upon completion of the reset."); - context.yield(); + private synchronized PublisherPool getPublisherPool(final ProcessContext context) { + PublisherPool pool = publisherPool; + if (pool != null) { + return pool; } + + return publisherPool = createPublisherPool(context); + } + + protected PublisherPool createPublisherPool(final ProcessContext context) { + final int maxMessageSize = context.getProperty(MAX_REQUEST_SIZE).asDataSize(DataUnit.B).intValue(); + final long maxAckWaitMillis = context.getProperty(ACK_WAIT_TIME).asTimePeriod(TimeUnit.MILLISECONDS).longValue(); + + final Map<String, Object> kafkaProperties = new HashMap<>(); + KafkaProcessorUtils.buildCommonKafkaProperties(context, ProducerConfig.class, kafkaProperties); + kafkaProperties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName()); + kafkaProperties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName()); + kafkaProperties.put("max.request.size", String.valueOf(maxMessageSize)); + + return new PublisherPool(kafkaProperties, getLogger(), maxMessageSize, maxAckWaitMillis); } - /** - * Will call {@link Closeable#close()} on the target resource after which - * the target resource will be set to null. Should only be called when there - * are no more threads being executed on this processor or when it has been - * verified that only a single thread remains. 
- * - * @see KafkaPublisher - * @see KafkaConsumer - */ @OnStopped - public void close() { - try { - if (this.kafkaPublisher != null) { - try { - this.kafkaPublisher.close(); - } catch (Exception e) { - this.getLogger().warn("Failed while closing " + this.kafkaPublisher, e); - } - } - } finally { - this.kafkaPublisher = null; + public void closePool() { + if (publisherPool != null) { + publisherPool.close(); } + + publisherPool = null; } - /** - * Will rendezvous with Kafka if {@link ProcessSession} contains - * {@link FlowFile} producing a result {@link FlowFile}. - * <br> - * The result {@link FlowFile} that is successful is then transfered to - * {@link #REL_SUCCESS} - * <br> - * The result {@link FlowFile} that is failed is then transfered to - * {@link #REL_FAILURE} - * - */ - protected boolean rendezvousWithKafka(ProcessContext context, ProcessSession session) { - FlowFile flowFile = session.get(); - if (flowFile != null) { - long start = System.nanoTime(); - flowFile = this.doRendezvousWithKafka(flowFile, context, session); - Relationship relationship = REL_SUCCESS; - if (!this.isFailedFlowFile(flowFile)) { - String topic = context.getProperty(TOPIC).evaluateAttributeExpressions(flowFile).getValue(); - long executionDuration = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start); - String transitUri = KafkaProcessorUtils.buildTransitURI(context.getProperty(KafkaProcessorUtils.SECURITY_PROTOCOL).getValue(), this.brokers, topic); - session.getProvenanceReporter().send(flowFile, transitUri, "Sent " + flowFile.getAttribute(MSG_COUNT) + " Kafka messages", executionDuration); - this.getLogger().debug("Successfully sent {} to Kafka as {} message(s) in {} millis", - new Object[]{flowFile, flowFile.getAttribute(MSG_COUNT), executionDuration}); - } else { - relationship = REL_FAILURE; - flowFile = session.penalize(flowFile); - } - session.transfer(flowFile, relationship); + @Override + public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException { + final boolean useDemarcator = context.getProperty(MESSAGE_DEMARCATOR).isSet(); + + final List<FlowFile> flowFiles = session.get(FlowFileFilters.newSizeBasedFilter(250, DataUnit.KB, 500)); + if (flowFiles.isEmpty()) { + return; } - return flowFile != null; - } - /** - * Builds and instance of {@link KafkaPublisher}. 
- */ - protected KafkaPublisher buildKafkaResource(ProcessContext context, ProcessSession session) { - final Map<String, String> kafkaProps = new HashMap<>(); - KafkaProcessorUtils.buildCommonKafkaProperties(context, ProducerConfig.class, kafkaProps); - kafkaProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName()); - kafkaProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName()); - kafkaProps.put("max.request.size", String.valueOf(context.getProperty(MAX_REQUEST_SIZE).asDataSize(DataUnit.B).intValue())); - this.brokers = context.getProperty(KafkaProcessorUtils.BOOTSTRAP_SERVERS).evaluateAttributeExpressions().getValue(); - final Properties props = new Properties(); - props.putAll(kafkaProps); - KafkaPublisher publisher = new KafkaPublisher(props, this.getLogger()); - return publisher; - } + final PublisherPool pool = getPublisherPool(context); + if (pool == null) { + context.yield(); + return; + } + + final String securityProtocol = context.getProperty(KafkaProcessorUtils.SECURITY_PROTOCOL).getValue(); + final String bootstrapServers = context.getProperty(KafkaProcessorUtils.BOOTSTRAP_SERVERS).evaluateAttributeExpressions().getValue(); + + final long startTime = System.nanoTime(); + try (final PublisherLease lease = pool.obtainPublisher()) { + // Send each FlowFile to Kafka asynchronously. + for (final FlowFile flowFile : flowFiles) { + if (!isScheduled()) { + // If stopped, re-queue FlowFile instead of sending it + session.transfer(flowFile); + continue; + } - /** - * Will rendezvous with {@link KafkaPublisher} after building - * {@link PublishingContext} and will produce the resulting - * {@link FlowFile}. The resulting FlowFile contains all required - * information to determine if message publishing originated from the - * provided FlowFile has actually succeeded fully, partially or failed - * completely (see {@link #isFailedFlowFile(FlowFile)}. - */ - private FlowFile doRendezvousWithKafka(final FlowFile flowFile, final ProcessContext context, final ProcessSession session) { - final AtomicReference<KafkaPublisher.KafkaPublisherResult> publishResultRef = new AtomicReference<>(); - session.read(flowFile, new InputStreamCallback() { - @Override - public void process(InputStream contentStream) throws IOException { - PublishingContext publishingContext = PublishKafka_0_10.this.buildPublishingContext(flowFile, context, contentStream); - KafkaPublisher.KafkaPublisherResult result = PublishKafka_0_10.this.kafkaPublisher.publish(publishingContext); - publishResultRef.set(result); + final byte[] messageKey = getMessageKey(flowFile, context); + final String topic = context.getProperty(TOPIC).evaluateAttributeExpressions(flowFile).getValue(); + final byte[] demarcatorBytes; + if (useDemarcator) { + demarcatorBytes = context.getProperty(MESSAGE_DEMARCATOR).evaluateAttributeExpressions(flowFile).getValue().getBytes(StandardCharsets.UTF_8); + } else { + demarcatorBytes = null; + } + + session.read(flowFile, new InputStreamCallback() { + @Override + public void process(final InputStream rawIn) throws IOException { + try (final InputStream in = new BufferedInputStream(rawIn)) { + lease.publish(flowFile, in, messageKey, demarcatorBytes, topic); + } + } + }); } - }); - FlowFile resultFile = publishResultRef.get().isAllAcked() - ? 
this.cleanUpFlowFileIfNecessary(flowFile, session) - : session.putAllAttributes(flowFile, this.buildFailedFlowFileAttributes(publishResultRef.get().getLastMessageAcked(), flowFile, context)); + // Complete the send + final PublishResult publishResult = lease.complete(); - if (!this.isFailedFlowFile(resultFile)) { - resultFile = session.putAttribute(resultFile, MSG_COUNT, String.valueOf(publishResultRef.get().getMessagesSent())); - } - return resultFile; - } + // Transfer any successful FlowFiles. + final long transmissionMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startTime); + for (FlowFile success : publishResult.getSuccessfulFlowFiles()) { + final String topic = context.getProperty(TOPIC).evaluateAttributeExpressions(success).getValue(); - /** - * Builds {@link PublishingContext} for message(s) to be sent to Kafka. - * {@link PublishingContext} contains all contextual information required by - * {@link KafkaPublisher} to publish to Kafka. Such information contains - * things like topic name, content stream, delimiter, key and last ACKed - * message for cases where provided FlowFile is being retried (failed in the - * past). - * <br> - * For the clean FlowFile (file that has been sent for the first time), - * PublishingContext will be built form {@link ProcessContext} associated - * with this invocation. - * <br> - * For the failed FlowFile, {@link PublishingContext} will be built from - * attributes of that FlowFile which by then will already contain required - * information (e.g., topic, key, delimiter etc.). This is required to - * ensure the affinity of the retry in the even where processor - * configuration has changed. However keep in mind that failed FlowFile is - * only considered a failed FlowFile if it is being re-processed by the same - * processor (determined via {@link #FAILED_PROC_ID_ATTR}, see - * {@link #isFailedFlowFile(FlowFile)}). If failed FlowFile is being sent to - * another PublishKafka0_10 processor it is treated as a fresh FlowFile - * regardless if it has #FAILED* attributes set. - */ - private PublishingContext buildPublishingContext(FlowFile flowFile, ProcessContext context, InputStream contentStream) { - final byte[] keyBytes = getMessageKey(flowFile, context); - - final String topicName; - final byte[] delimiterBytes; - int lastAckedMessageIndex = -1; - if (this.isFailedFlowFile(flowFile)) { - lastAckedMessageIndex = Integer.valueOf(flowFile.getAttribute(FAILED_LAST_ACK_IDX)); - topicName = flowFile.getAttribute(FAILED_TOPIC_ATTR); - delimiterBytes = flowFile.getAttribute(FAILED_DELIMITER_ATTR) != null - ? flowFile.getAttribute(FAILED_DELIMITER_ATTR).getBytes(StandardCharsets.UTF_8) : null; - } else { - topicName = context.getProperty(TOPIC).evaluateAttributeExpressions(flowFile).getValue(); - delimiterBytes = context.getProperty(MESSAGE_DEMARCATOR).isSet() ? context.getProperty(MESSAGE_DEMARCATOR) - .evaluateAttributeExpressions(flowFile).getValue().getBytes(StandardCharsets.UTF_8) : null; - } + final int msgCount = publishResult.getSuccessfulMessageCount(success); + success = session.putAttribute(success, MSG_COUNT, String.valueOf(msgCount)); + session.adjustCounter("Messages Sent", msgCount, true); + + final String transitUri = KafkaProcessorUtils.buildTransitURI(securityProtocol, bootstrapServers, topic); + session.getProvenanceReporter().send(success, transitUri, "Sent " + msgCount + " messages", transmissionMillis); + session.transfer(success, REL_SUCCESS); + } + + // Transfer any failures. 
+ for (final FlowFile failure : publishResult.getFailedFlowFiles()) { + final int successCount = publishResult.getSuccessfulMessageCount(failure); + if (successCount > 0) { + getLogger().error("Failed to send some messages for {} to Kafka, but {} messages were acknowledged by Kafka. Routing to failure due to {}", + new Object[] {failure, successCount, publishResult.getReasonForFailure(failure)}); + } else { + getLogger().error("Failed to send all messages for {} to Kafka; routing to failure due to {}", + new Object[] {failure, publishResult.getReasonForFailure(failure)}); + } - PublishingContext publishingContext = new PublishingContext(contentStream, topicName, lastAckedMessageIndex, - context.getProperty(MAX_REQUEST_SIZE).asDataSize(DataUnit.B).intValue()); - publishingContext.setKeyBytes(keyBytes); - publishingContext.setDelimiterBytes(delimiterBytes); - return publishingContext; + session.transfer(failure, REL_FAILURE); + } + } } + private byte[] getMessageKey(final FlowFile flowFile, final ProcessContext context) { + if (context.getProperty(MESSAGE_DEMARCATOR).isSet()) { + return null; + } + final String uninterpretedKey; if (context.getProperty(KEY).isSet()) { uninterpretedKey = context.getProperty(KEY).evaluateAttributeExpressions(flowFile).getValue(); @@ -505,51 +394,4 @@ public class PublishKafka_0_10 extends AbstractSessionFactoryProcessor { return DatatypeConverter.parseHexBinary(uninterpretedKey); } - - /** - * Will remove FAILED_* attributes if FlowFile is no longer considered a - * failed FlowFile - * - * @see #isFailedFlowFile(FlowFile) - */ - private FlowFile cleanUpFlowFileIfNecessary(FlowFile flowFile, ProcessSession session) { - if (this.isFailedFlowFile(flowFile)) { - Set<String> keysToRemove = new HashSet<>(); - keysToRemove.add(FAILED_DELIMITER_ATTR); - keysToRemove.add(FAILED_KEY_ATTR); - keysToRemove.add(FAILED_TOPIC_ATTR); - keysToRemove.add(FAILED_PROC_ID_ATTR); - keysToRemove.add(FAILED_LAST_ACK_IDX); - flowFile = session.removeAllAttributes(flowFile, keysToRemove); - } - return flowFile; - } - - /** - * Builds a {@link Map} of FAILED_* attributes - * - * @see #FAILED_PROC_ID_ATTR - * @see #FAILED_LAST_ACK_IDX - * @see #FAILED_TOPIC_ATTR - * @see #FAILED_KEY_ATTR - * @see #FAILED_DELIMITER_ATTR - */ - private Map<String, String> buildFailedFlowFileAttributes(int lastAckedMessageIndex, FlowFile sourceFlowFile, ProcessContext context) { - Map<String, String> attributes = new HashMap<>(); - attributes.put(FAILED_PROC_ID_ATTR, this.getIdentifier()); - attributes.put(FAILED_LAST_ACK_IDX, String.valueOf(lastAckedMessageIndex)); - attributes.put(FAILED_TOPIC_ATTR, context.getProperty(TOPIC).evaluateAttributeExpressions(sourceFlowFile).getValue()); - attributes.put(FAILED_KEY_ATTR, context.getProperty(KEY).evaluateAttributeExpressions(sourceFlowFile).getValue()); - attributes.put(FAILED_DELIMITER_ATTR, context.getProperty(MESSAGE_DEMARCATOR).isSet() - ? context.getProperty(MESSAGE_DEMARCATOR).evaluateAttributeExpressions(sourceFlowFile).getValue() : null); - return attributes; - } - - /** - * Returns 'true' if provided FlowFile is a failed FlowFile. A failed - * FlowFile contains {@link #FAILED_PROC_ID_ATTR}. 

http://git-wip-us.apache.org/repos/asf/nifi/blob/ed17df50/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublishResult.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublishResult.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublishResult.java
new file mode 100644
index 0000000..b685265
--- /dev/null
+++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublishResult.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.processors.kafka.pubsub;
+
+import java.util.Collection;
+import java.util.Collections;
+
+import org.apache.nifi.flowfile.FlowFile;
+
+public interface PublishResult {
+    Collection<FlowFile> getSuccessfulFlowFiles();
+
+    Collection<FlowFile> getFailedFlowFiles();
+
+    int getSuccessfulMessageCount(FlowFile flowFile);
+
+    Exception getReasonForFailure(FlowFile flowFile);
+
+
+    public static final PublishResult EMPTY = new PublishResult() {
+        @Override
+        public Collection<FlowFile> getSuccessfulFlowFiles() {
+            return Collections.emptyList();
+        }
+
+        @Override
+        public Collection<FlowFile> getFailedFlowFiles() {
+            return Collections.emptyList();
+        }
+
+        @Override
+        public int getSuccessfulMessageCount(FlowFile flowFile) {
+            return 0;
+        }
+
+        @Override
+        public Exception getReasonForFailure(FlowFile flowFile) {
+            return null;
+        }
+    };
+}
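
A PublishResult is a read-only view: callers only iterate the two collections and query the per-FlowFile counts. A minimal consumer sketch, assuming same-package access; the class name and the println reporting are illustrative, not part of the commit:

import org.apache.nifi.flowfile.FlowFile;

// Hypothetical consumer; shows the intended read pattern only.
public class PublishResultExample {
    void report(final PublishResult result) {
        for (final FlowFile flowFile : result.getSuccessfulFlowFiles()) {
            // One FlowFile may have produced many Kafka messages (demarcated content)
            System.out.println(flowFile + " -> " + result.getSuccessfulMessageCount(flowFile) + " messages sent");
        }

        for (final FlowFile flowFile : result.getFailedFlowFiles()) {
            System.out.println(flowFile + " failed: " + result.getReasonForFailure(flowFile));
        }
    }
}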

http://git-wip-us.apache.org/repos/asf/nifi/blob/ed17df50/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublisherLease.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublisherLease.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublisherLease.java
new file mode 100644
index 0000000..b67e8a8
--- /dev/null
+++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublisherLease.java
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.processors.kafka.pubsub;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+import org.apache.kafka.clients.producer.Callback;
+import org.apache.kafka.clients.producer.Producer;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.clients.producer.RecordMetadata;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.stream.io.exception.TokenTooLargeException;
+import org.apache.nifi.stream.io.util.StreamDemarcator;
+
+public class PublisherLease implements Closeable {
+    private final ComponentLog logger;
+    private final Producer<byte[], byte[]> producer;
+    private final int maxMessageSize;
+    private final long maxAckWaitMillis;
+    private volatile boolean poisoned = false;
+
+    private InFlightMessageTracker tracker;
+
+    public PublisherLease(final Producer<byte[], byte[]> producer, final int maxMessageSize, final long maxAckWaitMillis, final ComponentLog logger) {
+        this.producer = producer;
+        this.maxMessageSize = maxMessageSize;
+        this.logger = logger;
+        this.maxAckWaitMillis = maxAckWaitMillis;
+    }
+
+    protected void poison() {
+        this.poisoned = true;
+    }
+
+    public boolean isPoisoned() {
+        return poisoned;
+    }
+
+    void publish(final FlowFile flowFile, final InputStream flowFileContent, final byte[] messageKey, final byte[] demarcatorBytes, final String topic) throws IOException {
+        if (tracker == null) {
+            tracker = new InFlightMessageTracker();
+        }
+
+        try (final StreamDemarcator demarcator = new StreamDemarcator(flowFileContent, demarcatorBytes, maxMessageSize)) {
+            byte[] messageContent;
+            try {
+                while ((messageContent = demarcator.nextToken()) != null) {
+                    // We do not want to use any key if we have a demarcator because that would result in
+                    // the key being the same for multiple messages
+                    final byte[] keyToUse = demarcatorBytes == null ? messageKey : null;
+                    publish(flowFile, keyToUse, messageContent, topic, tracker);
+
+                    if (tracker.isFailed(flowFile)) {
+                        // If we have a failure, don't try to send anything else.
+                        return;
+                    }
+                }
+            } catch (final TokenTooLargeException ttle) {
+                tracker.fail(flowFile, ttle);
+            }
+        } catch (final Exception e) {
+            tracker.fail(flowFile, e);
+            poison();
+            throw e;
+        }
+    }
+
+    private void publish(final FlowFile flowFile, final byte[] messageKey, final byte[] messageContent, final String topic, final InFlightMessageTracker tracker) {
+        final ProducerRecord<byte[], byte[]> record = new ProducerRecord<>(topic, null, messageKey, messageContent);
+        producer.send(record, new Callback() {
+            @Override
+            public void onCompletion(final RecordMetadata metadata, final Exception exception) {
+                if (exception == null) {
+                    tracker.incrementAcknowledgedCount(flowFile);
+                } else {
+                    tracker.fail(flowFile, exception);
+                    poison();
+                }
+            }
+        });
+
+        tracker.incrementSentCount(flowFile);
+    }
+
+    public PublishResult complete() {
+        if (tracker == null) {
+            throw new IllegalStateException("Cannot complete publishing to Kafka because Publisher Lease was already closed");
+        }
+
+        producer.flush();
+
+        try {
+            tracker.awaitCompletion(maxAckWaitMillis);
+            return tracker.createPublishResult();
+        } catch (final InterruptedException e) {
+            logger.warn("Interrupted while waiting for an acknowledgement from Kafka; some FlowFiles may be transferred to 'failure' even though they were received by Kafka");
+            Thread.currentThread().interrupt();
+            return tracker.failOutstanding(e);
+        } catch (final TimeoutException e) {
+            logger.warn("Timed out while waiting for an acknowledgement from Kafka; some FlowFiles may be transferred to 'failure' even though they were received by Kafka");
+            return tracker.failOutstanding(e);
+        } finally {
+            tracker = null;
+        }
+    }
+
+    @Override
+    public void close() {
+        producer.close(maxAckWaitMillis, TimeUnit.MILLISECONDS);
+        tracker = null;
+    }
+}
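
A lease is meant to be used as publish-then-complete, with close() handing it back to its pool. A usage sketch under stated assumptions: the caller lives in the same package (publish() is package-private), and the topic name plus the null key/demarcator are illustrative stand-ins, not values from the commit:

import java.io.IOException;
import java.io.InputStream;

import org.apache.nifi.flowfile.FlowFile;

// Hypothetical caller; PublishKafka_0_10's onTrigger follows this same shape.
public class LeaseUsageExample {
    PublishResult sendWholeFile(final PublisherPool pool, final FlowFile flowFile, final InputStream content) throws IOException {
        try (final PublisherLease lease = pool.obtainPublisher()) {
            // null demarcator: the entire FlowFile content becomes one Kafka message;
            // null key: the message is sent without a key
            lease.publish(flowFile, content, null, null, "my-topic");

            // Flushes the producer and waits (bounded by max ack wait) for callbacks,
            // then reports per-FlowFile success/failure
            return lease.complete();
        }
        // close() on a pooled lease returns it to the PublisherPool (see below)
        // rather than closing the underlying KafkaProducer
    }
}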

http://git-wip-us.apache.org/repos/asf/nifi/blob/ed17df50/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublisherPool.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublisherPool.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublisherPool.java
new file mode 100644
index 0000000..5902b03
--- /dev/null
+++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublisherPool.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.processors.kafka.pubsub;
+
+import java.io.Closeable;
+import java.util.Map;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.Producer;
+import org.apache.nifi.logging.ComponentLog;
+
+public class PublisherPool implements Closeable {
+    private final ComponentLog logger;
+    private final BlockingQueue<PublisherLease> publisherQueue;
+    private final Map<String, Object> kafkaProperties;
+    private final int maxMessageSize;
+    private final long maxAckWaitMillis;
+
+    private volatile boolean closed = false;
+
+    PublisherPool(final Map<String, Object> kafkaProperties, final ComponentLog logger, final int maxMessageSize, final long maxAckWaitMillis) {
+        this.logger = logger;
+        this.publisherQueue = new LinkedBlockingQueue<>();
+        this.kafkaProperties = kafkaProperties;
+        this.maxMessageSize = maxMessageSize;
+        this.maxAckWaitMillis = maxAckWaitMillis;
+    }
+
+    public PublisherLease obtainPublisher() {
+        if (isClosed()) {
+            throw new IllegalStateException("Connection Pool is closed");
+        }
+
+        PublisherLease lease = publisherQueue.poll();
+        if (lease != null) {
+            return lease;
+        }
+
+        lease = createLease();
+        return lease;
+    }
+
+    private PublisherLease createLease() {
+        final Producer<byte[], byte[]> producer = new KafkaProducer<>(kafkaProperties);
+        final PublisherLease lease = new PublisherLease(producer, maxMessageSize, maxAckWaitMillis, logger) {
+            @Override
+            public void close() {
+                if (isPoisoned() || isClosed()) {
+                    super.close();
+                } else {
+                    publisherQueue.offer(this);
+                }
+            }
+        };
+
+        return lease;
+    }
+
+    public synchronized boolean isClosed() {
+        return closed;
+    }
+
+    @Override
+    public synchronized void close() {
+        closed = true;
+
+        PublisherLease lease;
+        while ((lease = publisherQueue.poll()) != null) {
+            lease.close();
+        }
+    }
+
+    /**
+     * Returns the number of leases that are currently available
+     *
+     * @return the number of leases currently available
+     */
+    protected int available() {
+        return publisherQueue.size();
+    }
+}
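
The recycling happens entirely in the anonymous close() override inside createLease(): a healthy lease goes back on the queue with its KafkaProducer still open, while a poisoned lease (or any lease closed after the pool itself is closed) tears its producer down. A lifecycle sketch, assuming same-package access to the package-private constructor; the property values are illustrative, and the null logger is tolerated only because this sketch never reaches the lease's warn paths:

import java.util.HashMap;
import java.util.Map;

// Hypothetical harness demonstrating lease recycling.
public class PoolLifecycleExample {
    public static void main(final String[] args) {
        final Map<String, Object> kafkaProperties = new HashMap<>();
        kafkaProperties.put("bootstrap.servers", "localhost:9092"); // illustrative
        kafkaProperties.put("key.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
        kafkaProperties.put("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");

        // A real caller passes the processor's ComponentLog instead of null
        final PublisherPool pool = new PublisherPool(kafkaProperties, null, 1_048_576, 5_000L);

        final PublisherLease lease = pool.obtainPublisher(); // queue empty -> creates a new producer
        lease.close();                                       // healthy -> returned to the queue, producer kept open

        final PublisherLease recycled = pool.obtainPublisher(); // the same lease instance comes back
        recycled.poison();
        recycled.close();                                       // poisoned -> the underlying producer is closed

        pool.close(); // drains and closes whatever is still queued
    }
}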

http://git-wip-us.apache.org/repos/asf/nifi/blob/ed17df50/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublishingContext.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublishingContext.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublishingContext.java
deleted file mode 100644
index 1513481..0000000
--- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublishingContext.java
+++ /dev/null
@@ -1,124 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.processors.kafka.pubsub;
-
-import java.io.InputStream;
-import java.nio.charset.StandardCharsets;
-
-/**
- * Holder of context information used by {@link KafkaPublisher} required to
- * publish messages to Kafka.
- */
-class PublishingContext {
-
-    private final InputStream contentStream;
-
-    private final String topic;
-
-    private final int lastAckedMessageIndex;
-
-    private final int maxRequestSize;
-
-    private byte[] keyBytes;
-
-    private byte[] delimiterBytes;
-
-    PublishingContext(InputStream contentStream, String topic) {
-        this(contentStream, topic, -1);
-    }
-
-    PublishingContext(InputStream contentStream, String topic, int lastAckedMessageIndex) {
-        this(contentStream, topic, lastAckedMessageIndex, 1048576);
-    }
-
-    PublishingContext(InputStream contentStream, String topic, int lastAckedMessageIndex, int maxRequestSize) {
-        this.validateInput(contentStream, topic, lastAckedMessageIndex);
-        this.contentStream = contentStream;
-        this.topic = topic;
-        this.lastAckedMessageIndex = lastAckedMessageIndex;
-        this.maxRequestSize = maxRequestSize;
-    }
-
-    @Override
-    public String toString() {
-        return "topic: '" + this.topic + "'; delimiter: '" + new String(this.delimiterBytes, StandardCharsets.UTF_8) + "'";
-    }
-
-    int getLastAckedMessageIndex() {
-        return this.lastAckedMessageIndex;
-    }
-
-    int getMaxRequestSize() {
-        return this.maxRequestSize;
-    }
-
-    byte[] getKeyBytes() {
-        return this.keyBytes;
-    }
-
-    byte[] getDelimiterBytes() {
-        return this.delimiterBytes;
-    }
-
-    InputStream getContentStream() {
-        return this.contentStream;
-    }
-
-    String getTopic() {
-        return this.topic;
-    }
-
-    void setKeyBytes(byte[] keyBytes) {
-        if (this.keyBytes == null) {
-            if (keyBytes != null) {
-                this.assertBytesValid(keyBytes);
-                this.keyBytes = keyBytes;
-            }
-        } else {
-            throw new IllegalArgumentException("'keyBytes' can only be set once per instance");
-        }
-    }
-
-    void setDelimiterBytes(byte[] delimiterBytes) {
-        if (this.delimiterBytes == null) {
-            if (delimiterBytes != null) {
-                this.assertBytesValid(delimiterBytes);
-                this.delimiterBytes = delimiterBytes;
-            }
-        } else {
-            throw new IllegalArgumentException("'delimiterBytes' can only be set once per instance");
-        }
-    }
-
-    private void assertBytesValid(byte[] bytes) {
-        if (bytes != null) {
-            if (bytes.length == 0) {
-                throw new IllegalArgumentException("'bytes' must not be empty");
-            }
-        }
-    }
-
-    private void validateInput(InputStream contentStream, String topic, int lastAckedMessageIndex) {
-        if (contentStream == null) {
-            throw new IllegalArgumentException("'contentStream' must not be null");
-        } else if (topic == null || topic.trim().length() == 0) {
-            throw new IllegalArgumentException("'topic' must not be null or empty");
-        } else if (lastAckedMessageIndex < -1) {
-            throw new IllegalArgumentException("'lastAckedMessageIndex' must be >= -1");
-        }
-    }
-}
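
The delimiter bytes this deleted class carried are handled in the new design by StreamDemarcator inside PublisherLease, which splits the FlowFile content into one token per Kafka message. A standalone sketch of that splitting step; the input bytes and the newline demarcator are illustrative:

import java.io.ByteArrayInputStream;
import java.nio.charset.StandardCharsets;

import org.apache.nifi.stream.io.util.StreamDemarcator;

// Demonstrates the same tokenizing the lease performs before handing each token to the producer.
public class DemarcatorExample {
    public static void main(final String[] args) throws Exception {
        final byte[] content = "one\ntwo\nthree".getBytes(StandardCharsets.UTF_8);
        final byte[] demarcator = "\n".getBytes(StandardCharsets.UTF_8);

        try (final StreamDemarcator splitter = new StreamDemarcator(new ByteArrayInputStream(content), demarcator, 1_048_576)) {
            byte[] token;
            while ((token = splitter.nextToken()) != null) {
                System.out.println(new String(token, StandardCharsets.UTF_8)); // one, two, three
            }
        }
    }
}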