[ https://issues.apache.org/jira/browse/NIFI-2865?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15550068#comment-15550068 ]
ASF GitHub Bot commented on NIFI-2865: -------------------------------------- Github user bbende commented on a diff in the pull request: https://github.com/apache/nifi/pull/1097#discussion_r82078303 --- Diff: nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublisherLease.java --- @@ -0,0 +1,132 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.nifi.processors.kafka.pubsub; + +import java.io.Closeable; +import java.io.IOException; +import java.io.InputStream; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; + +import org.apache.kafka.clients.producer.Callback; +import org.apache.kafka.clients.producer.Producer; +import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.kafka.clients.producer.RecordMetadata; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.logging.ComponentLog; +import org.apache.nifi.stream.io.exception.TokenTooLargeException; +import org.apache.nifi.stream.io.util.StreamDemarcator; + +public class PublisherLease implements Closeable { + private final ComponentLog logger; + private final Producer<byte[], byte[]> producer; + private final int maxMessageSize; + private final long maxAckWaitMillis; + private volatile boolean poisoned = false; + + private InFlightMessageTracker tracker; + + public PublisherLease(final Producer<byte[], byte[]> producer, final int maxMessageSize, final long maxAckWaitMillis, final ComponentLog logger) { + this.producer = producer; + this.maxMessageSize = maxMessageSize; + this.logger = logger; + this.maxAckWaitMillis = maxAckWaitMillis; + } + + protected void poison() { + this.poisoned = true; + } + + public boolean isPoisoned() { + return poisoned; + } + + void publish(final FlowFile flowFile, final InputStream flowFileContent, final byte[] messageKey, final byte[] demarcatorBytes, final String topic) throws IOException { + if (tracker == null) { --- End diff -- Not sure if this is necessary, but would we want to check if poisoned here and throw an exception if publish is being called after being poisoned? 
> Address issues of PublishKafka blocking when having trouble communicating > with Kafka broker and improve performance > ------------------------------------------------------------------------------------------------------------------- > > Key: NIFI-2865 > URL: https://issues.apache.org/jira/browse/NIFI-2865 > Project: Apache NiFi > Issue Type: Bug > Components: Extensions > Reporter: Mark Payne > Assignee: Mark Payne > Fix For: 1.1.0 > > > When NiFi is unable to communicate properly with the Kafka broker, we see the > NiFi threads occasionally block. This should be resolvable by calling the > wakeup() method of the client. Additionally, if Kafka takes too long to > respond, we should be able to route the FlowFile to failure and move on. > PublishKafka has a nice feature that allows a demarcated stream to be sent as > separate messages, so that a large number of messages can be sent as a single > FlowFile. However, in the case of individual messages per FlowFile, the > performance could be improved by batching together multiple FlowFiles per > session. -- This message was sent by Atlassian JIRA (v6.3.4#6332)