[
https://issues.apache.org/jira/browse/NIFI-2865?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15550068#comment-15550068
]
ASF GitHub Bot commented on NIFI-2865:
--------------------------------------
Github user bbende commented on a diff in the pull request:
https://github.com/apache/nifi/pull/1097#discussion_r82078303
--- Diff:
nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublisherLease.java
---
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.processors.kafka.pubsub;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+import org.apache.kafka.clients.producer.Callback;
+import org.apache.kafka.clients.producer.Producer;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.clients.producer.RecordMetadata;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.stream.io.exception.TokenTooLargeException;
+import org.apache.nifi.stream.io.util.StreamDemarcator;
+
+public class PublisherLease implements Closeable {
+ private final ComponentLog logger;
+ private final Producer<byte[], byte[]> producer;
+ private final int maxMessageSize;
+ private final long maxAckWaitMillis;
+ private volatile boolean poisoned = false;
+
+ private InFlightMessageTracker tracker;
+
+ public PublisherLease(final Producer<byte[], byte[]> producer, final
int maxMessageSize, final long maxAckWaitMillis, final ComponentLog logger) {
+ this.producer = producer;
+ this.maxMessageSize = maxMessageSize;
+ this.logger = logger;
+ this.maxAckWaitMillis = maxAckWaitMillis;
+ }
+
+ protected void poison() {
+ this.poisoned = true;
+ }
+
+ public boolean isPoisoned() {
+ return poisoned;
+ }
+
+ void publish(final FlowFile flowFile, final InputStream
flowFileContent, final byte[] messageKey, final byte[] demarcatorBytes, final
String topic) throws IOException {
+ if (tracker == null) {
--- End diff --
Not sure if this is necessary, but would we want to check if poisoned here
and throw an exception if publish is being called after being poisoned?
> Address issues of PublishKafka blocking when having trouble communicating
> with Kafka broker and improve performance
> -------------------------------------------------------------------------------------------------------------------
>
> Key: NIFI-2865
> URL: https://issues.apache.org/jira/browse/NIFI-2865
> Project: Apache NiFi
> Issue Type: Bug
> Components: Extensions
> Reporter: Mark Payne
> Assignee: Mark Payne
> Fix For: 1.1.0
>
>
> When NiFi is unable to communicate properly with the Kafka broker, we see the
> NiFi threads occasionally block. This should be resolvable by calling the
> wakeup() method of the client. Additionally, if Kafka takes too long to
> respond, we should be able to route the FlowFile to failure and move on.
> PublishKafka has a nice feature that allows a demarcated stream to be sent as
> separate messages, so that a large number of messages can be sent as a single
> FlowFile. However, in the case of individual messages per FlowFile, the
> performance could be improved by batching together multiple FlowFiles per
> session
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)